Netty源码分析(三) ChannelPipeline

2018-11-07  本文已影响0人  skyguard

下面我们来分析Netty里的又一个重要的组件ChannelPipeline。
每个Channel有一个pipeline,它是一个双向链表,有一个head和tail,netty的事件就是通过ChannelPipeline进行传递的。每个channel内部都会持有一个ChannelPipeline对象pipeline. pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。


ChannelPipeline

当channel完成register、active、read等操作时,会触发pipeline的相应方法。 1、当channel注册到selector时,触发pipeline的fireChannelRegistered方法。 2、当channel的socket绑定完成时,触发pipeline的fireChannelActive方法。 3、当有客户端请求时,触发pipeline的fireChannelRead方法。 4、当本次客户端请求,pipeline执行完fireChannelRead,触发pipeline的fireChannelReadComplete方法。
下面来看看ChannelPipeline的结构


DefaultChannelPipeline
ChannelPipeline是一个接口,默认的实现是DefaultChannelPipeline,每个ChannelPipeline上有ChannelHandlerContext,而ChannelHandlerContext包含了ChannelHandler,每个ChannelHandler负责处理具体的业务逻辑。下面来看看DefaultChannelPipeline有哪些属性
/**
 * Head 节点
 */
final AbstractChannelHandlerContext head;
/**
 * Tail 节点
 */
final AbstractChannelHandlerContext tail;

/**
 * 所属 Channel 对象
 */
private final Channel channel;
/**
 * 成功的 Promise 对象
 */
private final ChannelFuture succeededFuture;

1、TailContext实现了ChannelOutboundHandler接口。 2、HeadContext实现了ChannelInboundHandler接口。 3、head和tail形成了一个链表。

对于Inbound的操作,当channel注册到selector时,触发pipeline的fireChannelRegistered,从head开始遍历,找到实现了ChannelInboundHandler接口的handler,并执行其fireChannelRegistered方法。

public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

服务启动过程中,ServerBootstrap在init方法中,会给ServerSocketChannel的pipeline添加ChannelInitializer对象,其中ChannelInitializer继承ChannelInboundHandlerAdapter,并实现了ChannelInboundHandler接口,所以当ServerSocketChannel注册到selector之后,会触发其channelRegistered方法。

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. 解决并发问题
        try {
            // 初始化通道
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // 发生异常时,执行异常处理
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            // 从 pipeline 移除 ChannelInitializer
            remove(ctx);
        }
        return true; // 初始化成功
    }
    return false; // 初始化失败
}

在initChannel实现中,添加ServerBootstrapAcceptor实例到pipeline中。

ServerBootstrapAcceptor继承自ChannelInboundHandlerAdapter,负责把接收到的客户端socketChannel注册到childGroup中,由childGroup中的eventLoop负责数据处理。

public void channelRead(ChannelHandlerContext ctx, Object msg) {

        // 接受的客户端的 NioSocketChannel 对象
        final Channel child = (Channel) msg;
        // 添加 NioSocketChannel 的处理器
        child.pipeline().addLast(childHandler);
        // 设置 NioSocketChannel 的配置项
        setChannelOptions(child, childOptions, logger);
        // 设置 NioSocketChannel 的属性
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // 注册客户端的 NioSocketChannel 到 work EventLoop 中。
            childGroup.register(child).addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // 注册失败,关闭客户端的 NioSocketChannel
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }

            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

ChannelPipeline的分析就到这里了。

上一篇下一篇

猜你喜欢

热点阅读