Netty 基础核心组件

2020-04-27  本文已影响0人  holmes000

bossgroup和workgroup都是NioEeventLoopGroup

1.pipeline中handler 主要用作对数据的处理,底层是双向链表;
每个channel都有一个channelPipeline对应;
每个pipeline都由 ChannelHandlerContext节点组成的双向链表构成,每个ChannelHandlerContext都对应一个handler;
出站事件的执行顺序会从tail节点往前传递到最后一个handler;
入站事件的执行顺序会从head节点往后传递到最后一个handler;
常用方法:1addFirst() 2 addLast()


image.png
  1. ChannelHandlerContext
    常用方法


    image.png

    3.ChannelOption


    image.png
    4.EventLoopGroup和实现子类NioEventLoopGroup
    image.png
    image.png

    WorkerEeventLoopGroup中EeventLoop默认数量是核数的两倍;
    WorkerEeventLoopGroup默认是按顺序next方式选择一个EventLoop来将SocketChannel注册到其selector中
    常用方法:
    构造方法
    shutdownGracefully() 断开连接,关闭线程
    5.Unpooled


    image.png
    image.png
    不需要使用flip进行反转;因为底层维护了readerindex和writerindex;
    writerindex到capacity是可写范围;
    readerindex到writerindex是可读范围;
    0-readerindex是已读范围;
    常用方法:
    copeidBuffer
public class NettyServer {
    public static void main(String[] args) throws Exception {
        //处理连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //处理业务
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //配置
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128) //任务阻塞队列
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活跃连接
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("服务端准备完毕。。。");
            //绑定端口,生成channelFuture
            ChannelFuture channelFuture = serverBootstrap.bind(6888).sync();
            //对关闭通道监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
/**
 * 自定义Handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * ChannelHandlerContext 上下文,可以获得pipeline,通道channel
     * msg 客户端发送的数据
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程" + Thread.currentThread().getName());
        System.out.println("ser ctx.." + ctx);
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + ctx.channel().remoteAddress());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //写入buffer并刷新
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        System.out.println("核数:"+NettyRuntime.availableProcessors());
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)//设置客户端通道的实现类
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("客户端准备 ok。。");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6888).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 当通道准备好就触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server 喵", CharsetUtil.UTF_8));
    }

    /**
     * 当通道有读取事件,会触发
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务器回传消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

上一篇 下一篇

猜你喜欢

热点阅读