Netty (二)TaskQueue
2020-06-08 本文已影响0人
南园故剑00
1. 用户自定义普通任务
package com.sgg.netty.taskQueue;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @description:
* @date : 2020/6/7 18:23
* @author: zwz
*/
public class NettyServerTaskQueue {
public static void main(String[] args) throws InterruptedException {
//创建BossGroup和WorkerGroup
//1. 创建两个线程组
//2. bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroupo完成
//3. 两个都是无限循环
//4. bossGroup 和 workGroup 含有的子线程的个数
//默认实际 CPU 核数*2=8。在不传参数的情况下有8个 线程。我的24个线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
try {
//使用链式编程设置
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列的连接个数
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道测试对象-匿名对象
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerTaskQueueHandler());
}
}); //给我们的 workerGroup 的EventLoop对应的管道设置处理器
System.out.println("服务器准好了");
//绑定一个端口并同步,生成一个 ChannelFuture 对象
//启动服务器
ChannelFuture future = bootstrap.bind(6668).sync();
//对关闭通道进行监听
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.sgg.netty.taskQueue;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @description: 自定义一个Handler 需要继续 netty 绑定好的某个 HandlerAdapter
* @date : 2020/6/7 18:44
* @author: zwz
*/
public class NettyServerTaskQueueHandler extends ChannelInboundHandlerAdapter {
static long[] time = new long[2];
static long curTime = 0;
/*
* 读取实际数据:我们可以读取客户端发送的消息
* 1. ChannelHandlerContext 上下文对象。含有 管道pipeLine、通道channel、地址
* 2. Object msg 就是客户端发送的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
// System.out.println("server ctx=" + ctx);
//
// Channel channel = ctx.channel();
// ChannelPipeline pipeline = ctx.pipeline();
//
// //将 msg 转成一个ByteBuf
// // ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer
// ByteBuf buf = (ByteBuf) msg;
// System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
// System.out.println("客户端地址:" + ctx.channel().remoteAddress());
//比如这里有一个非常耗时的业务 -> 异步执行 -> 提交该 channel 对应的 NIOEventLoop 的 taskQueue
//解决方案一:用户程序自定义的普通任务
long l = System.currentTimeMillis();
//
ctx.channel().eventLoop().execute(new Thread0(ctx, 0) { //就是ctx中的线程
// @Override
// public void run() {
// try {
// System.out.println("执行的线程:" + Thread.currentThread().getName());
//
// Thread.sleep(10 * 1000);
// ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端 222 ", CharsetUtil.UTF_8));
//
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
});
//在同一个线程中串行的
ctx.channel().eventLoop().execute(new Thread0(ctx, 1) { //就是ctx中的线程
// @Override
// public void run() {
// try {
// System.out.println("执行的线程:" + Thread.currentThread().getName());
// Thread.sleep(20 * 1000);
// ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端 333 ", CharsetUtil.UTF_8));
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
});
System.out.println("go on ...");
//这里耗时为什么是1ms ,不是在同一线程吗
System.out.println("耗时:" + (System.currentTimeMillis() - l));
System.out.println("线程耗时:" + time[0] + " " + time[1]);
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//把数据写到缓冲区并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端 ", CharsetUtil.UTF_8));
}
//处理异常 需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
private static class Thread0 extends Thread {
ChannelHandlerContext ctx;
int i;
public Thread0(ChannelHandlerContext ctx, int i) {
this.ctx = ctx;
this.i = i;
}
@Override
public void run() {
long l = System.currentTimeMillis();
System.out.println(i + "开始时间戳" + " : " + l);
try {
System.out.println("执行的线程:" + Thread.currentThread().getName());
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端 333 ", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
time[i] = System.currentTimeMillis() - l;
System.out.println(i + "结束时间戳" + " : " + System.currentTimeMillis());
}
}
}
执行结果
服务器准好了
服务器读取线程 nioEventLoopGroup-3-1
go on ...
耗时:168884
线程耗时:0 0
0开始时间戳 : 1591850295042
执行的线程:nioEventLoopGroup-3-1
0结束时间戳 : 1591850300042
1开始时间戳 : 1591850300042
执行的线程:nioEventLoopGroup-3-1
1结束时间戳 : 1591850305043