Netty简介

2019-07-29  本文已影响0人  横渡

Netty是一个异步的,时间驱动的网络编程框架,使用Netty可以快速开发出可维护的、高性能、高扩展能力的协议服务及客户端应用。Netty简化和流线化了网络应用的编程开发过程,例如TCP、UDP的socket开发。

Netty基础api-ChannelInboundHandlerAdapter

业务处理类 ChannelInboundHandlerAdapter,ChannelHandler的适配器类,ChannelHandler提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承ChannelHandlerAdapter类而不是你自己去实现接口方法。

客户端业务类:

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {// 业务处理及其他
            ByteBuf buf = (ByteBuf) msg;
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
            String request = new String(data, "utf-8");
            System.out.println("Client: " + request);
        } finally {
            ReferenceCountUtil.release(msg); // 最后要释放Buffer,要不然会内存泄漏
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端:

public class NettyNioClient {

    public static void main(String[] args) throws InterruptedException{
        EventLoopGroup workGroup = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(workGroup).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });
        // 建立连接
        ChannelFuture f = b.connect("127.0.0.1", 5678).sync();
        // 给服务端发送数据
        f.channel().writeAndFlush(Unpooled.copiedBuffer("hi netty".getBytes()));
        // 强烈建议发送的是Buffer类型数据,因为netty涉及到一系列解析器,解析的是Buffer类型的数据
        f.channel().closeFuture().sync();
        workGroup.shutdownGracefully();
    }
}

服务端业务类NettyNioServerHandler:

public class NettyNioServerHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // do something msg
        ByteBuf buf = (ByteBuf) msg;
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        String request = new String(data, "utf-8");
        System.out.println("Server: " + request);
        // 写给客户端, ctx.write 方法不会使消息写入到通道上,它会存储到缓存中,需要调用ctx.flush() 方法来把缓冲区数据强制输出
//         ctx.write(Unpooled.copiedBuffer("888".getBytes())); //
        ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
        // 操作完成后关闭客户端channel
        f.addListener(ChannelFutureListener.CLOSE);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端代码NettyNioServer :

public class NettyNioServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();

            // 配置启动辅助类
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // ch.pipeline().addLast(new FixedLengthFrameDecoder(5)); // 固定长度解码器
                    // ch.pipeline().addLast(new StringDecoder()); // 字符创解码器(StringDecoder)将缓存(buffer)解码为字符串
                    ch.pipeline().addLast(new NettyNioServerHandler());
                }
            });
            bootstrap.option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = bootstrap.bind(5678).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}

TCP粘包和拆包

TCP网络传输是基于流的形式传输,所谓的流是没有界限的数据,好比河里的水,是没有断续的。

TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓存区的实际情况进行包的划分。所以一个完整的业务包可能会被TCP拆分为多个包进行发送,也可将多个小的业务包封装成一个大的数据包发送出去。这就是所谓的 TCP的 粘包和拆包。

Netty编解码框架

TCP的粘包、拆包问题,可以通过自定义通信协议的方式来解决。通信协议约定了通信双方的报文格式,发送方按照这个报文格式发送报文,接收方就按照这个格式来做报文的解析。

典型的协议包括:定长协议、特殊字符分隔符协议、报文头指定Length等。在确定了使用什么通信协议的情况下,发送方和接收方要完成的工作有所不同。

编码:发送方完成报文的拼接后,要将报文转成二进制数据流(称之为编码encode),编码功能由编码器(encoder)完成。

解码:接收方需要根据协议来对二进制数据进行解析,这个称为解码(decode),解码功能由解码器(decoder)完成。

编解码:既能编码,又能解码,则称为编码解码器(codec)。这种组件在发送方和接收方都可以使用。

对于开发人员而言,主要的工作有两点:确定协议,编写协议对应的编码/解码器。
协议分为公有协议和私有协议。所谓公有协议,指的是业界普遍遵循的通信协议,Netty提供了大量的公有协议数据格式的编码/解码器,从而简化了开发者的使用。例如:

Netty解码器

Netty中主要提供了抽象基类ByteToMessageDecoder,MessageToMessageDecoder。实现了ChannelInboundHandler接口。

ByteToMessageDecoder:用于将接收到的二进制数据(byte)解码,得到完整的请求报文(Message)。
MessageToMessageDecoder:将一个本身就包含完整报文信息的对象转换成另一个Java对象。

ByteToMessageDecoder 提供了一些常见的实现类:

Netty提供的MessageToMessageDecoder实现类比较少,主要是:

与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现了ChannelOutboundHandler接口。

相对来说,编码器比解码器的实现更加简单,原因在于解码器除了要按照协议解析数据,还要处理粘包、拆包问题;而编码器只要将数据转换成协议规定的二进制格式即可。

上一篇 下一篇

猜你喜欢

热点阅读