Netty原理与基础(五)

2020-10-25  本文已影响0人  smallmartial

1.Decoder原理

1.1什么叫作Netty的解码器呢?

首先,它是一个InBound入站处理器,解码器负责处理“入站数据”。其次,它能将上一站Inbound入站处理器传过来的输入(Input)数据,进行数据的解码或者格式转换,然后输出(Output)到下一站Inbound入站处理器。一个标准的解码器将输入类型为ByteBuf缓冲区的数据进行解码,输出一个一个的Java POJO对象。Netty内置了这个解码器,叫作ByteToMessageDecoder,位在Netty的io.netty.handler.codec包中。


image.png

1.2代码示例

//解码
public class Byte2IntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in,
                       List<Object> out) {
        while (in.readableBytes() >= 4) {
            int i = in.readInt();
            Logger.info("解码出一个整数: " + i);
            out.add(i);
        }
    }
}
//处理程序
public class IntegerProcessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Integer integer = (Integer) msg;
        Logger.info("打印出一个整数: " + integer);
    }
}
//测试类
public class Byte2IntegerDecoderTester {
    /**
     * 整数解码器的使用实例
     */
    @Test
    public void testByteToIntegerDecoder() {
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
            protected void initChannel(EmbeddedChannel ch) {
                ch.pipeline().addLast(new Byte2IntegerDecoder());
                ch.pipeline().addLast(new IntegerProcessHandler());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(i);

        for (int j = 0; j < 100; j++) {
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(j);
            channel.writeInbound(buf);
        }

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

基类ByteToMessageDecoder负责解码器的ByteBuf缓冲区的释放工作,它会调用ReferenceCountUtil.release(in)方法,将之前的ByteBuf缓冲区的引用数减1。

1.3 ReplayingDecoder解码器

image.png

ReplayingDecoder类是ByteToMessageDecoder的子类。其作用是:
· 在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节。
· 若ByteBuf中有足够的字节,则会正常读取;反之,如果没有足够的字节,则会停止解码。

1.4整数分包解码器

可以使用ReplayingDecoder来解决
要完成以上的例子,需要用到ReplayingDecoder一个很重要的属性——state成员属性。该成员属性的作用就是保存当前解码器在解码过程中的当前阶段

    protected ReplayingDecoder() {
        this((Object)null);
    }

    protected ReplayingDecoder(S initialState) {
        this.replayable = new ReplayingDecoderByteBuf();
        this.checkpoint = -1;
        this.state = initialState;
    }

    protected void checkpoint() {
        this.checkpoint = this.internalBuffer().readerIndex();
    }

    protected void checkpoint(S state) {
        this.checkpoint();
        this.state(state);
    }

1.5分包解码器

在原理上,字符串分包解码和整数分包解码是一样的。有所不同的是:整数的长度是固定的,目前在Java中是4个字节;而字符串的长度不是固定的,是可变长度的,这就是一个小小的难题

public class StringReplayDecoder
        extends ReplayingDecoder<StringReplayDecoder.Status> {

    enum Status {
        PARSE_1, PARSE_2
    }

    private int length;
    private byte[] inBytes;

    public StringReplayDecoder() {
        //构造函数中,需要初始化父类的state 属性,表示当前阶段
        super(Status.PARSE_1);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {


        switch (state()) {
            case PARSE_1:
                //第一步,从装饰器ByteBuf 中读取长度
                length = in.readInt();
                inBytes = new byte[length];
                // 进入第二步,读取内容
                // 并且设置“读指针断点”为当前的读取位置
                checkpoint(Status.PARSE_2);
                break;
            case PARSE_2:
                //第二步,从装饰器ByteBuf 中读取内容数组
                in.readBytes(inBytes, 0, length);
                out.add(new String(inBytes, "UTF-8"));
                // 第二步解析成功,
                // 进入第一步,读取下一个字符串的长度
                // 并且设置“读指针断点”为当前的读取位置
                checkpoint(Status.PARSE_1);
                break;
            default:
                break;
        }

    }

public class StringProcessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String s = (String) msg;
        System.out.println("打印: " + s);
    }
}

public class StringReplayDecoderTester {
    static String content = "smallmartial:Netty知识学习";

    /**
     * 字符串解码器的使用实例
     */
    @Test
    public void testStringReplayDecoder() {
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
            protected void initChannel(EmbeddedChannel ch) {
                ch.pipeline().addLast(new StringReplayDecoder());
                ch.pipeline().addLast(new StringProcessHandler());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(i);
        byte[] bytes = content.getBytes(Charset.forName("utf-8"));
        for (int j = 0; j < 100; j++) {
            //1-3之间的随机数
            int random = RandomUtil.randInMod(3);
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(bytes.length * random);
            for (int k = 0; k < random; k++) {
                buf.writeBytes(bytes);
            }
            channel.writeInbound(buf);
        }
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
image.png

1.6MessageToMessageDecoder解码器

MessageToMessageDecoder<I>。在继承它的时候,需要明确的泛型实参<I>。这个实参的作用就是指定入站消息JavaPOJO类型。

上一篇 下一篇

猜你喜欢

热点阅读