Netty 基础核心组件
2020-04-27 本文已影响0人
holmes000
bossgroup和workgroup都是NioEeventLoopGroup
1.pipeline中handler 主要用作对数据的处理,底层是双向链表;
每个channel都有一个channelPipeline对应;
每个pipeline都由 ChannelHandlerContext节点组成的双向链表构成,每个ChannelHandlerContext都对应一个handler;
出站事件的执行顺序会从tail节点往前传递到最后一个handler;
入站事件的执行顺序会从head节点往后传递到最后一个handler;
常用方法:1addFirst() 2 addLast()
image.png
-
ChannelHandlerContext
常用方法
image.png
3.ChannelOption
image.png
4.EventLoopGroup和实现子类NioEventLoopGroup
image.png
image.png
WorkerEeventLoopGroup中EeventLoop默认数量是核数的两倍;
WorkerEeventLoopGroup默认是按顺序next方式选择一个EventLoop来将SocketChannel注册到其selector中
常用方法:
构造方法
shutdownGracefully() 断开连接,关闭线程
5.Unpooled
image.png
image.png
不需要使用flip进行反转;因为底层维护了readerindex和writerindex;
writerindex到capacity是可写范围;
readerindex到writerindex是可读范围;
0-readerindex是已读范围;
常用方法:
copeidBuffer
public class NettyServer {
public static void main(String[] args) throws Exception {
//处理连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//处理业务
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
//配置
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) //任务阻塞队列
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持活跃连接
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("服务端准备完毕。。。");
//绑定端口,生成channelFuture
ChannelFuture channelFuture = serverBootstrap.bind(6888).sync();
//对关闭通道监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
/**
* 自定义Handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* ChannelHandlerContext 上下文,可以获得pipeline,通道channel
* msg 客户端发送的数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程" + Thread.currentThread().getName());
System.out.println("ser ctx.." + ctx);
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//写入buffer并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public static void main(String[] args) throws Exception {
EventLoopGroup eventExecutors = new NioEventLoopGroup();
System.out.println("核数:"+NettyRuntime.availableProcessors());
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)//设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端准备 ok。。");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道准备好就触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server 喵", CharsetUtil.UTF_8));
}
/**
* 当通道有读取事件,会触发
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器回传消息:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}