Netty学习整理

2021-04-07  本文已影响0人  愤怒的老照

1 复习NIO

https://www.jianshu.com/p/b36b4e8c1343

2 Netty整体架构

image.png

2.1 网络通信层

网络通信层的职责是执行网络 I/O 的操作。它支持多种网络协议和 I/O 模型的连接操作。当网络数据读取到内核缓冲区后,会触发各种网络事件,这些网络事件会分发给事件调度层进行处理。

网络通信层的核心组件包含BootStrap&ServerBootStrap、Channel三个组件。

2.2 事件调度层

事件调度层的职责是通过 Reactor 线程模型对各类事件进行聚合处理,通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等),实际的业务处理逻辑是交由服务编排层中相关的 Handler 完成。事件调度层的核心组件包括 EventLoopGroup、EventLoop。

EventLoopGroup与线程模型的对应?
Netty 通过创建不同的 EventLoopGroup 参数配置,就可以支持 Reactor 的三种线程模型:

2.3 服务编排层

服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

服务编排层的核心组件包括 ChannelPipeline、ChannelHandler、ChannelHandlerContext。

3 服务启动流程

public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

如果是使用group(EventLoopGroup group)方法,最终调用逻辑如下:

/**
     * Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
     */
    @Override
    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

可以看出Netty 线程模型的可定制化程度很高,它只需要简单配置不同的参数,便可启用不同的 Reactor 线程模型
在注册channel时,group通过chooser(有GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser两种)选取eventLoop,将channel和eventLoop绑定,之后 Channel 生命周期内的所有 I/O 事件都由这个 EventLoop 负责处理

ServerBootstrap 中 bind() 实现:

 final ChannelFuture regFuture = initAndRegister();

// initAndRegister实现
// 1、通过反射创建服务端Channel
channel = channelFactory.newChannel();

// 通过工厂类ReflectiveChannelFactory反射创建channel,类型是最开始设置的(NioServerSocketChannel)
public NioServerSocketChannel() {
    // 创建服务端ServerSocketChannel
    this(newSocket(DEFAULT_SELECTOR_PROVIDER)); 
}

// 在创建Channel时,注册感兴趣的事件(OP_ACCEPT)
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT); // 调用父类方法
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// Channel父类构造函数
// 1、调用 AbstractChannel 的构造函数创建了三个重要的成员变量,分别为 id、unsafe、pipeline。id 表示全局唯一的 Channel,unsafe 用于操作底层数据的读写操作,pipeline 负责业务处理器的编排。
// 2、可以看到每创建一个channel,都有与之对应的pipeline,pipeline最初包含headContext和TailContext两个节点
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId(); // Channel 全局唯一 id 
    unsafe = newUnsafe(); // unsafe 操作底层读写
    pipeline = newChannelPipeline(); // pipeline 负责业务处理器编排
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        // 创建 JDK 底层的 ServerSocketChannel
        return provider.openServerSocketChannel(); 
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

 final ChannelFuture regFuture = initAndRegister();

 init(channel);

// 1、添加特殊的 Handler 处理器ChannelInitializer,并引入另一个特殊的Handler处理器ServerBootstrapAcceptor
// ChannelInitializer:提供了一个简单的工具,用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作,但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作。

//  ServerBootstrapAcceptor:成功构造客户端 NioSocketChannel 后, pipeline.fireChannelRead() 触发 channelRead 事件传播。会传播到 ServerBootstrapAcceptor.channelRead() 方法,channelRead() 会将客户端 Channel 分配到工作线程组中去执行。

p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
ChannelFuture regFuture = config().group().register(channel);

// 我们初始化的group通常是MultithreadEventLoopGroup类型的,当注册时会使用选择器选择一个eventLoop进行注册
public ChannelFuture register(Channel channel) {
    return next().register(channel); // 选择一个 eventLoop 注册
}

// 调用 JDK 底层的 register() 进行注册
doRegister(); 

//调用 JDK 底层的 register() 进行注册。register() 的第三个入参传入的是 Netty 自己实现的 Channel 对象,调用 register() 方法会将它绑定在 JDK 底层 Channel 的 attachment 上。这样在每次 Selector 对象进行事件循环时,Netty 都可以从返回的 JDK 底层 Channel 中获得自己的 Channel 对象。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 
// 1、用户自定义的业务处理器添加到 Pipeline 中
pipeline.invokeHandlerAddedIfNeeded();

// 通过invokeHandlerAddedIfNeeded调用Handler的handlerAdded实现,此处和之前创建的ChannelInitializer关联起来了
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
       if (initMap.add(ctx)) {
        try {
       //  调用ChannelInitializer实现类的具体实现
       // 1、在服务端Handler:对应的是添加ServerBootstrapAcceptor这个特殊的Handler,并且在处理完之后会删除ChannelInitializer,此时Handler责任链是HeadContext<->LogHandler<->ServerBootstrapAcceptor<->TailContext
       // 2、在客户端Handler:对应的就是EchoServer添加的自定义Handler
            initChannel((C) ctx.channel()); 
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                // 处理完之后会删除ChannelInitializer
                pipeline.remove(this); 
            }
        }
        return true;
    }
    return false;
    }

// 2、触发channelRegistered事件
pipeline.fireChannelRegistered(); 
// ...省略其他代码
javaChannel().bind(localAddress, config.getBacklog());

// 完成端口绑定之后,Channel 处于活跃 Active 状态,然后会调用 pipeline.fireChannelActive() 方法触发 channelActive 事件
pipeline.fireChannelActive(); // 触发 channelActive 事件

4 处理请求流程

NioEventLoop核心处理逻辑

protected void run() {
        // 1、入口死循环,不断检测IO事件并处理任务
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:

                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())

// Netty 的任务队列包括普通任务、定时任务以及尾部任务,hasTask() 判断的是普通任务队列和尾部队列是否为空,而 delayNanos(currentTimeNanos) 方法获取的是定时任务的延迟时间。
hasTasks()

    @Override
    protected boolean hasTasks() {
        return super.hasTasks() || !tailTasks.isEmpty();
    }

// 选择策略返回值
// 如果包含普通任务和尾部任务,则调用selectNowSupplier的值;当 NioEventLoop 线程的不存在异步任务,即任务队列为空,返回的是 SELECT 策略
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }

// 如果当前 NioEventLoop 线程存在异步任务,会通过 selectSupplier.get() 最终调用到 selectNow() 方法,selectNow() 是非阻塞,执行后立即返回。
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    int selectNow() throws IOException {
        try {
            return selector.selectNow();
        } finally {
            // restore wakeup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }


// 策略分两种情况
// 1、调用selectNow非阻塞查询,有就绪的channel,则直接跳到break执行下面的逻辑
// 2、调用selectNow无就绪结果,返回SELECT策略,调用select(wakenUp.getAndSet(false));

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

select(boolean oldWakenUp)

// 1、计算 select 阻塞操作的最后截止时间,delayNanos为最近的一个定时任务,如果没有默认是一分钟
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 

// 2、检测 select 阻塞操作是否超过截止时间,timeoutMillis<0说明有定时任务要执行,需要立即退出;但是预留了0.5ms的窗口时间,为了方便舍去小数
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

// 3、轮询过程中及时处理产生的任务
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
       selector.selectNow();
       selectCnt = 1;//将已轮训次数重置为1
       break;
 }

// 4、将selector阻塞timeoutMillis毫秒
// 这里如果定时任务时间过长,肯定不能等待很长时间
// 在任务添加的时候,会调用wakeup方法唤醒线程,避免等待时间过长
int selectedKeys = selector.select(timeoutMillis);

// 5、满足一下任意条件,则推吹循环
/**
            * selectedKeys != 0 表示轮询到有已经就绪的IO事件
            * wakenUp.get() 表示是否被用户唤醒
            * hasTasks() 表示普通任务队列中有未完成的任务
            * hasScheduledTasks() 表示定时任务队列中有未完成的任务
            * 上述条件任何一个条件为真,则退出select,准备处理对应任务
            */
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }

// 6、执行时间 >= 超时时间,说明selector执行正常,轮询次数置为1
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            } 

// 7、执行时间 <超时时间,并且轮询次数达到阈值,说明发生空轮询,重新构建selector并关联channel
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);

                rebuildSelector();
                selector = this.selector;

                selector.selectNow();
                selectCnt = 1;
                break;
            }
// 处理IO事件有两种方式,只看一下processSelectedKeysPlain
// 1、processSelectedKeysOptimized,处理Netty 优化过的 selectedKeys
// 2、processSelectedKeysPlain,正常的处理逻辑

// 1、处理连接事件。表示 TCP 连接建立成功, Channel 处于 Active 状态
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
// 2、处理可写事件。表示上层可以向 Channel 写入数据,通过执行 ch.unsafe().forceFlush() 操作,将数据冲刷到客户端,最终会调用 javaChannel 的 write() 方法执行底层写操作
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
// 3、处理可读事件。可读事件。表示 Channel 收到了可以被读取的新数据。依次调用 ChannelHandler 的 channelRead() 方法处理数据
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
// 1、将定时任务和普通任务合并
fetchedAll = fetchFromScheduledTaskQueue();
// 2、循环执行任务,就是直接调用的 Runnable 的 run() 方法。
safeExecute(task); // 执行任务
// 3、收尾工作,执行尾部队列任务,并不常用,一般用于统计
    protected void afterRunningAllTasks() {
        runAllTasksFrom(tailTasks);
    }
// 1、首先通过EventLoop处理读事件,读取完成后触发channelRead
pipeline.fireChannelRead(byteBuf);

// 2、Pipeline的fireChannelRead调用了公用的invokeChannelRead,因为要从Head触发,所以此时传入的next为Head
@Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }


// 3、调用channelContext的invokeChannelRead
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

// 4、调用context绑定handler的channelRead,处理具体逻辑
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

// 5、如果没有实现channelRead方法,则选取下一个context,递归执行第三步
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

2、数据写

// 1、调用writeAndFlush写数据,无论是从哪里开始写,都会调用Tail的writeAndFlush
    @Override
    public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return tail.writeAndFlush(msg, promise);
    }

// 2、与读数据相反,写数据查找下一个节点是通过查找prev指针
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

// 3、递归调用到HeadContext,通过unsafe写入socket
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

3、异常处理

// 1 、发生异常后,调用fireExceptionCaught
pipeline.fireExceptionCaught(e);

// 2、从HeadContext开始传递异常信息
    @Override
    public final ChannelPipeline fireExceptionCaught(Throwable cause) {
        AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
        return this;
    }

// 3、同样的递归调用到TailContext的exceptionCaught,作为兜底逻辑,打印日志并释放资源
protected void onUnhandledInboundException(Throwable cause) {
        try {
            logger.warn(
                    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                            "It usually means the last handler in the pipeline did not handle the exception.",
                    cause);
        } finally {
            ReferenceCountUtil.release(cause);
        }
    }

// 4、由于异常信息是从HeadContext开始传递,所以最佳实践是将自定义的异常处理器放在TailContext前、其他自定义Handler后。可以保证如论是哪里发生了异常,自定义异常处理器都可以处理到

Netty处理粘包拆包

TCP 传输协议是面向流的,没有数据包界限。可能受MTU 传输单元大小、MSS 最大分段大小、滑动窗口等因素影响,将一个完整的报文拆分成多个小报文进行发送,也可能将多个报文合并成一个大的报文进行发送。因此就有了拆包和粘包。

// 1、ByteToMessageDecoder继承ChannelInboundHandlerAdapter,可以看出ByteToMessageDecoder解码器应该放在HeadContext后、其他Handler前的位置

// 2、当channel可读时,触发pipeline的channelRead实现,对应到ByteToMessageDecoder 的实现如下:
               public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//......省略其他内容
                  CodecOutputList out = CodecOutputList.newInstance();
            try {
                // channelRead的msg是ByteBuf类型
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
               // 如果是第一次,则初始化cumulation
                if (first) {
                    cumulation = data;
                } else {
                    // 不是第一次,追加到cumulation
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }

                // 对数据进行处理
                callDecode(ctx, cumulation, out);

//......省略其他内容
    }

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while(true) {
                if (in.isReadable()) {
                    // out中存放的是完整的对象集合,如果不为空,就可以接着触发channelRead
                    int outSize = out.size();
                    if (outSize > 0) {
                        // 循环推进out集合的内容
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
                        if (ctx.isRemoved()) {
                            return;
                        }

                        outSize = 0;
                    }

                    int oldInputLength = in.readableBytes();
  
                    // 解码的核心逻辑
                    this.decodeRemovalReentryProtection(ctx, in, out);
                 //......省略
                }

                return;
            }
        } catch (DecoderException var6) {
            throw var6;
        } catch (Exception var7) {
            throw new DecoderException(var7);
        }
    }

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        // 设置当前状态为正在解码
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            // 解码
            decode(ctx, in, out);
        } finally {
            // 执行hander的remove操作
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }

    // 子类都重写了该方法,每种实现都会有自己特殊的解码方式
    // 有FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

@Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
           // 如果有符合要求的对象,添加到集合中,由父类继续推动channelRead,此时的对象是一个完整的对象
            out.add(decoded);
        }
    }

    protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // 收集到的数据是否小于固定长度,小于就代表无法解析,直接返回空;并等待ByteBuf中的数据,使能够达到固定长度
        
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readRetainedSlice(frameLength);
        }
    }

上一篇下一篇

猜你喜欢

热点阅读