Java网络处理模型-非阻塞I/O+单线程

作者:聂勇 欢迎转载,请保留作者信息并说明文章来源!

“阻塞I/O+线程池”网络模型虽然比”阻塞I/O+多线程”网络模型在性能方面有提升,但这两种模型都存在一个共同的问题:面对大并发(持续大量连接同时请求)的场景,需要消耗大量的线程维持连接。CPU在大量的线程之间频繁切换,性能损耗很大。一旦单机的连接超过1万,甚至达到几万的时候,服务器的性能会急剧下降。

而NIO的Selector却很好地解决了这个问题,用“一个线程或者是CPU个数的线程”(主线程)hold住所有的连接,管理和读取客户端连接的数据,将读取的数据交给后面的线程池处理,线程池处理完业务逻辑后,将结果交给主线程发送响应给客户端,少量的线程就可以处理大量连接的请求。
非阻塞I/O+单线程模型

代码示例

示例代码只实现一个线程负责所有的事务:接收请求,解析请求,业务逻辑处理,响应数据封闭,发送响应。
NioEchoServer.java 源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
package cn.aofeng.demo.nio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 用NIO实现的Echo Server。
* @author NieYong <aofengblog@163.com>
*/
public class NioEchoServer {
private final static Logger logger = Logger.getLogger(NioEchoServer.class.getName());
// 换行符
public final static char CR = '\r';
// 回车符
public final static char LF = '\n';
/**
* @return 当前系统的行结束符
*/
private static String getLineEnd() {
return System.getProperty("line.separator");
}
/**
* 重置缓冲区状态标志位:position设置为0,limit设置为capacity的值,所有mark无效。
* 注:缓冲区原来的内容还在,并没有清除。
*
* @param buffer 字节缓冲区
*/
private static void clear(ByteBuffer buffer) {
if (null != buffer) {
buffer.clear();
}
}
/**
* 将字节缓冲区的每一个字节转换成ASCII字符。
* @param buffer 字节缓冲区
* @return 转换后的字节数组字符串
*/
private static String toDisplayChar(ByteBuffer buffer) {
if (null == buffer) {
return "null";
}
return Arrays.toString(buffer.array());
}
/**
* 将字节缓冲区用utf8编码,转换成字符串。
*
* @param buffer 字节缓冲区
* @return utf8编码转换的字符串
* @throws UnsupportedEncodingException
*/
private static String convert2String(ByteBuffer buffer) throws UnsupportedEncodingException {
return new String(buffer.array(), "utf8");
}
/**
* 去掉尾末的行结束符(\r\n),并转换成字符串。
*
* @param buffer 字节缓冲区
* @return 返回去掉行结束符后的字符串。
* @throws UnsupportedEncodingException
* @see #convert2String(ByteBuffer)
*/
private static String getLineContent(ByteBuffer buffer) throws UnsupportedEncodingException {
if (null == buffer) {
return null;
}
byte[] result = new byte[buffer.limit()-2];
System.arraycopy(buffer.array(), 0, result, 0, result.length);
return convert2String(ByteBuffer.wrap(result));
}
/**
* 顺序合并两个{@link ByteBuffer}的内容,且不改变{@link ByteBuffer}原来的标志位。即:
* <pre>
* 合并后的ByteBuffer = first + second
* </pre>
* @param first 第一个待合并的{@link ByteBuffer},合并后其内容在前面
* @param second 第二个待合并的{@link ByteBuffer},合并后其内容在后面
* @return 合并后的内容。如果两个{@link ByteBuffer}都为null,返回null。
*/
private static ByteBuffer merge(ByteBuffer first, ByteBuffer second) {
if (null == first && null == second) {
return null;
}
int oneSize = null != first ? first.limit() : 0;
int twoSize = null != second ? second.limit() : 0;
ByteBuffer result = ByteBuffer.allocate(oneSize+twoSize);
if (null != first) {
result.put(Arrays.copyOfRange(first.array(), 0, oneSize));
}
if (null != second) {
result.put(Arrays.copyOfRange(second.array(), 0, twoSize));
}
result.rewind();
return result;
}
/**
* 从字节缓冲区中获取"一行",即获取包括行结束符及其前面的内容。
*
* @param buffer 输入缓冲区
* @return 有遇到行结束符,返回包括行结束符在内的字节缓冲区;否则返回null。
*/
private static ByteBuffer getLine(ByteBuffer buffer) {
int index = 0;
boolean findCR = false;
int len = buffer.limit();
while(index < len) {
index ++;
byte temp = buffer.get();
if (CR == temp) {
findCR = true;
}
if (LF == temp && findCR && index > 0) { // 找到了行结束符
byte[] copy = new byte[index];
System.arraycopy(buffer.array(), 0, copy, 0, index);
buffer.rewind(); // 位置复原
return ByteBuffer.wrap(copy);
}
}
buffer.rewind(); // 位置复原
return null;
}
private static void readData(Selector selector, SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 获取上次已经读取的数据
ByteBuffer oldBuffer = (ByteBuffer) selectionKey.attachment();
if (logger.isLoggable(Level.FINE)) {
logger.fine("上一次读取的数据:"+oldBuffer+getLineEnd()+toDisplayChar(oldBuffer));
}
// 读新的数据
int readNum = 0;
ByteBuffer newBuffer = ByteBuffer.allocate(1024);
if ( (readNum = socketChannel.read(newBuffer)) <= 0 ) {
return;
}
if (logger.isLoggable(Level.FINE)) {
logger.fine("这次读取的数据:"+newBuffer+getLineEnd()+toDisplayChar(newBuffer));
}
newBuffer.flip();
ByteBuffer lineRemain = getLine(newBuffer);
if (logger.isLoggable(Level.FINE)) {
logger.fine("解析的行数据剩余部分:"+lineRemain+getLineEnd()+toDisplayChar(lineRemain));
}
if (null != lineRemain) { // 获取到行结束符
ByteBuffer completeLine = merge(oldBuffer, lineRemain);
if (logger.isLoggable(Level.FINE)) {
logger.fine("准备输出的数据:"+completeLine+getLineEnd()+toDisplayChar(completeLine));
}
while (completeLine.hasRemaining()) { // 有可能一次没有写完,需多次写
socketChannel.write(completeLine);
}
// 清除数据
selectionKey.attach(null);
clear(oldBuffer);
clear(lineRemain);
// 判断是否退出
String lineStr = getLineContent(completeLine);
if (logger.isLoggable(Level.FINE)) {
logger.fine("判断是否退出的行数据:"+lineStr);
}
if ("exit".equalsIgnoreCase(lineStr) || "quit".equalsIgnoreCase(lineStr)) {
socketChannel.close();
}
// FIXME 行结束符后面是否还有数据? 此部分代码尚未测试
if (lineRemain.limit()+2 < newBuffer.limit()) {
byte[] temp = new byte[newBuffer.limit() - lineRemain.limit()];
newBuffer.get(temp, lineRemain.limit(), temp.length);
selectionKey.attach(temp);
}
} else { // 没有读到一个完整的行,继续读并且带上已经读取的部分数据
ByteBuffer temp = merge(oldBuffer, newBuffer);
socketChannel.register(selector, SelectionKey.OP_READ, temp);
if (logger.isLoggable(Level.FINE)) {
logger.fine("暂存到SelectionKey的数据:"+temp+getLineEnd()+toDisplayChar(temp));
}
}
}
/**
* 接受新的Socket连接。
*
* @param selector 选择器
* @param selectionKey
* @return
* @throws IOException
* @throws ClosedChannelException
*/
private static SocketChannel acceptNew(Selector selector,
SelectionKey selectionKey) throws IOException,
ClosedChannelException {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = server.accept();
if (null != socketChannel) {
if (logger.isLoggable(Level.INFO)) {
logger.info("收到一个新的连接,客户端IP:"+socketChannel.socket().getInetAddress().getHostAddress()+",客户端Port:"+socketChannel.socket().getPort());
}
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
return socketChannel;
}
/**
* 启动服务器。
*
* @param port 服务监听的端口
* @param selectTimeout {@link Selector}检查通道就绪状态的超时时间(单位:毫秒)
*/
private static void startServer(int port, int selectTimeout) {
ServerSocketChannel serverChannel = null;
try {
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind(new InetSocketAddress(port));
if (logger.isLoggable(Level.INFO)) {
logger.info("NIO echo网络服务启动完毕,监听端口:" +port);
}
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int selectNum = selector.select(selectTimeout);
if (0 == selectNum) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey selectionKey = (SelectionKey) it.next();
// 接受新的Socket连接
if (selectionKey.isAcceptable()) {
acceptNew(selector, selectionKey);
}
// 读取并处理Socket的数据
if (selectionKey.isReadable()) {
readData(selector, selectionKey);
}
it.remove();
} // end of while iterator
}
} catch (IOException e) {
logger.log(Level.SEVERE, "处理网络连接出错", e);
}
}
public static void main(String[] args) {
int port = 9090;
int selectTimeout = 1000;
startServer(port, selectTimeout);
}
}

验证

1、启动服务。

1
java cn.aofeng.demo.nio.NioEchoServer 9090

执行上面的命令,启动服务,输出信息:

2013-10-26 20:38:42 cn.aofeng.demo.nio.NioEchoServer startServer
信息: NIO echo网络服务启动完毕,监听端口:9090

2、打开三个终端窗口,执行命令:

1
telnet 192.168.56.102 9090

服务输出如下信息:

2013-10-26 20:40:55 cn.aofeng.demo.nio.NioEchoServer acceptNew
信息: 收到一个新的连接,客户端IP:192.168.56.101,客户端Port:1211
2013-10-26 20:40:58 cn.aofeng.demo.nio.NioEchoServer acceptNew
信息: 收到一个新的连接,客户端IP:192.168.56.101,客户端Port:1212
2013-10-26 20:41:00 cn.aofeng.demo.nio.NioEchoServer acceptNew
信息: 收到一个新的连接,客户端IP:192.168.56.101,客户端Port:1215

注:服务所在机器的IP地址是192.168.56.102。

3、连接后,三个客户端均发送一段时间的数据,然后发送exit或quit指令,服务端关闭客户端连接。其线程列表及其状态 如下图所示:
非阻塞I/O+单线程模型运行时线程

可以看到,当有三个客户端连接上来后,NioEchoServer并没有生成其他线程来处理连接,而是全部由main线程完成。