spring基础

Reactor设计模式

2018-03-12  本文已影响610人  Joseph1453

一. 为什么需要

解决多请求问题,但是这些请求不需要一直占有整个线程资源(比如IO操作时不必一直等待),所以不适合使用一个请求分配一个线程的多线程方案;类似于消息队列模型,但是是事件驱动,没有Queue来做缓冲;优点:解耦、高效、提高复用,缺点:需要操作系统底层支持、内部回调复杂。

二. 预备知识

IO操作主要分成两部分:

  1. 数据准备,将数据从磁盘加载到内核缓存
  2. 将数据从内核缓存加载到用户缓存

2.1 IO的4种模型

2.2 IO多路复用

区别于传统的多进程并发模型 (每有新的IO流就分配一个新的进程管理),IO多路复用仅使用单个线程,通过记录跟踪每个I/O流的状态来同时管理多个I/O流(哪个IO流ready线程就处理哪个)

select, poll, epoll 都是I/O多路复用的具体的实现:
select:仅返回有无事件不返回具体事件Id,只能监控1024个连接,线程不安全
poll:连接数无限制
epoll:返回具体事件Id,线程安全

三. 反应器模式

处理一个或多个客户端并发请求服务的事件设计模式。当请求抵达后,服务处理程序使用I/O多路复用策略,然后同步地派发这些请求至相关的请求处理程序。

Reactor_Structures.png

3.1 模块组成

包括5个模块:

3.2 运行流程

  1. 初始化dispatcher,注册具体事件处理器到分发器(即指定什么事件触发什么事件处理器)
  2. 注册完毕后,分发器调用handle_events方法启动事件循环,并启动Synchronous Event Demultiplexer等待事件发生(阻塞等待)
  3. 当有事件发生,即某个Handle变为ready状态(如TCP socket变为等待读状态),Synchronous Event Demultiplexer就会通知Initiation Dispatcher
  4. Initiation Dispatcher根据发生的事件,将被事件源激活的Handle作为『key』来寻找并分发恰当的事件处理器回调方法

3.3 具体模型分类

singleReactor.png workerThreadPool.png multipleReactors.png

四. 源码分析

高性能NIO框架netty、腾讯开源RPC框架Tars的NIO模型都是很典型的Reactor设计模式,下面以Tars源码来分析Reactor模式的java NIO实现(仅展示关键实现)。

package com.qq.tars.net.core.nio;
tarsNIO.PNG

4.1 Reactor

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SelectableChannel;

public final class Reactor extends Thread {

    protected volatile Selector selector = null;
    private Acceptor acceptor = null;

    //启动
    public Reactor(SelectorManager selectorManager) throws IOException {
        this.acceptor = new TCPAcceptor(selectorManager);
        this.selector = Selector.open();
    }

    //注册
    public void registerChannel(SelectableChannel channel, int ops, Object attachment) throws IOException {

        SelectionKey key = channel.register(this.selector, ops, attachment);
    }

    //循环事件
    public void run() {

            for (;;) {
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    dispatchEvent(key);
                }
            }
        
    }

    //处理事件
    private void dispatchEvent(final SelectionKey key) throws IOException {
        if (key.isConnectable()) {
            acceptor.handleConnectEvent(key);
        } else if (key.isAcceptable()) {
            acceptor.handleAcceptEvent(key);
        } else if (key.isReadable()) {
            acceptor.handleReadEvent(key);
        } else if (key.isValid() && key.isWritable()) {
            acceptor.handleWriteEvent(key);
        }
    }

}

4.2 TCPAcceptor
处理不同事件,以处理connect、read事件为例:

    public void handleConnectEvent(SelectionKey key) throws IOException {
        //1. Get the client channel
        SocketChannel client = (SocketChannel) key.channel();

        //2. Set the session status
        TCPSession session = (TCPSession) key.attachment();
        if (session == null) throw new RuntimeException("The session is null when connecting to ...");

        //3. Connect to server
        try {
            client.finishConnect();
            key.interestOps(SelectionKey.OP_READ);
            session.setStatus(SessionStatus.CLIENT_CONNECTED);
        } finally {
            session.finishConnect();
        }
    }

    public void handleReadEvent(SelectionKey key) throws IOException {
        TCPSession session = (TCPSession) key.attachment();
        if (session == null) throw new RuntimeException("The session is null when reading data...");
        session.read();
    }

4.3 TCPSession
以read事件的readResponse方法为例:

//放入工作线程池
response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);
selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));

4.4 工作线程池
SelectorManager提供线程池,WorkThread具体进行业务处理

上一篇下一篇

猜你喜欢

热点阅读