netty源码分析(五) - pipeline

2020-10-12  本文已影响0人  进击的蚂蚁zzzliu

概述

pipeline结构

DefaultChannelPipeline.png

context/handler结构

Context.png

下面开始详细讲解pipeline,主要分为一下三部分:

  1. pipeline初始化
  2. 添加/删除ChannelHannel
  3. 事件的传播

1. pipeline初始化

之前在分析NioServerSocketChannel和NioSocketChannel创建时都看到过pipeline初始化的代码pipeline = newChannelPipeline();

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
    //name命名:DefaultChannelPipeline$TailContext或HeadContext#0
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.executionMask = mask(handlerClass)
    //true
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

2. 添加/删除ChannelHandler

DefaultChannelPipeline # addLast

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //1. 重复性校验
        checkMultiplicity(handler);
        //2. 创建节点并添加至链表, filterName: name为空会重新生成(handler类名 + $x + #x), name不为空会检查name是否重复(从Head节点遍历比对name是否重复)
        newCtx = newContext(group, filterName(name, handler), handler);
        //3. 把新创建的newCtx添加到双向链表中
        addLast0(newCtx);
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            //4. 回调添加完成事件
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    //4. 回调添加完成事件
    callHandlerAdded0(newCtx);
    return this;
}
  1. checkMultiplicity重复性校验
  2. 创建节点并添加至链表, filterName: name为空会重新生成(handler类名 + $x + #x), name不为空会检查name是否重复(从Head节点遍历比对name是否重复)
  3. 把新创建的newCtx添加到双向链表中
  4. 回调添加完成事件

DefaultChannelPipeline # checkMultiplicity

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        //如果没有isSharable注解,并且已经添加过,则报错
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

AbstractChannelHandlerContext # callHandlerAdded 回调添加完成事件

final void callHandlerAdded() throws Exception {
    //cas设置状态为addComplete
    if (setAddComplete()) {
        //执行用户handler(ChannelInitializer)的handlerAdded方法
        handler().handlerAdded(this);
    }
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) {
        try {
            //执行用户ChannelInitializer中重写的initChannel方法
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                //执行后remove保证只会在新连接(channel)初始化时执行一次
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

3. 事件的传播

handler和adapter结构

ChannelInboundHandlerAdapter.png

事件和异常传播方向

pipeline (2).png

以如下EchoServer为例
read事件:

EchoServer:

.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO), serverHandler);
     }
});
@Sharable
public class EchoServerHandler extends ChannelDuplexHandler {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("----EchoServerHandler.channelReads-------");
        ctx.pipeline().write(msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("----EchoServerHandler.write-------");
        ctx.write(msg, promise);
    }
}

channelRead调用栈如下:

channelRead调用栈.png
  1. read事件首先在NioEventLoop的run方法中被从selector轮询到;
  2. 交给processSelectedKeys处理;
  3. 在AbstractNioByteChannel中调用pipeline.read;
  4. 在pipeline中执行inbound事件处理流程:HeadContext > LoggingHandler > EchoServerHandler

write事件:

write调用栈.png
  1. EchoServerHandler # channelRead中调用执行ctx.pipeline().write(msg);作为write入口
  2. DefaultChannelPipeline中调用tail.write
  3. 在pipeline中执行outbound事件处理流程:TailContext > EchoServerHandler > LoggingHandler > HeadContext
  4. 最后在HeadContext中通过NioSocketChannelUnsafe # write出去
上一篇下一篇

猜你喜欢

热点阅读