晓我课堂

netty入门

2021-11-19  本文已影响0人  wavefreely

netty是什么?

netty的应用

现在市面上Dubbo、zk、RocketMQ、ElasticSearch、Spring5(对 HTTP 协议的实现)、GRpc、Spark 等大型开源项目都在使用 Netty 作为底层通讯框架。

牛刀小试

在这里我们就不对底层的TCP、UDP传输协议做过多的阐述,直接进入代码
在这里我们先通过下面的程序达到的目的是,对 Netty 编程的基本结构及流程有所了解。该程序是通过 Netty 实现 HTTP 请求的处理,即接收 HTTP 请求,返回 HTTP 响应。

创建工程

首先创建一个普通的 Maven 的 Java 工程。

导入依赖
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.36.Final</version>
    </dependency>
    <!--lombok依赖-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.6</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
创建服务器启动类
/**
 * @Description: 服务启动类
 * @author: dy
 */
public class FirstServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                     .channel(NioServerSocketChannel.class)
                     .option(ChannelOption.SO_BACKLOG, 1024)
                    //接收套接字缓冲区大小
                    .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
                    //发送套接字缓冲区大小
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
                            pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                            // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
                            pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
                            //绑定处理器(可绑定多个)
                            pipeline.addLast(new HeartServerHandler()); //处理心跳
                            pipeline.addLast(new BusinessServerHandler()); //处理业务
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            future.channel().closeFuture().sync();
            System.out.println("=======>>服务器已启动");
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
/**
 * @Description: 心跳处理器
 * @author: dy
 */
public class HeartServerHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 将来自于客户端的数据显示在服务端控制台
        System.out.println(ctx.channel().remoteAddress() + "==消息内容==>>" + msg);
        Message msgObj = JSONObject.parseObject(msg.toString(), Message.class);
        if(msgObj.getHeader().getMessageId() == 0){
            // 心跳消息
            // 向客户端返回心跳响应数据
            System.out.println("收到心跳请求======");
            Message heartBeat = Message.builder().header(Message.Header.builder().messageId(msgObj.getHeader().getMessageId()).build())
                    .body(Message.MessageBody.builder().data("我是服务端,收到了你的心跳请求").build())
                    .build();
            String json = JSON.toJSONString(heartBeat);
            ByteBuf buf = Unpooled.copiedBuffer(json.getBytes(Charset.forName("UTF-8")));
            ctx.channel().writeAndFlush(buf);
        }else{
            //不处理传递到下一个处理器
            System.out.println("这是心跳请求处理器,不处理此消息");
            ctx.fireChannelRead(msgObj);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}

/**
 * @Description:业务处理器
 * @author: dy
 */
public class BusinessServerHandler extends SimpleChannelInboundHandler<Message> {


    @Override
    public void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 将来自于客户端的数据显示在服务端控制台
        System.out.println(ctx.channel().remoteAddress() + "收到业务消息====>>" + msg);
        //处理具体的业务消息
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}
/**
 * @Description: 消息实体类
 * @author: dy
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Message  implements Serializable {


    /**
     * 消息头
     */
    private Header header;

    /**
     * 消息体
     */
    private MessageBody body;

    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Header implements Serializable {


        private static final long serialVersionUID = 6819375880619930921L;
        /**
         * 消息任务ID
         */
        private int messageId;
        /**
         * 协议版本号
         */
        private int version = 1;
        /**
         * 消息类型
         */
        private int type;
        /**
         * 消息来源地址
         */
        private String remoteAddress;

    }

    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class MessageBody implements Serializable {

        private static final long serialVersionUID = 6919544256603837227L;

        /**
         * 业务逻辑是否处理成功
         */
        private Boolean isSuccess ;
        /**
         * 业务处理消息
         */
        private String message;
        /**
         * 消息内容
         */
        private Object data;
    }
}
创建客户端服务启动类
/**
 * @Description: 客户端启动类
 * @author: dy
 */
public class FirstClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //接收套接字缓冲区大小
                    .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
                    //发送套接字缓冲区大小
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
                            pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                            pipeline.addLast(new FirstClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
/**
 * @Description:客户端消息处理类
 * @author: dy
 */
public class FirstClientHandler extends SimpleChannelInboundHandler<String> {

    // msg的消息类型与类中的泛型类型是一致的
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "收到服务端请求====>>" + msg);
        Message busBeat = Message.builder().header(Message.Header.builder().messageId(1).build())
                .body(Message.MessageBody.builder().data("我是客户端,请处理xxx消息").build())
                .build();
        String json = JSON.toJSONString(busBeat);
        ByteBuf buf = Unpooled.copiedBuffer((json).getBytes(Charset.forName("UTF-8")));
        ctx.channel().writeAndFlush(buf);
    }

    // 当Channel被激活后会触发该方法的执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("========");
        Message heartBeat = Message.builder().header(Message.Header.builder().messageId(0).build())
                .body(Message.MessageBody.builder().data("我是客户端,我来了").build())
                .build();
        String json = JSON.toJSONString(heartBeat);
        ByteBuf buf = Unpooled.copiedBuffer((json).getBytes(Charset.forName("UTF-8")));
        ctx.channel().writeAndFlush(buf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

通过上面的代码我们大体可以画出netty的执行流程如下图所示:


netty执行流程.png

从上面我们使用netty,我们来看下其中比较重要的几个组件,来简单介绍下

Channel

字面意思就是 管道,它本质上是对 Socket 的封装,其包含了一组 API,大大简化了直接与 Socket 进行操作的复杂性。也可以理解为一个连接网络输入和IO处理的桥梁。你可以通过Channel来判断当前的状态,是open还是connected,还可以判断当前Channel支持的IO操作,还可以使用ChannelPipeline对Channel中的消息进行处理。

EventLoopGroup

EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1, 而 EventLoop 与线程的关系是 1:1

ServerBootStrap

用于配置整个 Netty 代码,将各个组件关联起来。服务端使用的是 ServerBootStrap,而客户端使用的是则 BootStrap。

ChannelHandler与 ChannelPipeline

ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。

ChannelFuture

Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的 addListener()方法为该异步操作添加监 NIO 网络编程框听器,为其注册回调:当结果出来后马上调用执行。Netty 的异步编程模型都是建立在 Future 与回调概念之上的

在这里我们着重说一下SimpleChannelInboundHandler

SimpleChannelInboundHandler

从SimpleChannelInboundHandler的源码我们可以看到SimpleChannelInboundHandler是继承自ChannelInboundHandlerAdapter,我们来看下SimpleChannelInboundHandler的channelRead方法:

//读取消息对象  
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            //如果消息属于本Handler可以处理的消息类型,则委托给channelRead0  
            channelRead0(ctx, imsg);
        } else {
            release = false;
            //当前通道Handler,不可处理消息,通过通道上下文,通知管道线中的下一个通道处理器,接受到一个消息  
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            //如果autoRelease为自动释放消息,且消息已处理则释放消息(注意 如果消息实现了ReferenceCounted,则调用ReferenceCounted#release(),如果不是什么都不做) 
            ReferenceCountUtil.release(msg);
        }
    }
}
//ReferenceCountUtil的release方法
public static boolean release(Object msg) {
        if (msg instanceof ReferenceCounted) {
            return ((ReferenceCounted) msg).release();
        }
        return false;
}

从源码可以看出Inbound通道处理器SimpleChannelInboundHandler<I>,内部有连个变量一个为参数类型匹配器,用来判断通道是否可以处理消息,另一个变量autoRelease,用于控制是否在通道处理消息完毕时,释放消息。读取方法channelRead,首先判断跟定的消息类型是否可以被处理,如果是,则委托给channelRead0,channelRead0待子类实现;如果返回false,则将消息转递给Channel管道线的下一个通道处理器;最后,如果autoRelease为自动释放消息,且消息已处理则释放消息。

所以ChannelInboundHandlerAdapter和SimpleChannelInboundHandler最主要的区别是:

所占有的所有资源。

msg

所以在用的时候我们分情况:

这篇文章我们先简单介绍下netty的使用和一些简单的介绍,下篇文章我们来介绍下netty的粘包和拆包处理

上一篇下一篇

猜你喜欢

热点阅读