高性能网络IO模式Reactor
select/poll/epoll 是如何获取网络事件的呢?
我们熟悉的 select/poll/epoll 就是内核提供给用户态的多路复用系统调用,线程可以通过一个系统调用函数从内核中获取多个事件。在获取事件时,先把我们要关心的连接传给内核,再由内核检测。
- 如果没有事件发生,线程只需阻塞在这个系统调用,
- 如果有事件发生,内核会返回产生了事件的连接,线程就会从阻塞状态返回,然后在用户态中再处理这些连接对应的业务即可。
Reactor
基于面向对象的思想,对 I/O 多路复用
作了一层封装,让使用者不用考虑底层网络 API 的细节,只需要关注应用代码的编写,Reactor
模式也叫 Dispatcher
模式,我觉得这个名字更贴合该模式的含义,即 I/O 多路复用监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。
Reactor主要分为 Reactor
和 处理资源池
两部分,
-
Reactor
主要用于分发事件,读写事件,连接事件。 - 处理资源池主要用于处理一些业务逻辑,一些任务。
Reactor
的数量可以只有一个,也可以有多个;处理资源池可以是单个进程 / 线程
,也可以是多个进程 /线程
;
主要有一些几种模式:
- 单Reactor/单线(进)程
- 单Reactor/多线(进)程
- 多Reactor/单线(进)程
- 多Reactor/多线(进)程
java中利用这种模式的框架我知道的有 kafka,netty;c/c++语言中我知道用这种模式的有redis和nginx。
单Reactor多进程\线程
image.png-
Reactor
对象的作用是监听和分发事件; -
Acceptor
对象的作用是获取连接; -
Handler
对象的作用是处理业务;
其中对象里的 select
、accept
、read
、send
是系统调用函数,
dispatch 和 「业务请求」 是需要完成的操作,其中 dispatch
是分发事件操作。
Reactor
对象通过 select
(IO 多路复用接口) 监听事件,收到事件后通过 dispatch
进行分发,具体分发给 Acceptor
对象还是 Handler
对象,还要看收到的事件类型;
- 如果是连接建立的事件,则交由
Acceptor
对象进行处理,Acceptor
对象会通过accept
方法 获取连接,并创建一个Handler
对象来处理后续的响应事件; - 如果不是连接建立事件, 则交由当前连接对应的
Handler
对象来进行响应;Handler
对象通过 read -> 业务处理 -> send 的流程来完成完整的业务流程。
单 Reactor 单进程的方案因为全部工作都在同一个进程内完成,所以实现起来比较简单,不需要考虑进程间通信,也不用担心多进程竞争。但是,这种方案存在 2 个缺点:
第一个缺点,因为只有一个进程,无法充分利用 多核 CPU 的性能;
第二个缺点,Handler
对象在业务处理时,整个进程是无法处理其他连接的事件的,如果业务处理耗时比较长,那么就造成响应的延迟;所以,单 Reactor 单进程的方案不适用IO密集型
的场景,只适用于业务处理非常快速的场景
。
Redis 是由 C 语言实现的,它采用的正是「单 Reactor 单进程」的方案,因为 Redis 业务处理主要是在内存中完成,操作的速度是很快的,性能瓶颈不在 CPU 上,所以 Redis 对于命令的处理是单进程的方案。
单Reactor多进程\线程
如果要克服「单 Reactor 单线程 / 进程」方案的缺点,那么就需要引入多线程 / 多进程,这样就产生了单 Reactor 多线程 / 多进程的方案。
image.png单Reactor多进程\线程 Handler
对象不再负责业务处理,只负责数据的接收和发送,Handler
对象通过 read
读取到数据后,会将数据发给子线程里的 Processor
对象进行业务处理;
子线程里的 Processor
对象就进行业务处理,处理完后,将结果发给主线程中的 Handler
对象,接着由 Handler
通过 send
方法将响应结果发送给client
;
单 Reator 多线程的方案优势在于能够充分利用多核 CPU 的性能,但是引入多线程,自然就带来了多线程竞争资源的问题。
例如,子线程完成业务处理后,要把结果传递给主线程的 Reactor
进行发送,这里涉及共享数据的竞争。要避免多线程由于竞争共享资源而导致数据错乱的问题,就需要在操作共享资源前加上互斥锁
,以保证任意时间里只有一个线程在操作共享资源,待该线程操作完释放互斥锁后,其他线程才有机会操作共享数据。
「单 Reactor」的模式还有个问题,因为一个 Reactor 对象承担所有事件的监听和响应,而且只在主线程中运行,在面对瞬间高并发的场景时,容易成为性能的瓶颈的地方。
多Reactor多进程\多线程
image.png- 主线程中的
Reactor
对象通过select
监控连接建立事件,收到事件后通过Acceptor
对象中的accept
获取连接,将新的连接分配给某个子线程; - 子线程中的
SubReactor
对象将Reactor
对象分配的连接加入select
继续进行监听,并创建一个Handler
用于处理连接的响应事件。 - 如果有新的事件发生时,
SubReactor
对象会调用当前连接对应的Handler
对象来进行响应。Handler
对象通过 read -> 业务处理 -> send 的流程来完成完整的业务流程。
多 Reactor 多线程的方案虽然看起来复杂的,但是实际实现时比单 Reactor 多线程的方案要简单的多,
原因如下:
- 主线程和子线程分工明确,主线程只负责接收新连接,子线程负责完成后续的业务处理。
- 主线程和子线程的交互很简单,主线程只需要把新连接传给子线程,子线程无须返回数据,直接就可以在子线程将处理结果发送给客户端。
编码实现
功能还是客户端不断像服务端发送"hello,server",服务端收到请求后响应"hello,client"。
原生nio示例请查阅 https://juejin.cn/post/6976780917587050533
我们只对服务端进行改造,客户端仍沿用之前的代码 https://juejin.cn/post/6976780917587050533#heading-9
单Reactor单线程
- Reactor
public class SingletonReactor {
private final int port;
private Selector selector;
private ServerSocketChannel socketChannel;
private final ChannelProcess channelProcess;
public SingletonReactor(int port, ChannelProcess channelProcess) {
this.channelProcess = channelProcess;
this.port = port;
}
private void init() throws IOException {
selector = Selector.open();
socketChannel = ServerSocketChannel.open();
socketChannel.socket().bind(new InetSocketAddress(port));
socketChannel.configureBlocking(false);
SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
public void startService() {
try {
init();
System.out.println("Single Reactor mode start success on " + port);
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
for (SelectionKey selectionKey : selected) {
// Reactor负责dispatch收到的事件
dispatch(selectionKey);
}
selected.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void dispatch(SelectionKey k) {
Runnable runnable = (Runnable) k.attachment();
if (runnable != null) {
runnable.run();
}
}
private class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel channel = SingletonReactor.this.socketChannel.accept();
if (channel != null) {
new SingleThreadHandler(selector, channel, channelProcess);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- Handler
public class SingleThreadHandler implements Runnable {
private final SocketChannel channel;
private final SelectionKey sk;
private final ChannelProcess channelProcess;
private ByteBuffer outputBuffer;
public SingleThreadHandler(Selector selector, SocketChannel channel, ChannelProcess channelProcess) throws IOException {
this.channelProcess = channelProcess;
this.channel = channel;
this.channel.configureBlocking(false);
// Optionally try first read now
this.sk = channel.register(selector, SelectionKey.OP_READ);
// handler as callback obj
sk.attach(this);
// register read
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
@Override
public void run() {
try {
if (sk.isReadable()) {
readAndProcess();
} else if (sk.isWritable()) {
send();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 发送数据
*
* @throws IOException io
*/
private void send() throws IOException {
channel.write(this.outputBuffer);
sk.interestOps(SelectionKey.OP_READ);
}
/**
* 读取&处理请求
*
* @throws IOException io
*/
private void readAndProcess() throws IOException {
this.outputBuffer = channelProcess.process(ByteBufferUtils.toBytes(channel));
sk.interestOps(SelectionKey.OP_WRITE);
}
}
- 这里定义了一个函数式接口专门用于处理业务
@FunctionalInterface
public interface ChannelProcess {
/**
* 处理请求,并返回对应的结果
*
* @param bytes 请求数据
* @return 应答数据
*/
ByteBuffer process(byte[] bytes);
}
- Buffer工具类
public class ByteBufferUtils {
private final static int INPUT_SIZE = 8192;
public static byte[] toBytes(SocketChannel channel) throws IOException {
ByteBuffer buf = ByteBuffer.allocate(INPUT_SIZE);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int len;
while ((len = channel.read(buf)) > 0) {
byteArrayOutputStream.write(buf.array(), 0, len);
buf.flip();
}
return byteArrayOutputStream.toByteArray();
}
}
- 组装
public class SingleReactorMain {
public static void main(String[] args) {
new SingletonReactor(8080, a -> {
String recvMsg = new String(a, StandardCharsets.UTF_8);
if (!recvMsg.isEmpty()) {
System.out.println("Receive message: " + recvMsg);
}
return ByteBuffer.wrap("hello,client".getBytes(StandardCharsets.UTF_8));
}).startService();
}
}
因为Reactor类和Handler类相当于标准模板,一般不需要开发人员变动,编程人员只需要关注具体业务处理就可以了,Reactor模式对开发人员非常的友好。
多Reactor多线程
- Reactor
public class MultiThreadReactor {
private Selector bossSelector;
private Selector workerSelector;
private SubReactor bossReactor;
private SubReactor workerReactor;
private final ChannelProcess channelProcess;
private final int port;
private ServerSocketChannel socketChannel;
public MultiThreadReactor(int port, ChannelProcess channelProcess) throws IOException {
this.channelProcess = channelProcess;
this.port = port;
}
private void init() throws IOException {
this.bossSelector = Selector.open();
this.workerSelector = Selector.open();
socketChannel = ServerSocketChannel.open();
socketChannel.socket().bind(new InetSocketAddress(port));
socketChannel.configureBlocking(false);
SelectionKey sk = socketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
this.bossReactor = new SubReactor(bossSelector);
this.workerReactor = new SubReactor(workerSelector);
}
@SuppressWarnings("all")
public void startService() throws IOException {
init();
new Thread(bossReactor).start();
new Thread(workerReactor).start();
System.out.println("Multi Thread Reactor start on " + port + " success");
}
private class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel channel = socketChannel.accept();
if (channel != null) {
new MultiThreadHandler(workerSelector, channel, channelProcess);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static class SubReactor implements Runnable {
final Selector selector;
public SubReactor(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
if (selector.select(1) > 0) {
Set<SelectionKey> selected = selector.selectedKeys();
for (SelectionKey selectionKey : selected) {
dispatch(selectionKey);
}
selected.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void dispatch(SelectionKey k) {
Runnable runnable = (Runnable) k.attachment();
if (runnable != null) {
runnable.run();
}
}
}
}
- Handler
public class MultiThreadHandler implements Runnable {
private final SocketChannel channel;
private final SelectionKey selectionKey;
private final ChannelProcess channelProcess;
private static final ExecutorService POOL = Executors.newFixedThreadPool(2);
private ByteBuffer outputBuffer;
public MultiThreadHandler(Selector selector, SocketChannel channel, ChannelProcess channelProcess) throws IOException {
this.channel = channel;
this.channelProcess = channelProcess;
this.channel.configureBlocking(false);
this.selectionKey = channel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
@Override
public void run() {
POOL.submit(this::asyncRun);
}
/**
* 这里加锁的目的是为了保证在同一时刻一个链接只能进行读或者写
*/
private synchronized void asyncRun() {
try {
if (selectionKey.isReadable()) {
read();
} else if (selectionKey.isWritable()) {
send();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void send() throws IOException {
channel.write(this.outputBuffer);
selectionKey.interestOps(SelectionKey.OP_READ);
}
private void read() throws IOException {
this.outputBuffer = channelProcess.process(ByteBufferUtils.toBytes(channel));
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
}
- 组装
public class MultiThreadReactorMain {
public static void main(String[] args) throws IOException {
new MultiThreadReactor(8080, a -> {
String recvMsg = new String(a, StandardCharsets.UTF_8);
if(!recvMsg.isEmpty()){
System.out.println("Receive message: " + recvMsg);
}
return ByteBuffer.wrap("hello,client".getBytes(StandardCharsets.UTF_8));
}).startService();
}
}