IO

高性能网络IO模式Reactor

2021-07-01  本文已影响0人  茶还是咖啡

select/poll/epoll 是如何获取网络事件的呢?

我们熟悉的 select/poll/epoll 就是内核提供给用户态的多路复用系统调用,线程可以通过一个系统调用函数从内核中获取多个事件。在获取事件时,先把我们要关心的连接传给内核,再由内核检测。

  1. 如果没有事件发生,线程只需阻塞在这个系统调用,
  2. 如果有事件发生,内核会返回产生了事件的连接,线程就会从阻塞状态返回,然后在用户态中再处理这些连接对应的业务即可。

Reactor基于面向对象的思想,对 I/O 多路复用作了一层封装,让使用者不用考虑底层网络 API 的细节,只需要关注应用代码的编写,Reactor 模式也叫 Dispatcher 模式,我觉得这个名字更贴合该模式的含义,即 I/O 多路复用监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。

image.png

Reactor主要分为 Reactor处理资源池 两部分,

  1. Reactor主要用于分发事件,读写事件,连接事件。
  2. 处理资源池主要用于处理一些业务逻辑,一些任务。

Reactor 的数量可以只有一个,也可以有多个;处理资源池可以是单个进程 / 线程,也可以是多个进程 /线程
主要有一些几种模式:

  1. 单Reactor/单线(进)程
  2. 单Reactor/多线(进)程
  3. 多Reactor/单线(进)程
  4. 多Reactor/多线(进)程

java中利用这种模式的框架我知道的有 kafka,netty;c/c++语言中我知道用这种模式的有redis和nginx。

单Reactor多进程\线程

image.png
  1. Reactor 对象的作用是监听和分发事件;
  2. Acceptor 对象的作用是获取连接;
  3. Handler 对象的作用是处理业务;

其中对象里的 selectacceptreadsend 是系统调用函数,
dispatch「业务请求」 是需要完成的操作,其中 dispatch 是分发事件操作。

Reactor 对象通过 select (IO 多路复用接口) 监听事件,收到事件后通过 dispatch 进行分发,具体分发给 Acceptor 对象还是 Handler 对象,还要看收到的事件类型;

单 Reactor 单进程的方案因为全部工作都在同一个进程内完成,所以实现起来比较简单,不需要考虑进程间通信,也不用担心多进程竞争。但是,这种方案存在 2 个缺点:

第一个缺点,因为只有一个进程,无法充分利用 多核 CPU 的性能;

第二个缺点Handler 对象在业务处理时,整个进程是无法处理其他连接的事件的,如果业务处理耗时比较长,那么就造成响应的延迟;所以,单 Reactor 单进程的方案不适用IO密集型的场景,只适用于业务处理非常快速的场景

Redis 是由 C 语言实现的,它采用的正是「单 Reactor 单进程」的方案,因为 Redis 业务处理主要是在内存中完成,操作的速度是很快的,性能瓶颈不在 CPU 上,所以 Redis 对于命令的处理是单进程的方案。

单Reactor多进程\线程

如果要克服「单 Reactor 单线程 / 进程」方案的缺点,那么就需要引入多线程 / 多进程,这样就产生了单 Reactor 多线程 / 多进程的方案。

image.png

单Reactor多进程\线程 Handler 对象不再负责业务处理,只负责数据的接收和发送,Handler 对象通过 read 读取到数据后,会将数据发给子线程里的 Processor 对象进行业务处理;

子线程里的 Processor 对象就进行业务处理,处理完后,将结果发给主线程中的 Handler 对象,接着由 Handler 通过 send 方法将响应结果发送给client

单 Reator 多线程的方案优势在于能够充分利用多核 CPU 的性能,但是引入多线程,自然就带来了多线程竞争资源的问题。

例如,子线程完成业务处理后,要把结果传递给主线程的 Reactor 进行发送,这里涉及共享数据的竞争。要避免多线程由于竞争共享资源而导致数据错乱的问题,就需要在操作共享资源前加上互斥锁,以保证任意时间里只有一个线程在操作共享资源,待该线程操作完释放互斥锁后,其他线程才有机会操作共享数据。

「单 Reactor」的模式还有个问题,因为一个 Reactor 对象承担所有事件的监听和响应,而且只在主线程中运行,在面对瞬间高并发的场景时,容易成为性能的瓶颈的地方。


多Reactor多进程\多线程

image.png
  1. 主线程中的 Reactor 对象通过 select 监控连接建立事件,收到事件后通过 Acceptor 对象中的 accept 获取连接,将新的连接分配给某个子线程;
  2. 子线程中的 SubReactor 对象将 Reactor 对象分配的连接加入 select 继续进行监听,并创建一个 Handler 用于处理连接的响应事件。
  3. 如果有新的事件发生时,SubReactor 对象会调用当前连接对应的 Handler 对象来进行响应。Handler 对象通过 read -> 业务处理 -> send 的流程来完成完整的业务流程。

多 Reactor 多线程的方案虽然看起来复杂的,但是实际实现时比单 Reactor 多线程的方案要简单的多,

原因如下:

  1. 主线程和子线程分工明确,主线程只负责接收新连接,子线程负责完成后续的业务处理。
  2. 主线程和子线程的交互很简单,主线程只需要把新连接传给子线程,子线程无须返回数据,直接就可以在子线程将处理结果发送给客户端。

编码实现

功能还是客户端不断像服务端发送"hello,server",服务端收到请求后响应"hello,client"。

原生nio示例请查阅 https://juejin.cn/post/6976780917587050533

我们只对服务端进行改造,客户端仍沿用之前的代码 https://juejin.cn/post/6976780917587050533#heading-9

单Reactor单线程

  1. Reactor
public class SingletonReactor {

    private final int port;

    private Selector selector;

    private ServerSocketChannel socketChannel;

    private final ChannelProcess channelProcess;

    public SingletonReactor(int port, ChannelProcess channelProcess) {
        this.channelProcess = channelProcess;
        this.port = port;
    }

    private void init() throws IOException {
        selector = Selector.open();
        socketChannel = ServerSocketChannel.open();
        socketChannel.socket().bind(new InetSocketAddress(port));
        socketChannel.configureBlocking(false);
        SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

    public void startService() {
        try {
            init();
            System.out.println("Single Reactor mode start success on " + port);
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                for (SelectionKey selectionKey : selected) {
                    // Reactor负责dispatch收到的事件
                    dispatch(selectionKey);
                }
                selected.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void dispatch(SelectionKey k) {
        Runnable runnable = (Runnable) k.attachment();
        if (runnable != null) {
            runnable.run();
        }
    }

    private class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel channel = SingletonReactor.this.socketChannel.accept();
                if (channel != null) {
                    new SingleThreadHandler(selector, channel, channelProcess);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}
  1. Handler
public class SingleThreadHandler implements Runnable {

    private final SocketChannel channel;

    private final SelectionKey sk;

    private final ChannelProcess channelProcess;

    private ByteBuffer outputBuffer;

    public SingleThreadHandler(Selector selector, SocketChannel channel, ChannelProcess channelProcess) throws IOException {
        this.channelProcess = channelProcess;
        this.channel = channel;
        this.channel.configureBlocking(false);
        // Optionally try first read now
        this.sk = channel.register(selector, SelectionKey.OP_READ);

        // handler as callback obj
        sk.attach(this);
        // register read
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            if (sk.isReadable()) {
                readAndProcess();
            } else if (sk.isWritable()) {
                send();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 发送数据
     *
     * @throws IOException io
     */
    private void send() throws IOException {
        channel.write(this.outputBuffer);
        sk.interestOps(SelectionKey.OP_READ);
    }

    /**
     * 读取&处理请求
     *
     * @throws IOException io
     */
    private void readAndProcess() throws IOException {
        this.outputBuffer = channelProcess.process(ByteBufferUtils.toBytes(channel));
        sk.interestOps(SelectionKey.OP_WRITE);
    }
}
  1. 这里定义了一个函数式接口专门用于处理业务
@FunctionalInterface
public interface ChannelProcess {
    /**
     * 处理请求,并返回对应的结果
     *
     * @param bytes 请求数据
     * @return 应答数据
     */
    ByteBuffer process(byte[] bytes);
}
  1. Buffer工具类
public class ByteBufferUtils {

    private final static int INPUT_SIZE = 8192;

    public static byte[] toBytes(SocketChannel channel) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(INPUT_SIZE);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        int len;
        while ((len = channel.read(buf)) > 0) {
            byteArrayOutputStream.write(buf.array(), 0, len);
            buf.flip();
        }
        return byteArrayOutputStream.toByteArray();
    }
}
  1. 组装
public class SingleReactorMain {
    public static void main(String[] args) {
        new SingletonReactor(8080, a -> {
            String recvMsg = new String(a, StandardCharsets.UTF_8);
            if (!recvMsg.isEmpty()) {
                System.out.println("Receive message: " + recvMsg);
            }
            return ByteBuffer.wrap("hello,client".getBytes(StandardCharsets.UTF_8));
        }).startService();
    }
}

因为Reactor类和Handler类相当于标准模板,一般不需要开发人员变动,编程人员只需要关注具体业务处理就可以了,Reactor模式对开发人员非常的友好。


多Reactor多线程

  1. Reactor
public class MultiThreadReactor {

    private Selector bossSelector;

    private Selector workerSelector;

    private SubReactor bossReactor;

    private SubReactor workerReactor;

    private final ChannelProcess channelProcess;

    private final int port;

    private ServerSocketChannel socketChannel;

    public MultiThreadReactor(int port, ChannelProcess channelProcess) throws IOException {
        this.channelProcess = channelProcess;
        this.port = port;
    }

    private void init() throws IOException {
        this.bossSelector = Selector.open();
        this.workerSelector = Selector.open();

        socketChannel = ServerSocketChannel.open();
        socketChannel.socket().bind(new InetSocketAddress(port));
        socketChannel.configureBlocking(false);
        SelectionKey sk = socketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());

        this.bossReactor = new SubReactor(bossSelector);
        this.workerReactor = new SubReactor(workerSelector);
    }

    @SuppressWarnings("all")
    public void startService() throws IOException {
        init();
        new Thread(bossReactor).start();
        new Thread(workerReactor).start();
        System.out.println("Multi Thread Reactor start on " + port + " success");
    }

    private class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel channel = socketChannel.accept();
                if (channel != null) {
                    new MultiThreadHandler(workerSelector, channel, channelProcess);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private static class SubReactor implements Runnable {

        final Selector selector;

        public SubReactor(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    if (selector.select(1) > 0) {
                        Set<SelectionKey> selected = selector.selectedKeys();
                        for (SelectionKey selectionKey : selected) {
                            dispatch(selectionKey);
                        }
                        selected.clear();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void dispatch(SelectionKey k) {
            Runnable runnable = (Runnable) k.attachment();
            if (runnable != null) {
                runnable.run();
            }
        }
    }
}

  1. Handler
public class MultiThreadHandler implements Runnable {

    private final SocketChannel channel;

    private final SelectionKey selectionKey;

    private final ChannelProcess channelProcess;

    private static final ExecutorService POOL = Executors.newFixedThreadPool(2);

    private ByteBuffer outputBuffer;

    public MultiThreadHandler(Selector selector, SocketChannel channel, ChannelProcess channelProcess) throws IOException {
        this.channel = channel;
        this.channelProcess = channelProcess;
        this.channel.configureBlocking(false);
        this.selectionKey = channel.register(selector, SelectionKey.OP_READ);

        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    @Override
    public void run() {
        POOL.submit(this::asyncRun);
    }

    /**
     * 这里加锁的目的是为了保证在同一时刻一个链接只能进行读或者写
     */
    private synchronized void asyncRun() {
        try {
            if (selectionKey.isReadable()) {
                read();
            } else if (selectionKey.isWritable()) {
                send();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void send() throws IOException {
        channel.write(this.outputBuffer);
        selectionKey.interestOps(SelectionKey.OP_READ);
    }

    private void read() throws IOException {
        this.outputBuffer = channelProcess.process(ByteBufferUtils.toBytes(channel));
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

}
  1. 组装
public class MultiThreadReactorMain {
    public static void main(String[] args) throws IOException {
        new MultiThreadReactor(8080, a -> {
            String recvMsg = new String(a, StandardCharsets.UTF_8);
            if(!recvMsg.isEmpty()){
                System.out.println("Receive message: " + recvMsg);
            }
            return ByteBuffer.wrap("hello,client".getBytes(StandardCharsets.UTF_8));
        }).startService();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读