Netty (二)TaskQueue

2020-06-08  本文已影响0人  南园故剑00

1. 用户自定义普通任务

package com.sgg.netty.taskQueue;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @description:
 * @date : 2020/6/7 18:23
 * @author: zwz
 */
public class NettyServerTaskQueue {
    public static void main(String[] args) throws InterruptedException {
        //创建BossGroup和WorkerGroup
        //1. 创建两个线程组
        //2. bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroupo完成
        //3. 两个都是无限循环
        //4. bossGroup 和 workGroup 含有的子线程的个数
        //默认实际 CPU 核数*2=8。在不传参数的情况下有8个 线程。我的24个线程
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        //创建服务器端的启动对象,配置参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            //使用链式编程设置
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  //使用NioServerSocketChannel作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列的连接个数
                    .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道测试对象-匿名对象
                        //给pipeline设置处理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerTaskQueueHandler());
                        }
                    }); //给我们的 workerGroup 的EventLoop对应的管道设置处理器

            System.out.println("服务器准好了");

            //绑定一个端口并同步,生成一个 ChannelFuture 对象
            //启动服务器
            ChannelFuture future = bootstrap.bind(6668).sync();

            //对关闭通道进行监听
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.sgg.netty.taskQueue;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @description: 自定义一个Handler 需要继续 netty 绑定好的某个 HandlerAdapter
 * @date : 2020/6/7 18:44
 * @author: zwz
 */
public class NettyServerTaskQueueHandler extends ChannelInboundHandlerAdapter {

    static long[] time = new long[2];

    static long curTime = 0;

    /*
     * 读取实际数据:我们可以读取客户端发送的消息
     * 1. ChannelHandlerContext 上下文对象。含有 管道pipeLine、通道channel、地址
     * 2. Object msg 就是客户端发送的数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
//        System.out.println("server ctx=" + ctx);
//
//        Channel channel = ctx.channel();
//        ChannelPipeline pipeline = ctx.pipeline();
//
//        //将 msg 转成一个ByteBuf
//        // ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer
//        ByteBuf buf = (ByteBuf) msg;
//        System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
//        System.out.println("客户端地址:" + ctx.channel().remoteAddress());

        //比如这里有一个非常耗时的业务 -> 异步执行 -> 提交该 channel 对应的 NIOEventLoop 的 taskQueue

        //解决方案一:用户程序自定义的普通任务
        long l = System.currentTimeMillis();
        //
        ctx.channel().eventLoop().execute(new Thread0(ctx, 0) {  //就是ctx中的线程
//            @Override
//            public void run() {
//                try {
//                    System.out.println("执行的线程:" + Thread.currentThread().getName());
//
//                    Thread.sleep(10 * 1000);
//                    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端 222  ", CharsetUtil.UTF_8));
//
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }
        });

        //在同一个线程中串行的
        ctx.channel().eventLoop().execute(new Thread0(ctx, 1) {  //就是ctx中的线程
//            @Override
//            public void run() {
//                try {
//                    System.out.println("执行的线程:" + Thread.currentThread().getName());
//                    Thread.sleep(20 * 1000);
//                    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端 333  ", CharsetUtil.UTF_8));
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }
        });
        System.out.println("go on ...");
        //这里耗时为什么是1ms ,不是在同一线程吗
        System.out.println("耗时:" + (System.currentTimeMillis() - l));
        System.out.println("线程耗时:" + time[0] + "  " + time[1]);
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //把数据写到缓冲区并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端 ", CharsetUtil.UTF_8));
    }

    //处理异常 需要关闭通道

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    private static class Thread0 extends Thread {

        ChannelHandlerContext ctx;
        int i;

        public Thread0(ChannelHandlerContext ctx, int i) {
            this.ctx = ctx;
            this.i = i;
        }

        @Override
        public void run() {
            long l = System.currentTimeMillis();
            System.out.println(i + "开始时间戳" + " : " + l);
            try {
                System.out.println("执行的线程:" + Thread.currentThread().getName());
                Thread.sleep(5 * 1000);
                ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端 333  ", CharsetUtil.UTF_8));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            time[i] = System.currentTimeMillis() - l;
            System.out.println(i + "结束时间戳" + " : " + System.currentTimeMillis());
        }
    }

}
执行结果

服务器准好了
服务器读取线程 nioEventLoopGroup-3-1
go on ...
耗时:168884
线程耗时:0 0
0开始时间戳 : 1591850295042
执行的线程:nioEventLoopGroup-3-1
0结束时间戳 : 1591850300042
1开始时间戳 : 1591850300042
执行的线程:nioEventLoopGroup-3-1
1结束时间戳 : 1591850305043

上一篇 下一篇

猜你喜欢

热点阅读