Netty——任务加入异步线程池
2021-03-15 本文已影响0人
小波同学
前言
我们常常遇到这样的需求:在一个业务逻辑处理器中,需要写数据库、进行网络连接等耗时业务。Netty的原则是不阻塞I/O线程,所以需指定Handler执行的线程池。
如果MyBusinessLogicHandler是一个耗时的处理逻辑,应该制定group,避免I/O线程被阻塞,如果业务逻辑是异步处理或处理时间很快那么可以不用指定group。
在 Netty 中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响 Netty 对 Socket 的处理速度。
而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2种方式,而且这2种方式实现的区别也蛮大的。
- 1、处理耗时业务的第一种方式:handler 中加入线程池。
- 2、处理耗时业务的第二种方式:Context 中添加线程池。
当我们使用addLast方法添加线程池后,handler将优先使用这个线程池,如果不添加,将使用IO线程。
handler 中加入线程池
服务端的实现
public class NettyServer {
public static void main(String[] args) {
//创建bossGroup 和 workerGroup
//创建两个线程组,bossGroup和workerGroup
//bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
//两个都是无限循环
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
//给workerGroup的EventLoop对应的管道设置处理器
.childHandler(new NettyServerInitializer());
System.out.println("......服务器 id ready...");
//绑定一个端口并同步,生成一个ChannelFuture对象
ChannelFuture channelFuture = bootstrap.bind(6668).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端Initializer
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyServerHandler());
}
}
服务端Handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//group充当业务线程池,可以将任务提交到该线程池
private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
/**
* 读取数据事件(读取客户端发送的消息)
* @param ctx 上下文对象,可以获取管道pipeline,通道channel,地址
* @param msg 客户端发送的数据,默认Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
//如果这里有一个非常耗时长的业务 -> 提交到线程池异步执行
group.submit(() -> {
try {
//模拟耗时长的业务
Thread.sleep(10 * 1000);
System.out.println("group.submit 异步执行的线程:"+Thread.currentThread().getName());
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵2~", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("......go on ......");
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
}
/**
* 发生异常,需要关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}
Context 中添加线程池
服务端的实现
public class NettyServer {
public static void main(String[] args) {
//创建bossGroup 和 workerGroup
//创建两个线程组,bossGroup和workerGroup
//bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
//两个都是无限循环
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
//给workerGroup的EventLoop对应的管道设置处理器
.childHandler(new NettyServerInitializer());
System.out.println("......服务器 id ready...");
//绑定一个端口并同步,生成一个ChannelFuture对象
ChannelFuture channelFuture = bootstrap.bind(6668).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端Initializer
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在addLast添加handler,参数指定了EventExecutorGroup
//那么该handler会优先加入到线程池中执行
pipeline.addLast(group,"handler",new NettyServerHandler());
}
}
服务端Handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件(读取客户端发送的消息)
* @param ctx 上下文对象,可以获取管道pipeline,通道channel,地址
* @param msg 客户端发送的数据,默认Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
System.out.println("......go on ......");
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
}
/**
* 发生异常,需要关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}
两种方式的比较
- 1、第一种方式在handler内部添加EventExecutorGroup,可能更加自由,比如如果需要访问数据库等耗时操作那就异步,如果不需要那就不异步,异步可能会拖长接口响应时间,因为需要将任务放进mpscTask中,如果IO时间很短,Task很多,可能一个循环下来,都没有时间执行整个task,导致接口响应时间不达标。
- 2、第二种方式是Netty标准方式(即加入到队列),但是这样做会将整个handler都交给业务线程池,不论耗时不耗时,都加入到队列里不够灵活。
- 3、两种方式各有优劣,第一种灵活性更加,怎么使用,视实际情况而定。