Netty学习整理
1 复习NIO
https://www.jianshu.com/p/b36b4e8c1343
2 Netty整体架构
image.png2.1 网络通信层
网络通信层的职责是执行网络 I/O 的操作。它支持多种网络协议和 I/O 模型的连接操作。当网络数据读取到内核缓冲区后,会触发各种网络事件,这些网络事件会分发给事件调度层进行处理。
网络通信层的核心组件包含BootStrap&ServerBootStrap、Channel三个组件。
- BootStrap(客户端) & ServerBootStrap(服务端):负责整个 Netty 程序的启动、初始化、服务器连接等过程,它相当于一条主线,串联了 Netty 的其他核心组件。
-
Channel:Channel提供了基本的 API 用于网络 I/O 操作,提供了与底层 Socket 交互的能力,如 register、bind、connect、read、write、flush 等。当然 Channel 会有多种状态,如连接建立、连接注册、数据读写、连接销毁等。随着状态的变化,Channel 处于不同的生命周期,每一种状态都会绑定相应的事件回调:
image.png
2.2 事件调度层
事件调度层的职责是通过 Reactor 线程模型对各类事件进行聚合处理,通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等),实际的业务处理逻辑是交由服务编排层中相关的 Handler 完成。事件调度层的核心组件包括 EventLoopGroup、EventLoop。
- EventLoopGroup :本质是一个线程池,主要负责接收 I/O 请求,并分配线程执行处理请求
- EventLoop:同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。
EventLoopGroup与线程模型的对应?
Netty 通过创建不同的 EventLoopGroup 参数配置,就可以支持 Reactor 的三种线程模型:
-
单线程模型:EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;
-
多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;
-
主从多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor,它们分别使用不同的 EventLoopGroup,主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。
2.3 服务编排层
服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。
服务编排层的核心组件包括 ChannelPipeline、ChannelHandler、ChannelHandlerContext。
- ChannelPipeline:负责组装各种 ChannelHandler,当 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。
- ChannelHandler & ChannelHandlerContext:ChannelHandlerContext 用于保存 ChannelHandler 上下文,通过 ChannelHandlerContext 我们可以知道 ChannelPipeline 和 ChannelHandler 的关联关系
3 服务启动流程
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
- 配置线程池:当前配置采用主从Reactor模式。Boss 是主 Reactor,Worker 是从 Reactor。它们分别使用不同的 NioEventLoopGroup,主 Reactor 负责处理 Accept,然后把 Channel 注册到从 Reactor 上,从 Reactor 主要负责 Channel 生命周期内的所有 I/O 事件。
如果是使用group(EventLoopGroup group)方法,最终调用逻辑如下:
/**
* Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
*/
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
可以看出Netty 线程模型的可定制化程度很高,它只需要简单配置不同的参数,便可启用不同的 Reactor 线程模型
在注册channel时,group通过chooser(有GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser两种)选取eventLoop,将channel和eventLoop绑定,之后 Channel 生命周期内的所有 I/O 事件都由这个 EventLoop 负责处理
ServerBootstrap 中 bind() 实现:
- 创建channel
final ChannelFuture regFuture = initAndRegister();
// initAndRegister实现
// 1、通过反射创建服务端Channel
channel = channelFactory.newChannel();
// 通过工厂类ReflectiveChannelFactory反射创建channel,类型是最开始设置的(NioServerSocketChannel)
public NioServerSocketChannel() {
// 创建服务端ServerSocketChannel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// 在创建Channel时,注册感兴趣的事件(OP_ACCEPT)
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT); // 调用父类方法
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// Channel父类构造函数
// 1、调用 AbstractChannel 的构造函数创建了三个重要的成员变量,分别为 id、unsafe、pipeline。id 表示全局唯一的 Channel,unsafe 用于操作底层数据的读写操作,pipeline 负责业务处理器的编排。
// 2、可以看到每创建一个channel,都有与之对应的pipeline,pipeline最初包含headContext和TailContext两个节点
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId(); // Channel 全局唯一 id
unsafe = newUnsafe(); // unsafe 操作底层读写
pipeline = newChannelPipeline(); // pipeline 负责业务处理器编排
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 创建 JDK 底层的 ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
- 初始化channel
final ChannelFuture regFuture = initAndRegister();
init(channel);
// 1、添加特殊的 Handler 处理器ChannelInitializer,并引入另一个特殊的Handler处理器ServerBootstrapAcceptor
// ChannelInitializer:提供了一个简单的工具,用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作,但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作。
// ServerBootstrapAcceptor:成功构造客户端 NioSocketChannel 后, pipeline.fireChannelRead() 触发 channelRead 事件传播。会传播到 ServerBootstrapAcceptor.channelRead() 方法,channelRead() 会将客户端 Channel 分配到工作线程组中去执行。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
- 注册channel
ChannelFuture regFuture = config().group().register(channel);
// 我们初始化的group通常是MultithreadEventLoopGroup类型的,当注册时会使用选择器选择一个eventLoop进行注册
public ChannelFuture register(Channel channel) {
return next().register(channel); // 选择一个 eventLoop 注册
}
// 调用 JDK 底层的 register() 进行注册
doRegister();
//调用 JDK 底层的 register() 进行注册。register() 的第三个入参传入的是 Netty 自己实现的 Channel 对象,调用 register() 方法会将它绑定在 JDK 底层 Channel 的 attachment 上。这样在每次 Selector 对象进行事件循环时,Netty 都可以从返回的 JDK 底层 Channel 中获得自己的 Channel 对象。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- 注册完成后的事件触发(涉及pipeline如何和自定义的Handler关联)
// 1、用户自定义的业务处理器添加到 Pipeline 中
pipeline.invokeHandlerAddedIfNeeded();
// 通过invokeHandlerAddedIfNeeded调用Handler的handlerAdded实现,此处和之前创建的ChannelInitializer关联起来了
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) {
try {
// 调用ChannelInitializer实现类的具体实现
// 1、在服务端Handler:对应的是添加ServerBootstrapAcceptor这个特殊的Handler,并且在处理完之后会删除ChannelInitializer,此时Handler责任链是HeadContext<->LogHandler<->ServerBootstrapAcceptor<->TailContext
// 2、在客户端Handler:对应的就是EchoServer添加的自定义Handler
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
// 处理完之后会删除ChannelInitializer
pipeline.remove(this);
}
}
return true;
}
return false;
}
// 2、触发channelRegistered事件
pipeline.fireChannelRegistered();
- 端口绑定
// ...省略其他代码
javaChannel().bind(localAddress, config.getBacklog());
// 完成端口绑定之后,Channel 处于活跃 Active 状态,然后会调用 pipeline.fireChannelActive() 方法触发 channelActive 事件
pipeline.fireChannelActive(); // 触发 channelActive 事件
4 处理请求流程
NioEventLoop核心处理逻辑
protected void run() {
// 1、入口死循环,不断检测IO事件并处理任务
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
- 轮询 I/O 事件
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
// Netty 的任务队列包括普通任务、定时任务以及尾部任务,hasTask() 判断的是普通任务队列和尾部队列是否为空,而 delayNanos(currentTimeNanos) 方法获取的是定时任务的延迟时间。
hasTasks()
@Override
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
// 选择策略返回值
// 如果包含普通任务和尾部任务,则调用selectNowSupplier的值;当 NioEventLoop 线程的不存在异步任务,即任务队列为空,返回的是 SELECT 策略
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
// 如果当前 NioEventLoop 线程存在异步任务,会通过 selectSupplier.get() 最终调用到 selectNow() 方法,selectNow() 是非阻塞,执行后立即返回。
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
// 策略分两种情况
// 1、调用selectNow非阻塞查询,有就绪的channel,则直接跳到break执行下面的逻辑
// 2、调用selectNow无就绪结果,返回SELECT策略,调用select(wakenUp.getAndSet(false));
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
- 调用select阻塞方法
select(boolean oldWakenUp)
// 1、计算 select 阻塞操作的最后截止时间,delayNanos为最近的一个定时任务,如果没有默认是一分钟
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
// 2、检测 select 阻塞操作是否超过截止时间,timeoutMillis<0说明有定时任务要执行,需要立即退出;但是预留了0.5ms的窗口时间,为了方便舍去小数
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 3、轮询过程中及时处理产生的任务
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;//将已轮训次数重置为1
break;
}
// 4、将selector阻塞timeoutMillis毫秒
// 这里如果定时任务时间过长,肯定不能等待很长时间
// 在任务添加的时候,会调用wakeup方法唤醒线程,避免等待时间过长
int selectedKeys = selector.select(timeoutMillis);
// 5、满足一下任意条件,则推吹循环
/**
* selectedKeys != 0 表示轮询到有已经就绪的IO事件
* wakenUp.get() 表示是否被用户唤醒
* hasTasks() 表示普通任务队列中有未完成的任务
* hasScheduledTasks() 表示定时任务队列中有未完成的任务
* 上述条件任何一个条件为真,则退出select,准备处理对应任务
*/
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
// 6、执行时间 >= 超时时间,说明selector执行正常,轮询次数置为1
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
}
// 7、执行时间 <超时时间,并且轮询次数达到阈值,说明发生空轮询,重新构建selector并关联channel
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
- 处理IO事件
// 处理IO事件有两种方式,只看一下processSelectedKeysPlain
// 1、processSelectedKeysOptimized,处理Netty 优化过的 selectedKeys
// 2、processSelectedKeysPlain,正常的处理逻辑
// 1、处理连接事件。表示 TCP 连接建立成功, Channel 处于 Active 状态
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 2、处理可写事件。表示上层可以向 Channel 写入数据,通过执行 ch.unsafe().forceFlush() 操作,将数据冲刷到客户端,最终会调用 javaChannel 的 write() 方法执行底层写操作
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 3、处理可读事件。可读事件。表示 Channel 收到了可以被读取的新数据。依次调用 ChannelHandler 的 channelRead() 方法处理数据
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
- 任务执行
// 1、将定时任务和普通任务合并
fetchedAll = fetchFromScheduledTaskQueue();
// 2、循环执行任务,就是直接调用的 Runnable 的 run() 方法。
safeExecute(task); // 执行任务
// 3、收尾工作,执行尾部队列任务,并不常用,一般用于统计
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
- 数据在 Pipeline 中的运转
1、数据读
// 1、首先通过EventLoop处理读事件,读取完成后触发channelRead
pipeline.fireChannelRead(byteBuf);
// 2、Pipeline的fireChannelRead调用了公用的invokeChannelRead,因为要从Head触发,所以此时传入的next为Head
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// 3、调用channelContext的invokeChannelRead
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
// 4、调用context绑定handler的channelRead,处理具体逻辑
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
// 5、如果没有实现channelRead方法,则选取下一个context,递归执行第三步
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
2、数据写
// 1、调用writeAndFlush写数据,无论是从哪里开始写,都会调用Tail的writeAndFlush
@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.writeAndFlush(msg, promise);
}
// 2、与读数据相反,写数据查找下一个节点是通过查找prev指针
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
// 3、递归调用到HeadContext,通过unsafe写入socket
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
3、异常处理
// 1 、发生异常后,调用fireExceptionCaught
pipeline.fireExceptionCaught(e);
// 2、从HeadContext开始传递异常信息
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this;
}
// 3、同样的递归调用到TailContext的exceptionCaught,作为兜底逻辑,打印日志并释放资源
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
}
// 4、由于异常信息是从HeadContext开始传递,所以最佳实践是将自定义的异常处理器放在TailContext前、其他自定义Handler后。可以保证如论是哪里发生了异常,自定义异常处理器都可以处理到
Netty处理粘包拆包
TCP 传输协议是面向流的,没有数据包界限。可能受MTU 传输单元大小、MSS 最大分段大小、滑动窗口等因素影响,将一个完整的报文拆分成多个小报文进行发送,也可能将多个报文合并成一个大的报文进行发送。因此就有了拆包和粘包。
- ByteToMessageDecoder抽象类
// 1、ByteToMessageDecoder继承ChannelInboundHandlerAdapter,可以看出ByteToMessageDecoder解码器应该放在HeadContext后、其他Handler前的位置
// 2、当channel可读时,触发pipeline的channelRead实现,对应到ByteToMessageDecoder 的实现如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//......省略其他内容
CodecOutputList out = CodecOutputList.newInstance();
try {
// channelRead的msg是ByteBuf类型
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
// 如果是第一次,则初始化cumulation
if (first) {
cumulation = data;
} else {
// 不是第一次,追加到cumulation
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 对数据进行处理
callDecode(ctx, cumulation, out);
//......省略其他内容
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while(true) {
if (in.isReadable()) {
// out中存放的是完整的对象集合,如果不为空,就可以接着触发channelRead
int outSize = out.size();
if (outSize > 0) {
// 循环推进out集合的内容
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
return;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 解码的核心逻辑
this.decodeRemovalReentryProtection(ctx, in, out);
//......省略
}
return;
}
} catch (DecoderException var6) {
throw var6;
} catch (Exception var7) {
throw new DecoderException(var7);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
// 设置当前状态为正在解码
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 解码
decode(ctx, in, out);
} finally {
// 执行hander的remove操作
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
// 子类都重写了该方法,每种实现都会有自己特殊的解码方式
// 有FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
- 最简单的解决方案:FixedLengthFrameDecoder
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
// 如果有符合要求的对象,添加到集合中,由父类继续推动channelRead,此时的对象是一个完整的对象
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 收集到的数据是否小于固定长度,小于就代表无法解析,直接返回空;并等待ByteBuf中的数据,使能够达到固定长度
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}