Netty源码分析系列

Netty源码分析系列--7. Reactor模式与Netty

2018-11-03  本文已影响22人  ted005

多Reactor模式

除了上文中介绍的Reactor基础模式,还有多Reactor模式,如下图:

多reactor模式

Netty的Reactor模式实现

对应关系:

Reactor Netty
Main Reactor Boss NioEventLoopGroup中的IO线程
Sub Reactor Worker NioEventLoopGroup中的IO线程
Acceptor ServerBootStrapAcceptor
Handler 具体的ChannelHandler接口实现

ServerBootStrapAcceptor

在前文Netty源码分析系列--5.ServerBootStrap绑定端口号最后提到了ServerBootStrapAcceptor,它是Netty对Reactor模式中Acceptor的实现。

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;

    // 1. 构造函数 
    ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;

        enableAutoReadTask = new Runnable() {
            @Override
            public void run() {
                channel.config().setAutoRead(true);
            }
        };
    }

    // 2. 回调方法
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        //3. 自定义的ChannelInitializer加入到pipeline中
        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // 4. 注册channel到workerGroup
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    //......
}
上一篇 下一篇

猜你喜欢

热点阅读