深入浅出Netty源码剖析个人学习

【第8篇】Netty的Reactor模式与Scalable IO

2019-05-21  本文已影响16人  爱学习的蹭蹭

1、Reactor 与 Procator

reactor论文1在线pdf在线下载
reactor论文2在线pdf在线下载

2、 Doug Lea 大神

NIO并发启发真谛论文在线下载

为什么Netty使用两个NioEventLoopGroup(重点面试可能问题)

采用Reactor设计模型

NioEventLoopGroup关系图
 /**
     * 
设置EventLoopGroup的父(acceptor)和子(client)。 这些EventLoopGroup用于处理ServerChannel和Channel的所有事件和IO。
     */
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

3、网络结构

4、传统服务设计



5、传统Reactor设计(单线程)

NIO 深入详解和重要体系分析


Channels(通道)

Buffers(缓存区)

Selectors(选择器)


6、Worker Thread Pools 工作线程池

图解关系

Multiple Reactor Threads 复合Reactor线程


7、 Multiple Reactors 多个Reactor线程池(非常重要)

java.nio features 的使用(重点)

基于连接的扩展

可以扩展基本的网络服务模式

API 场景使用

Buffer

abstract class Buffer {
 int capacity();
 int position();
 Buffer position(int newPosition);
 int limit();
 Buffer limit(int newLimit);
 Buffer mark();
 Buffer reset();
 Buffer clear();
 Buffer flip();
 Buffer rewind();
 int remaining();
 boolean hasRemaining();
 boolean isReadOnly();
}

ByteBuffer

abstract class ByteBuffer extends Buffer {
 static ByteBuffer allocateDirect(int capacity);
 static ByteBuffer allocate(int capacity);
 static ByteBuffer wrap(byte[] src, int offset, int len);
 static ByteBuffer wrap(byte[] src);
 boolean isDirect();
 ByteOrder order();
 ByteBuffer order(ByteOrder bo);
 ByteBuffer slice();
 ByteBuffer duplicate();
 ByteBuffer compact();
 ByteBuffer asReadOnlyBuffer();
 byte get();
 byte get(int index);
 ByteBuffer get(byte[] dst, int offset, int length);
 ByteBuffer get(byte[] dst);
 ByteBuffer put(byte b);
 ByteBuffer put(int index, byte b);
 ByteBuffer put(byte[] src, int offset, int length);
 ByteBuffer put(ByteBuffer src);
 ByteBuffer put(byte[] src);
 char getChar();
 char getChar(int index);
 ByteBuffer putChar(char value);
 ByteBuffer putChar(int index, char value);
 CharBuffer asCharBuffer();
 
 short getShort();
 short getShort(int index);
 ByteBuffer putShort(short value);
 ByteBuffer putShort(int index, short value);
 ShortBuffer asShortBuffer();
 int getInt();
 int getInt(int index);
 ByteBuffer putInt(int value);
 ByteBuffer putInt(int index, int value);
 IntBuffer asIntBuffer();
 long getLong();
 long getLong(int index);
 ByteBuffer putLong(long value);
 ByteBuffer putLong(int index, long value);
 LongBuffer asLongBuffer();
 float getFloat();
 float getFloat(int index);
 ByteBuffer putFloat(float value);
 ByteBuffer putFloat(int index, float value);
 FloatBuffer asFloatBuffer();
 double getDouble();
 double getDouble(int index);
 ByteBuffer putDouble(double value);
 ByteBuffer putDouble(int index, double value);
 DoubleBuffer asDoubleBuffer();
}


Channel

interface Channel {
   boolean isOpen();
   void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
   int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
   int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
   int read(ByteBuffer[] dsts, int offset, int length)throws IOException;
   int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
   int write(ByteBuffer[] srcs, int offset, int length)throws IOException;
   int write(ByteBuffer[] srcs) throws IOException;
}

SelectableChannel

abstract class SelectableChannel implements Channel {
 int validOps();
 boolean isRegistered();
 SelectionKey keyFor(Selector sel);
 SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
 void configureBlocking(boolean block) throws IOException;
 boolean isBlocking();
 Object blockingLock();
}


8、Reactor Pattern设计模型

事件驱动模型

状态Handler与多复用线程模式

Multithreaded Designs(多线程设计)


9、Reactor模型的角色构成(Reactor模型一共有5种角色构成、非常重点)

1、Handle(句柄或是描述符):本质上表示一种资源,是有操作系统提供的,该资源用于表示一个一个的事件,比如说文件描述符,或是针对网络编程中
的Socket描述符,事件既可以来自于外部,也可以来自于内部,外部事件比如说是客户的连接请求,客户发送过来数据等,内部事件比如说操作系统产生的定时器
事件等,它本质上就是一个文件描述符,Handle是事件产生的发源地
2、Synchronous Event Demultiplexer(同步事件风离器):它本身是一个系统调用,用于等待事件的发生(事件可能是一个,也可能是多个)调用方法调用
它的时候会被阻塞,一直阻塞到同步事件分离器上事件产生为止,对于Linux来说,同步事件分离器指的就是常用的 I/O 多复用机制,比如说:select,pool,epoll等在Java NIO领域中同步事件分离器对应的组件就是Selector,对应的阻塞方法就是select方法
3、Event Handler(事件处理器):本身有多个回调方法构成,这些回调方法构成了与应用相关的对于某个时间的反馈机制,Netty相比于Java NIO来说,在事件
处理器这个角色上进行了一个升级,它为我们开发者提供了大量的回调方法,供我们在特定事件产生时实现相应的回调方法进行业务逻辑的处理
4、Concrete Event Handler(具体事件处理器):事件处理器的实现,它本身实现了事件处理器所提供的各个回调方法,从而实现了特定业务的逻辑,它本质上交所我们所编写的一个个的处理器实现
5、Initiation Dispatcher(初始分发器):实际上就是Reactor角色,它本身定义了一些规范,这些规范用于控制事件的调度方法,同时又提供了应用进行事件处理器的注册,删除等设施,它本身是整个事件处理器的核心所在,Initiation Dispatcher 会通过同步事件分离器来等待事件的发生,一旦事件发生,Initiation Dispatcher手写会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件

10、Reactor模型的流程(非常重要)

1、当Initiation Dispatcher注册具体的事件处理器时,应用会标识出该事件处理器希望 Initiation Dispatcher 在某个事件发生时向其通知的该事件,该事件与Handle关联
2、Initiation Dispatcher 会要求每个事件处理器向其传递内部的Handle,该handle向操作系统标识了事件处理器
3、当所有的事件处理器注册完毕后,应用会回调handle_events方法来启用Initiation Dispatcher的事件循环,这时,Initiation Dispatcher会将每个注册管理器Handle合并起来,并使用同步事件分离器等等这些事件的发生,比如说:TCP协议层会使用select同步事件分离器操作来等待客户端发生的数据到达连接socket handle上
4、当与某个事件源对应的handle变为ready状态时,(比如说,tcp socket变为等待读状态时)同步事件分离器就会通知Initiation Dispatcher
5、Initiation Dispatcher 会触发事件处理器的回调方法,从而影响整个处于ready状态的Handle,当事件发生时,Initiation Dispatcher 会将被事件源激活的Handle作为【key】来寻找并分发适当的事件处理器回调方法
6、Initiation Dispatcher 会回调事件处理器handle_events回调方法来执行特定于应用的功能,(开发者自己所编写的功能),从而响应整个事件,所发生的事件类型可以作为该方法参数并发被该方法内部使用来执行额外的特定于服务的分离和分发


11、 推荐学习文章

上一篇下一篇

猜你喜欢

热点阅读