个人技术栈

Netty组件介绍

2017-08-03  本文已影响8人  Chinesszz

在学习Netty之前,建议首先学习一个NIO,对关键的NIO组件有一个清醒认识

Buffer

Selector

总览

ServerBootstrap

一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件。

EventLoop

一个EventLoop可以为多个Channel服务。 EventLoopGroup会包含多个EventLoop

ChannelPipeline,ChannelHandler

从PipeLine这个单词中可以看出来,是一个管道,处理连接。我们的业务代码handler一般都是放在这个管道中的

那么疑问来了,这个管道中的处理顺序是什么样呢?

 ChannelPipeline cp = channel.pipeline(); 
 cp.addLast("encoder", new HttpResponseEncoder());//1.负责输出
 cp.addLast("decoder", new HttpRequestDecoder());//2.负责把客户端的数据解码
 cp.addLast("handler", new HttpDispatchServerHandler());//3.自定义的业务处理器

按照我们执行顺序肯定不是根据添加顺序来处理的,应该是:2,把客户端的数据解码->3.对解码数据处理->1.加密返回给客户端。

那么 Netty 是怎么处理的呢?

ChannelHandler有两个子类ChannelInboundHandler和ChannelOutboundHandler,这两个类对应了两个数据流向,如果数据是从外部流入我们的应用程序,我们就看做是inbound,相反便是outbound

ChannelInitializer

顾名思义,这个就是channel初始化的容器,在这个里面设置处理器

 ServerBootstrap bootstrap = new ServerBootstrap();
        bossGroup = new NioEventLoopGroup();//负责绑定channel到selector
        workerGroup = new NioEventLoopGroup();//负责从selector中读取事件
        bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .localAddress(6969).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                .childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                ChannelPipeline cp = channel.pipeline();
                cp.addLast("idleStateHandler", new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS));
                cp.addLast("decoder", new HttpRequestDecoder());
                cp.addLast("encoder", new HttpResponseEncoder());
                cp.addLast("aggregator", new HttpObjectAggregator(1048576));
                cp.addLast("deflater", new HttpContentCompressor());
                cp.addLast("handler", new HttpDispatchServerHandler());
                cp.addLast("out", new AcceptorIdleStateTrigger());
            }
        }).option(ChannelOption.SO_BACKLOG, 128);
        try {
            channel = bootstrap.bind().awaitUninterruptibly().channel();
            showBanner(6969);
        } catch (Exception ie) {
            throw new RuntimeException(ie);
        }

ChannelFuture

Netty中的连接都可以是异步的,但是也可以设置为非异步

ChannelFuture

Channel 每个操作都会返回一个 ChannelFutrue 因为是异步的,所以我们为每个异步的结果,添加一个监听,比如:

# 当完成关闭动作,就执行监听器内容
f = f.channel().closeFuture().await();  
f.addListener(new ChannelFutureListener() {  
    @Override  
    public void operationComplete(ChannelFuture future) throws Exception {  
        System.out.println("success complete!!ok!!");  
    }  
});  

当然还有一种方法, 就是await(),此返回会等待上一个操作完成,在进行下一个操作。但是推荐使用第一种。

ByteToMessageDecoder

解密器,可以自定义协议,通过集成改接口,重写 decode 方法把二进制,转换为我们系统可以处理的对象

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 把字节转换为int
 * 继承抽象类ByteToMessageDecoder实现解码器
 */
public class ByteToIntegerDecoder extends ByteToMessageDecoder {

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in,
                       List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {  // Check if there are at least 4 bytes readable
            int n = in.readInt();
            System.err.println("ByteToIntegerDecoder decode msg is " + n);
            out.add(n);      //Read integer from inbound ByteBuf, add to the List of decodec messages
        }
    }
}

编码器

将我们系统处理完的信息,编码成,二进制,传出,给调用者

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {
    @Override
    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
            throws Exception {
        System.err.println("IntegerToByteEncoder encode msg is " + msg);
        out.writeInt(msg);
    }
}

解码后的数据怎么使用

对于加密后的数据,可以直接强制转换为我们解码的对象

public class BusinessHandler extends ChannelInboundHandlerAdapter {
    private Logger  logger  = LoggerFactory.getLogger(BusinessHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //因为我们的解码其中指定是Int类型,所以我们就可以,强制转换为Int,这里为了好理解,假如我们的解码器,中是转换了Person,那么在我们的处理器中就,可以强制换换为Person
        Person person = (Person) msg;
        logger.info("BusinessHandler read msg from client :" + person);
    }

上一篇下一篇

猜你喜欢

热点阅读