Netty学习之内置处理器以及编解码器

2018-10-15  本文已影响40人  颜洛滨

Netty学习之内置处理器以及编解码器

前言

SSL/TLS

SSL/TLS是目前广泛使用的加密,位于TCP之上,其他的应用层协议之下,当应用层将数据交给SSL/TLS之后,数据会被进行加密,关于SSL/TLS更多的内容,可以参考:SSL/TLS协议运行机制的概述OpenSSL 与 SSL 数字证书概念贴

javax.net.ssl中提供了原生的SSL/TLS支持,通过SSLContextSSLEngine可以方便地进行数据的加密及解密。

在Netty中,为了方便开发者使用SSL/TLS,Netty提供了SSlHandler(本质是一个ChannelHandler),只要为其配置一个SSLEngine即可进行加密数据传输。

class SslEngineInitializer extends ChannelInitializer<Channel> {

    private final SslContext context;
    private final boolean startTls;

    public SslEngineInitializer(SslContext context, boolean startTls) {
        this.context = context;
        this.startTls = startTls;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        SSLEngine engine = context.newEngine(ch.alloc());
        ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
    }
}

HTTP/HTTPS

一个HTTP请求或者相应可能由多个部分组成,一个完整的Http请求由以下内容组成

由于一个Http请求包含请求部分以及相应部分,而对于客户端及服务端来说,这两者是不相同的,客户端发送请求,服务端接收请求,服务端发送响应,客户端接收响应,所以,需要有不同的处理器来处理不同的内容

常用编解码器

HttpRequestEncoder
HttpResponseEncoder
HttpRequestDecoder
HttpResponseDecoder
class HttpPipelineInitializer extends ChannelInitializer<Channel> {

    private final boolean client;

    public HttpPipelineInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            // 解码响应
            pipeline.addLast("decoder", new HttpResponseEncoder());
            // 编码请求
            pipeline.addLast("encoder", new HttpRequestEncoder());
        }else {
            // 解码请求
            pipeline.addLast("decoder", new HttpRequestDecoder());
            // 编码响应
            pipeline.addLast("encoder", new HttpResponseEncoder());
        }
    }
}

当一个字节流被解码成Http内容之后,就可以操作具体的HttpObject消息了,但是由于一个完整的请求/响应可能会被拆分成几个部分,所以,直接使用其实不是很合适,更好地方式是使用聚合器。

class HttpAggregatorInitializer extends ChannelInitializer<Channel> {

    private final boolean client;

    public HttpAggregatorInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            // 客户端编解码器
            // 等同于上面的两者
            pipeline.addLast("codec", new HttpClientCodec());
        }else {
            // 服务端编解码器
            pipeline.addLast("codec", new HttpServerCodec());
        }
        // 聚合器,允许最大大小为 512 * 1024
        pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
    }
}

聚合之后我们可以直接使用FullHttpRequestFullHttpResponse消息来处理,这两个对象表示的是完整的请求/响应了。

当使用HTTP的时候,如果内容大部分是文本数据,我们一般会使用压缩技术,虽然会增加CPU开销,但是可以有效地节省网络带宽,Netty同样提供了对应的handler并且提供gzip和deflate技术。

class HttpCompressionInitializer extends ChannelInitializer<Channel> {

    private final boolean client;

    public HttpCompressionInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            pipeline.addLast("codec", new HttpClientCodec());
            // 客户端解压
            pipeline.addLast("decompressor", new HttpContentDecompressor());
        }else {
            pipeline.addLast("codec", new HttpServerCodec());
            // 服务端加压
            pipeline.addLast("compressor", new HttpContentCompressor());
        }
    }
}

同时需要注意,如果是JDK6及以前的版本,需要引入JZlib依赖。

<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jzlib</artifactId>
    <version>1.1.3</version>
</dependency>

如果我们需要使用HTTPS,只需要将SslHanlder配置在所有handler的最前面即可。

空闲检测及超时

空闲检测及超时,也可以称为心跳检测,目的就是确保连接的另一端依旧在线,如果不在线,则断开连接,节省资源。

Netty中提供了几个常用的handler

需要注意的是,IdleStateHandler的作用是用于检测channel在指定时间内是否有数据流通,如果没有的话,则触发一个IdleStateEvent,该Event是用于通知本channel的,而不是用于通知对方,所以,我们可以根据收到的Event来决定处理逻辑,比如

下面举一个具体的例子

服务端

public class Server {
    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new IdleStateHandlerInitializer());
        try {
            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

/**
*  服务端空闲检测
*/
class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 60));
        pipeline.addLast(new HeartbeatHandler());
    }

    /**
    *  服务端的空闲处理逻辑
    */
    private class HeartbeatHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                // 如果超过,则断开连接
                if (event.state() == IdleState.ALL_IDLE) {
                    ctx.writeAndFlush(Unpooled.copiedBuffer("bybe".getBytes()));
                    ctx.close();
                }
            }else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

客户端

class Client {
    public static void main(String[] args) {
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 如果这里改成70,则会断开
                        pipeline.addLast(new IdleStateHandler(0, 0, 50, TimeUnit.SECONDS));
                        pipeline.addLast(new Heartbeat());
                    }
                });
        try {
            ChannelFuture fu = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
            fu.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }

    /**
    *  客户端空闲检测
    */
    private static class Heartbeat extends ChannelInboundHandlerAdapter {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                // 发送心跳包
                ctx.writeAndFlush(Unpooled.copiedBuffer("heartbeat".getBytes()));
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf data = (ByteBuf) msg;
            System.out.println(data.toString(CharsetUtil.UTF_8));
        }
    }
}

基于分隔符及长度的协议处理

在某些协议中,是根据换行符或者指定长度来划分的,Netty中提供了基于这两者的处理器

基于分隔符

Netty中主要的基于分隔符的处理器有以下两个

基于长度

Netty中基于长度的处理器有以下两个

发送大数据

为了高效地发送大量数据,Netty中提供了FileRegion接口(默认实现DefaultFileRegion),作为支持zero-copy的传输器,用于在channel中发送文件

如果需要将数据从文件系统拷贝到用户空间,可以使用ChunkedWriteHandler,它提供了低消耗内存异步将大数据流写出。

序列化

Netty提供的JDK序列化相关的处理器

同时,Netty还提供了基于ProtoBuf的处理器,具体的可以参考文件即可,使用上基本差不多

总结

本小节我们主要学习了Netty所提供的几个常用的handler,包括了SSL/TLS相关的handler、HTTP相关的handler、空闲处理器(心跳)、协议分割处理器以及序列化处理器等,有了这些常用的处理器,可以不用处理具体协议的相关内容,从而可以更专注于逻辑方面的处理。

上一篇下一篇

猜你喜欢

热点阅读