Netty程序员技术干货

Netty 源码深度解析(九) - 编码

2018-11-29  本文已影响54人  紫霞等了至尊宝五百年

概述

一个问题



编码器实现了ChannelOutboundHandler,并将出站数据从 一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类, 用于帮助你编写具有以下功能的编码器:

1 抽象类 MessageToByteEncoder

MessageToByteEncoder API
解码器通常需要在Channel关闭之后产生最后一个消息(因此也就有了 decodeLast()方法)
这显然不适于编码器的场景——在连接被关闭之后仍然产生一个消息是毫无意义的

1.1 ShortToByteEncoder

其接受一Short 型实例作为消息,编码为Short的原子类型值,并写入ByteBuf,随后转发给ChannelPipeline中的下一个 ChannelOutboundHandler
每个传出的 Short 值都将会占用 ByteBuf 中的 2 字节

ShortToByteEncoder

1.2 Encoder

Netty 提供了一些专门化的 MessageToByteEncoder,可基于此实现自己的编码器
WebSocket08FrameEncoder类提供了一个很好的实例

2 抽象类 MessageToMessageEncoder

你已经看到了如何将入站数据从一种消息格式解码为另一种
为了完善这幅图,将展示 对于出站数据将如何从一种消息编码为另一种。MessageToMessageEncoder类的 encode()方法提供了这种能力

MessageToMessageEncoderAPI
为了演示,使用IntegerToStringEncoder 扩展了 MessageToMessageEncoder

关于有趣的 MessageToMessageEncoder 的专业用法,请查看 io.netty.handler. codec.protobuf.ProtobufEncoder类,它处理了由 Google 的 Protocol Buffers 规范所定义 的数据格式。

一个java对象最后是如何转变成字节流,写到socket缓冲区中去的

pipeline中的标准链表结构
java对象编码过程
write:写队列
flush:刷新写队列
writeAndFlush: 写队列并刷新

pipeline中的标准链表结构

标准的pipeline链式结构
数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务Handler处理,调用write,将结果对象写出去
而写的过程先通过tail节点,然后通过encoder节点将对象编码成ByteBuf,最后将该ByteBuf对象传递到head节点,调用底层的Unsafe写到JDK底层管道

Java对象编码过程

为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的ByteBuf,写到管道里?

我们先看下调用write的code


业务处理器接受到请求之后,做一些业务处理,返回一个user

落到 Encoder节点,下面是 Encoder 的处理流程


按照简单自定义协议,将Java对象 User 写到传入的参数 out中,这个out到底是什么?

需知User对象,从BizHandler传入到 MessageToByteEncoder时,首先传到 write

1. 判断当前Handelr是否能处理写入的消息(匹配对象)



2 分配内存


3 编码实现

4 释放对象

5 传播数据

//112 如果buf中写入了数据,就把buf传到下一个节点,直到 header 节点


6 释放内存

//115 否则,释放buf,将空数据传到下一个节点
// 120 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
// 127 当buf在pipeline中处理完之后,释放

Encoder处理传入的Java对象

总结就是,Encoder节点分配一个ByteBuf,调用encode方法,将Java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点,在我们的例子中,最终会传入到head节点

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

这里的msg就是前面在Encoder节点中,载有java对象数据的自定义ByteBuf对象

write - 写buffer队列


ChannelOutboundInvoker#

write(Object msg, boolean flush, ChannelPromise promise)



HeadContext in DefaultChannelPipeline#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
Unsafe in Channel#write(Object msg, ChannelPromise promise)

以下过程分三步讲解


direct ByteBuf


AbstractChannel#filterOutboundMessage(Object msg)

插入写队列

想要理解上面这段代码,须掌握写缓存中的几个消息指针



ChannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise下面分别是

三个指针的作用

图解过程

可得,调用n次addMessage

设置写状态

ChannelOutboundBuffer#addMessage
ChannelConfig#getWriteBufferHighWaterMark()

flush:刷新buffer队列

添加刷新标志并设置写状态

遍历 buffer 队列,过滤bytebuf

AbstractNioByteChannel

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
        // 拿到第一个需要flush的节点的数据
        Object msg = in.current();

        if (msg instanceof ByteBuf) {
            boolean done = false;
            long flushedAmount = 0;
            // 拿到自旋锁迭代次数
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // 自旋,将当前节点写出
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            // 写完之后,将当前节点删除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}
image.png
ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() {
    @Override
    public void operationComplete(Future<? super Void> future) throws Exception {
       // 回调 
    }
})

最后,调用 recycle,将当前节点回收

writeAndFlush: 写队列并刷新

writeAndFlush在某个Handler中被调用之后,最终会落到 TailContext节点


public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

AbstractChannelHandlerContext#
AbstractChannelHandlerContext#

最终,通过一个boolean变量,表示是调用invokeWriteAndFlush,还是invokeWriteinvokeWrite便是我们上文中的write过程

AbstractChannelHandlerContext#
可以看到,最终调用的底层方法和单独调用writeflush一样的


由此看来,invokeWriteAndFlush基本等价于write之后再来一次flush

总结

当 BizHandler 通过 writeAndFlush 方法将自定义对象往前传播时,其实可以拆分成两个过程

上一篇下一篇

猜你喜欢

热点阅读