IT必备技能netty

Netty——任务加入异步线程池

2021-03-15  本文已影响0人  小波同学

前言

我们常常遇到这样的需求:在一个业务逻辑处理器中,需要写数据库、进行网络连接等耗时业务。Netty的原则是不阻塞I/O线程,所以需指定Handler执行的线程池。

如果MyBusinessLogicHandler是一个耗时的处理逻辑,应该制定group,避免I/O线程被阻塞,如果业务逻辑是异步处理或处理时间很快那么可以不用指定group。

在 Netty 中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响 Netty 对 Socket 的处理速度。
而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2种方式,而且这2种方式实现的区别也蛮大的。

当我们使用addLast方法添加线程池后,handler将优先使用这个线程池,如果不添加,将使用IO线程。

handler 中加入线程池

服务端的实现

public class NettyServer {

    public static void main(String[] args) {
        //创建bossGroup 和 workerGroup
        //创建两个线程组,bossGroup和workerGroup
        //bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
        //两个都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    //给workerGroup的EventLoop对应的管道设置处理器
                    .childHandler(new NettyServerInitializer());
            System.out.println("......服务器 id ready...");
            //绑定一个端口并同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6668).sync();

            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

服务端Initializer

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new NettyServerHandler());
    }
}

服务端Handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //group充当业务线程池,可以将任务提交到该线程池
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    /**
     * 读取数据事件(读取客户端发送的消息)
     * @param ctx   上下文对象,可以获取管道pipeline,通道channel,地址
     * @param msg   客户端发送的数据,默认Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());
        //如果这里有一个非常耗时长的业务 -> 提交到线程池异步执行
        group.submit(() -> {
            try {
                //模拟耗时长的业务
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 异步执行的线程:"+Thread.currentThread().getName());
                ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵2~", CharsetUtil.UTF_8));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("......go on ......");
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
    }

    /**
     * 发生异常,需要关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

Context 中添加线程池

服务端的实现

public class NettyServer {

    public static void main(String[] args) {
        //创建bossGroup 和 workerGroup
        //创建两个线程组,bossGroup和workerGroup
        //bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
        //两个都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    //给workerGroup的EventLoop对应的管道设置处理器
                    .childHandler(new NettyServerInitializer());
            System.out.println("......服务器 id ready...");
            //绑定一个端口并同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6668).sync();

            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

服务端Initializer

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //在addLast添加handler,参数指定了EventExecutorGroup
        //那么该handler会优先加入到线程池中执行
        pipeline.addLast(group,"handler",new NettyServerHandler());
    }
}

服务端Handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件(读取客户端发送的消息)
     * @param ctx   上下文对象,可以获取管道pipeline,通道channel,地址
     * @param msg   客户端发送的数据,默认Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());
        System.out.println("......go on ......");
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
    }

    /**
     * 发生异常,需要关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

两种方式的比较

上一篇下一篇

猜你喜欢

热点阅读