Reactor模式与非阻塞I/O

作者:聂勇 欢迎转载,请保留作者信息并说明文章来源!

Reactor模式描述:The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

网络请求响应的整个流程,根据其职责进行划分,可以拆分成如下几个步骤:

  • 接收请求(Acceptor)。
  • 读取请求数据(Reader)。
  • 解析请求数据(Decoder)。
  • 处理业务逻辑(Process Service | Compute)。
  • 封装响应数据(Encoder)。
  • 发送响应(Writer | Sender)。

有些童鞋可能要说了,Netty就这是这样划分职责,分成多个组件联合处理网络请求和响应。没错,Netty实现了Reactor模式,通过事件驱动机制(也可以说是好莱坞原则:不要给我们打电话,我们会打电话通知你),非常高效。

前一篇文章“Java网络处理模型-非阻塞I/O+单线程”介绍了如何用NIO实现一个Echo Server。在这里,对它进行重构,按照上面的6个处理步骤,将NioEchoServer拆分成几个组件,分工协作:

  • Reactor:启动、停止服务;分派ACCEPT事件给Acceptor;分派READ事件给Reader。
  • Acceptor:处理新的客户端连接,将Reader注册并关注READ事件。
  • Reader:读取请求数据。调用Decoder解析数据。
  • Decoder:解码器。LineDecoder是一个按”行”解析的解码器。
  • ProcessService:业务逻辑处理。
  • Writer:发送响应数据。调用Encoder封装响应数据。
  • Encoder:编码器。LineEncoder是一个将响应数据转换成”行”的编码器。
Reactor模式

代码示例

注:下面的代码未完整地实现Reactor模式,只是实现了职责划分,未实现事件驱动。

Reactor源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package cn.aofeng.demo.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 负责Echo Server启动和停止 ,ACCEPT和READ事件的分派。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public class Reactor {
private final static Logger logger = Logger.getLogger(Reactor.class.getName());
// 监听端口
private int _port;
// {@link Selector}检查通道就绪状态的超时时间(单位:毫秒)
private int _selectTimeout = 3000;
// 服务运行状态
private volatile boolean _isRun = true;
/**
* @param port 服务监听端口。
*/
public Reactor(int port) {
this._port = port;
}
public void setSelectTimeout(int selectTimeout) {
this._selectTimeout = selectTimeout;
}
/**
* 启动服务。
*/
public void start() {
ServerSocketChannel serverChannel = null;
try {
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind(new InetSocketAddress(_port));
_isRun = true;
if (logger.isLoggable(Level.INFO)) {
logger.info("NIO echo网络服务启动完毕,监听端口:" +_port);
}
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, serverChannel));
while (_isRun) {
int selectNum = selector.select(_selectTimeout);
if (0 == selectNum) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey selectionKey = (SelectionKey) it.next();
// 接受新的Socket连接
if (selectionKey.isValid() && selectionKey.isAcceptable()) {
Acceptor acceptor = (Acceptor) selectionKey.attachment();
acceptor.accept();
}
// 读取并处理Socket的数据
if (selectionKey.isValid() && selectionKey.isReadable()) {
Reader reader = (Reader) selectionKey.attachment();
reader.read();
}
// 移除已经处理过的Key
it.remove();
} // end of while iterator
}
} catch (IOException e) {
logger.log(Level.SEVERE, "处理网络连接出错", e);
}
}
/**
* 停止服务。
*/
public void stop() {
_isRun = false;
}
public static void main(String[] args) {
if (1 != args.length) {
logger.severe("无效参数。使用示例:\n java cn.aofeng.demo.reactor.Reactor 9090");
System.exit(-1);
}
int port = Integer.parseInt(args[0]);
int selectTimeout = 1000;
Reactor reactor = new Reactor(port);
reactor.setSelectTimeout(selectTimeout);
reactor.start();
}
}

Acceptor源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.aofeng.demo.reactor;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 负责处理新连入的客户端Socket连接。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public class Acceptor {
private final static Logger _logger = Logger.getLogger(Acceptor.class.getName());
protected Selector _selector;
protected ServerSocketChannel _serverChannel;
public Acceptor(Selector selector, ServerSocketChannel serverChannel) {
this._selector = selector;
this._serverChannel = serverChannel;
}
/**
* 接收一个新连入的客户端Socket连接,交给{@link Reader}处理:{@link Reader}向{@link Selector}注册并关注READ事件。
*
* @throws IOException
*/
public void accept() throws IOException {
SocketChannel clientChannel = _serverChannel.accept();
if (null != clientChannel) {
if (_logger.isLoggable(Level.INFO)) {
_logger.info("收到一个新的连接,客户端IP:"+clientChannel.socket().getInetAddress().getHostAddress()
+",客户端Port:"+clientChannel.socket().getPort());
}
clientChannel.configureBlocking(false);
Reader reader = new Reader(_selector, clientChannel);
reader.setDecoder(new LineDecoder());
clientChannel.register(_selector, SelectionKey.OP_READ, reader);
}
}
}

Reader源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package cn.aofeng.demo.reactor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 负责读取客户端的请求数据并解析。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public class Reader {
private final static Logger _logger = Logger.getLogger(Reader.class.getName());
private SocketChannel _clientChannel;
private Decoder _decoder;
private final static int BUFFER_SIZE = 512;
private ByteBuffer _buffer = ByteBuffer.allocate(BUFFER_SIZE);
public Reader(Selector selector, SocketChannel clientChannel) {
this._clientChannel = clientChannel;
}
public void setDecoder(Decoder decoder) {
this._decoder = decoder;
}
public void read() throws IOException {
int readCount = _clientChannel.read(_buffer);
if (-1 == readCount) {
_clientChannel.close();
}
_buffer.flip();
int oldLimit = _buffer.limit();
String line = null;
while( (line = (String) _decoder.decode(_buffer)) != null ) { // 处理一次多行发送过来的情况
if (_logger.isLoggable(Level.FINE)) {
_logger.fine("收到的数据:"+line);
}
// 处理业务逻辑
ProcessService service= new ProcessService(_clientChannel, line);
String result = service.execute();
// 发送响应
Writer writer = new Writer(_clientChannel, result);
writer.setEncoder(new LineEncoder());
writer.write();
// 重建临时数据缓冲区
rebuildBuffer(line.length());
}
// 缓冲区数据还没有符合一个decode数据的条件,重置数据缓冲区的状态方便append数据
if (oldLimit == _buffer.limit()) {
resetBuffer();
}
}
private void resetBuffer() {
_buffer.position(_buffer.limit());
_buffer.limit(_buffer.capacity());
}
/**
* 重建临时数据缓冲区。
*
* @param lineSize 收到的一行数据(不包括行结束符)的长度
*/
private void rebuildBuffer(int lineSize) {
if (_buffer.limit() == lineSize) {
// 数据刚好是一行
_buffer = ByteBuffer.allocate(BUFFER_SIZE);
} else if (_buffer.limit() > lineSize) {
// 数据多于一行
byte[] temp = new byte[_buffer.limit() - lineSize];
System.arraycopy(_buffer.array(), lineSize, temp, 0, temp.length);
_buffer = ByteBuffer.allocate(BUFFER_SIZE);
_buffer.put(temp);
_buffer.flip();
} else {
// nothing
}
}
}

Decoder源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.aofeng.demo.reactor;
import java.nio.ByteBuffer;
/**
* 请求数据解析器接口定义。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public interface Decoder {
/**
* 解析请求数据,不影响源数据的状态和内容。
*
* @param source {@link Reader}读取到的源数据字节数组
* @return 如果解析到符合要求的数据,则返回解析到的数据;否则返回null。
*/
public Object decode(ByteBuffer source);
}

LineDecoder源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package cn.aofeng.demo.reactor;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 行数据解析器。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public class LineDecoder implements Decoder {
private final static Logger _logger = Logger.getLogger(LineDecoder.class.getName());
/**
* 从字节缓冲区中获取"一行"。
*
* @param buffer 输入缓冲区
* @return 有遇到行结束符,返回不包括行结束符的字符串;否则返回null。
*/
@Override
public String decode(ByteBuffer source) {
int index = 0;
boolean findCR = false;
int len = source.limit();
byte[] bytes = source.array();
while(index < len) {
index ++;
byte temp = bytes[index-1];
if (Constant.CR == temp) {
findCR = true;
}
if (Constant.LF == temp && findCR) { // 找到了行结束符
byte[] copy = new byte[index];
System.arraycopy(bytes, 0, copy, 0, index);
try {
return new String(copy, Constant.CHARSET_UTF8);
} catch (UnsupportedEncodingException e) {
_logger.log(Level.SEVERE, "将解析完成的请求数据转换成字符串出错", e);
}
}
}
return null;
}
}

ProcessService源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.aofeng.demo.reactor;
import java.io.IOException;
import java.nio.channels.SocketChannel;
/**
* 业务逻辑处理。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public class ProcessService {
private SocketChannel _clientChannel;
private String _line;
public ProcessService(SocketChannel clientChannel, String line) {
this._clientChannel = clientChannel;
this._line = line;
}
public String execute() {
// 判断客户端是否发送了退出指令
String content = _line.substring(0, _line.length()-2);
if (isCloseClient(content)) {
try {
_clientChannel.close();
} catch (IOException e) {
// nothing
}
}
return _line;
}
/**
* 客户端是否发送了退出指令("quit" | "exit")。
*
* @param str 收到的客户端数据
* @return 返回true表示收到了退出指令;否则返回false。
*/
private boolean isCloseClient(String str) {
return "exit".equalsIgnoreCase(str) || "quit".equalsIgnoreCase(str);
}
}

Writer源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.aofeng.demo.reactor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* 负责向客户端发送响应数据。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public class Writer {
private SocketChannel _clientChannel;
private Object _data;
private Encoder _encoder;
public Writer(SocketChannel clientChannel, Object data) {
this._clientChannel = clientChannel;
this._data = data;
}
public void setEncoder(Encoder encoder) {
this._encoder = encoder;
}
public void write() throws IOException {
if (null == _data || !_clientChannel.isOpen()) {
return;
}
ByteBuffer buffer = _encoder.encode(_data);
if (null == buffer) {
return;
}
_clientChannel.write(buffer);
}
}

Encoder源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.aofeng.demo.reactor;
import java.nio.ByteBuffer;
/**
* 响应数据封装接口定义。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public interface Encoder {
/**
* 将源数据转换成{@link ByteBuffer}。
*
* @param source 源数据
* @return {@link ByteBuffer}对象。
*/
public ByteBuffer encode(Object source);
}

LineEncoder源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package cn.aofeng.demo.reactor;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 将字符串转换成{@link ByteBuffer}并加上行结束符。
*
* @author <a href="mailto:aofengblog@163.com">NieYong </a>
*/
public class LineEncoder implements Encoder {
private final static Logger logger = Logger.getLogger(LineEncoder.class.getName());
@Override
public ByteBuffer encode(Object source) {
String line = (String) source;
try {
ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(Constant.CHARSET_UTF8));
return buffer;
} catch (UnsupportedEncodingException e) {
logger.log(Level.SEVERE, "将响应数据转换成ByteBuffer出错", e);
}
return null;
}
}