Netty HTTP Protocol Implementation Explained

2018-11-09  雕兄L

For the fundamentals of the HTTP protocol, see: https://blog.csdn.net/xiangzhihong8/article/details/52029446

HTTP decoding is mainly done by HttpObjectDecoder, which extends ByteToMessageDecoder (the encoding side is handled separately by HttpObjectEncoder).
Let's start the walkthrough from ByteToMessageDecoder. It is a general-purpose decoder class that implements the overall logic for reassembling split and merged TCP packets, and it is not used only for HTTP. To understand how data flows through it you need to read the whole decoding process end to end; and after the request line, headers and body are decoded there is still an aggregation step, so you also need the aggregation handler's logic to fully connect the dots.
For an analysis of HttpObjectAggregator, see https://www.jianshu.com/p/9427033ee0c9
Let's first look at ByteToMessageDecoder's channelRead implementation:

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    //cumulation is a member field that may hold data left undecoded by the previous channelRead()
                    //merge the newly read bytes into cumulation
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                //run the decoding process
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                //the intent of this block only becomes clear after reading the whole decoding process
                //cumulation holds the data this handler still has to decode; if it is no longer readable (readerIndex == writerIndex), everything in it has been decoded and the buffer can be released
                //in my debugging sessions the data was always fully decoded here; with split or merged packets only part of the data may be decoded, which is when this branch matters
                //for example, incomplete data in the middle of header parsing should leave unread bytes here
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    //e.g. widx=8192, ridx=8000: decoding is incomplete, so discard the bytes already read and keep the remaining 192
                    //this only triggers after several consecutive reads still leave leftover bytes; discardAfterReads defaults to 16
                    //if each read brings in 8192*2 bytes, after 16 reads that is 8192*32 = 262144 bytes = 256 KiB; not releasing the already-decoded bytes would waste a lot of memory
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                //decoding finished normally: pass the decoded objects to the next handler
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
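The cumulation idea above can be sketched without Netty. The following is a minimal, hypothetical JDK-only simulation (CumulationDemo, its channelRead method, and the CRLF line-splitting "decoder" are my own stand-ins, not Netty APIs): bytes left over from the previous read are merged with the new read before decoding, so a message split across TCP packets is still decoded whole.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal stand-in for ByteToMessageDecoder's cumulation (not Netty code).
public class CumulationDemo {
    private byte[] cumulation = new byte[0];

    // "Decode" complete CRLF-terminated lines; carry the incomplete tail
    // over to the next call, just like the cumulation buffer does.
    public List<String> channelRead(byte[] data) {
        byte[] in = Arrays.copyOf(cumulation, cumulation.length + data.length);
        System.arraycopy(data, 0, in, cumulation.length, data.length);
        List<String> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i + 1 < in.length; i++) {
            if (in[i] == '\r' && in[i + 1] == '\n') {
                out.add(new String(in, start, i - start, StandardCharsets.US_ASCII));
                start = i + 2;
            }
        }
        cumulation = Arrays.copyOfRange(in, start, in.length); // undecoded leftover
        return out;
    }

    public static void main(String[] args) {
        CumulationDemo d = new CumulationDemo();
        System.out.println(d.channelRead("GET / HT".getBytes(StandardCharsets.US_ASCII)));   // []
        System.out.println(d.channelRead("TP/1.1\r\nHost: a\r\n".getBytes(StandardCharsets.US_ASCII))); // [GET / HTTP/1.1, Host: a]
    }
}
```

The first read produces nothing because the line is incomplete; the second read is merged with the leftover and both lines decode in one pass.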

The logic of callDecode:

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            //when one pass does not consume the whole buffer (e.g. in holds 8192*2 bytes and one pass reads 8192), loop and read again
            //see the content-parsing part of HttpObjectDecoder's decode method below; the two read together make this clear
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    //pass the decoded objects to the next handler
                    //here that next handler is HttpObjectAggregator
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                //the subclass performs the actual decoding: it reads from in according to the protocol and advances readerIndex
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                //this means no object was decoded in this pass
                if (outSize == out.size()) {
                    //not a single byte was read either, so stop decoding; to see why, look back at the cumulation handling in channelRead()
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
                //an object was decoded, yet no bytes were consumed: throw
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }
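The break/continue decisions in callDecode boil down to a progress check: stop when the child decoder neither consumed bytes nor produced a message (it needs more data), and treat "produced a message without consuming bytes" as a decoder bug. Here is a hypothetical toy version (DecodeLoopDemo and its one-digit "decoder" are invented for illustration, with a StringBuilder standing in for the ByteBuf):

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of callDecode's progress check (illustrative, not Netty code).
public class DecodeLoopDemo {
    // The "child decoder": consumes one leading digit per call and emits it
    // as a message; it leaves non-digit input untouched ("incomplete" data).
    static void decode(StringBuilder in, List<String> out) {
        if (in.length() > 0 && Character.isDigit(in.charAt(0))) {
            out.add(String.valueOf(in.charAt(0)));
            in.deleteCharAt(0);
        }
    }

    static void callDecode(StringBuilder in, List<String> out) {
        while (in.length() > 0) {
            int outSize = out.size();
            int oldInputLength = in.length();
            decode(in, out);
            if (outSize == out.size()) {             // nothing was decoded...
                if (oldInputLength == in.length()) {
                    break;                           // ...and nothing was read: wait for more data
                }
                continue;
            }
            if (oldInputLength == in.length()) {     // decoded without reading: broken decoder
                throw new IllegalStateException("decode() did not read anything but decoded a message.");
            }
        }
    }

    public static void main(String[] args) {
        StringBuilder in = new StringBuilder("12a3");
        List<String> out = new ArrayList<>();
        callDecode(in, out);
        System.out.println(out + " leftover=" + in); // [1, 2] leftover=a3
    }
}
```

The loop decodes "1" and "2", then stops at "a" because the decoder made no progress; the leftover stays buffered, exactly as cumulation keeps undecoded bytes for the next channelRead.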

Now let's look at the subclass's decode method:

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        if (resetRequested) {
            resetNow();
        }

        //currentState tracks how far parsing has progressed
        switch (currentState) {
        case SKIP_CONTROL_CHARS: {
            if (!skipControlCharacters(buffer)) {
                return;
            }
            currentState = State.READ_INITIAL;
        }
        case READ_INITIAL: try {
            //read bytes and parse the initial line into a character sequence; AppendableCharSequence is Netty's own String-like utility class
            //note that unlike String, line is mutable: modifications made elsewhere will change its contents
            AppendableCharSequence line = lineParser.parse(buffer);
            if (line == null) {
                return;
            }
            String[] initialLine = splitInitialLine(line);
            if (initialLine.length < 3) {
                // Invalid initial line - ignore.
                currentState = State.SKIP_CONTROL_CHARS;
                return;
            }
            //create the HttpMessage; for a request this is a DefaultHttpRequest
            message = createMessage(initialLine);
            //the initial line is done; advance the state to read the header bytes
            currentState = State.READ_HEADER;
            // fall-through
        } catch (Exception e) {
            out.add(invalidMessage(buffer, e));
            return;
        }
        case READ_HEADER: try {
            //read the headers and return the next state
            State nextState = readHeaders(buffer);
            if (nextState == null) {
                return;
            }
            //let's assume the state is READ_FIXED_LENGTH_CONTENT and read on
            currentState = nextState;
            switch (nextState) {
            case SKIP_CONTROL_CHARS:
                // fast-path
                // No content is expected.
                out.add(message);
                out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                resetNow();
                return;
            case READ_CHUNK_SIZE:
                if (!chunkedSupported) {
                    throw new IllegalArgumentException("Chunked messages not supported");
                }
                // Chunked encoding - generate HttpMessage first.  HttpChunks will follow.
                out.add(message);
                return;
            default:
                /**
                 * <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that if a
                 * request does not have either a transfer-encoding or a content-length header then the message body
                 * length is 0. However for a response the body length is the number of octets received prior to the
                 * server closing the connection. So we treat this as variable length chunked encoding.
                 */
                long contentLength = contentLength();
                if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
                    out.add(message);
                    out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                    resetNow();
                    return;
                }

                assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
                        nextState == State.READ_VARIABLE_LENGTH_CONTENT;

                //at this point the initial line and headers have been parsed
                //message here is the DefaultHttpRequest (or DefaultHttpResponse)
                //the body (DefaultHttpContent) has not been parsed yet
                out.add(message);

                if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
                    // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
                    //set chunkSize to the body length carried in the headers
                    //the body-parsing code below uses this value; chunkSize is a member field
                    chunkSize = contentLength;
                }
                //force a return here: the body needs a second decode pass
                //the next pass jumps straight to READ_FIXED_LENGTH_CONTENT or READ_CHUNK_SIZE without re-parsing the headers,
                // because currentState is a member field, so a later channelRead() still sees this handler's state
                // We return here, this forces decode to be called again where we will decode the content
                return;
            }
        } catch (Exception e) {
            out.add(invalidMessage(buffer, e));
            return;
        }
        case READ_VARIABLE_LENGTH_CONTENT: {
            // Keep reading data as a chunk until the end of connection is reached.
            int toRead = Math.min(buffer.readableBytes(), maxChunkSize);
            if (toRead > 0) {
                ByteBuf content = buffer.readRetainedSlice(toRead);
                out.add(new DefaultHttpContent(content));
            }
            return;
        }
        //parsing driven by the content-length header
        case READ_FIXED_LENGTH_CONTENT: {
            int readLimit = buffer.readableBytes();

            // Check if the buffer is readable first as we use the readable byte count
            // to create the HttpChunk. This is needed as otherwise we may end up with
            // create a HttpChunk instance that contains an empty buffer and so is
            // handled like it is the last HttpChunk.
            //
            // See https://github.com/netty/netty/issues/433
            if (readLimit == 0) {
                return;
            }
            //determine how many bytes to read this round; maxChunkSize defaults to 8192
            //debugging shows that with a large body, one channel read can bring in 8192*2 bytes
            //for the analysis below, assume the buffer holds 8192*2 bytes, so toRead = maxChunkSize = 8192
            //and one pass cannot consume it all
            int toRead = Math.min(readLimit, maxChunkSize);
            if (toRead > chunkSize) {
                toRead = (int) chunkSize;
            }
            //the buffer currently has ridx=0, widx=16384
            //take an independent retained slice: content is a buffer with ridx=0, widx=8192
            ByteBuf content = buffer.readRetainedSlice(toRead);
            //the buffer's ridx becomes 8192

            //subtract 8192 from chunkSize
            chunkSize -= toRead;

            if (chunkSize == 0) {
                // Read all content.
                //when chunkSize reaches 0, emit a LastHttpContent;
                // HttpObjectAggregator uses this object to decide when aggregation is finished
                out.add(new DefaultLastHttpContent(content, validateHeaders));
                resetNow();
            } else {
                //emit a DefaultHttpContent and pass it on
                //at this point control returns to callDecode() for the next pass
                out.add(new DefaultHttpContent(content));
            }
            return;
        }
        //when the request header contains Transfer-Encoding: chunked, the code below runs; the chunked protocol itself is not analyzed here, and with the protocol spec in hand the code is easy to follow
        /**
         * everything else after this point takes care of reading chunked content. basically, read chunk size,
         * read chunk, read and ignore the CRLF and repeat until 0
         */
        case READ_CHUNK_SIZE: try {
            AppendableCharSequence line = lineParser.parse(buffer);
            if (line == null) {
                return;
            }
            int chunkSize = getChunkSize(line.toString());
            this.chunkSize = chunkSize;
            if (chunkSize == 0) {
                currentState = State.READ_CHUNK_FOOTER;
                return;
            }
            currentState = State.READ_CHUNKED_CONTENT;
            // fall-through
        } catch (Exception e) {
            out.add(invalidChunk(buffer, e));
            return;
        }
        case READ_CHUNKED_CONTENT: {
            assert chunkSize <= Integer.MAX_VALUE;
            int toRead = Math.min((int) chunkSize, maxChunkSize);
            toRead = Math.min(toRead, buffer.readableBytes());
            if (toRead == 0) {
                return;
            }
            HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
            chunkSize -= toRead;

            out.add(chunk);

            if (chunkSize != 0) {
                return;
            }
            currentState = State.READ_CHUNK_DELIMITER;
            // fall-through
        }
        case READ_CHUNK_DELIMITER: {
            final int wIdx = buffer.writerIndex();
            int rIdx = buffer.readerIndex();
            while (wIdx > rIdx) {
                byte next = buffer.getByte(rIdx++);
                if (next == HttpConstants.LF) {
                    currentState = State.READ_CHUNK_SIZE;
                    break;
                }
            }
            buffer.readerIndex(rIdx);
            return;
        }
        case READ_CHUNK_FOOTER: try {
            LastHttpContent trailer = readTrailingHeaders(buffer);
            if (trailer == null) {
                return;
            }
            out.add(trailer);
            resetNow();
            return;
        } catch (Exception e) {
            out.add(invalidChunk(buffer, e));
            return;
        }
        case BAD_MESSAGE: {
            // Keep discarding until disconnection.
            buffer.skipBytes(buffer.readableBytes());
            break;
        }
        case UPGRADED: {
            int readableBytes = buffer.readableBytes();
            if (readableBytes > 0) {
                // Keep on consuming as otherwise we may trigger an DecoderException,
                // other handler will replace this codec with the upgraded protocol codec to
                // take the traffic over at some point then.
                // See https://github.com/netty/netty/issues/2173
                out.add(buffer.readBytes(readableBytes));
            }
            break;
        }
        }
    }
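To make the READ_FIXED_LENGTH_CONTENT arithmetic concrete, here is a hypothetical sketch (FixedLengthDemo is invented; the real code emits ByteBuf slices, while this version only records slice sizes): a body of contentLength bytes is emitted in slices of at most maxChunkSize, and the slice that drives chunkSize down to 0 becomes the LastHttpContent.

```java
import java.util.ArrayList;
import java.util.List;

// Arithmetic-only sketch of READ_FIXED_LENGTH_CONTENT (illustrative, not Netty code).
public class FixedLengthDemo {
    static List<String> slice(int readableBytes, long contentLength, int maxChunkSize) {
        List<String> out = new ArrayList<>();
        long chunkSize = contentLength;            // set when the headers were parsed
        int remaining = readableBytes;
        while (remaining > 0 && chunkSize > 0) {
            int toRead = Math.min(remaining, maxChunkSize);
            if (toRead > chunkSize) {
                toRead = (int) chunkSize;          // never read past the declared body length
            }
            remaining -= toRead;                   // readRetainedSlice advances readerIndex
            chunkSize -= toRead;
            out.add(toRead + (chunkSize == 0 ? " (last)" : ""));
        }
        return out;
    }

    public static void main(String[] args) {
        // 16384-byte body already in the buffer, default maxChunkSize of 8192:
        System.out.println(slice(16384, 16384, 8192)); // [8192, 8192 (last)]
    }
}
```

With only 100 of the 16384 body bytes buffered, a single non-last 100-byte slice is emitted and decoding waits for the next channelRead, matching the forced-return behavior described above.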

Now let's look separately at the header-parsing code:

private static class HeaderParser implements ByteProcessor {
        private final AppendableCharSequence seq;
        private final int maxLength;
        private int size;

        HeaderParser(AppendableCharSequence seq, int maxLength) {
            this.seq = seq;
            this.maxLength = maxLength;
        }

        public AppendableCharSequence parse(ByteBuf buffer) {
            final int oldSize = size;
            seq.reset();
            //the key code: search for the CRLF line terminator
            int i = buffer.forEachByte(this);
            if (i == -1) {
                size = oldSize;
                return null;
            }
            buffer.readerIndex(i + 1);
            return seq;
        }

        public void reset() {
            size = 0;
        }

        @Override
        public boolean process(byte value) throws Exception {
            char nextByte = (char) (value & 0xFF);
            //the key check: detect the CR/LF terminator
            if (nextByte == HttpConstants.CR) {
                return true;
            }
            if (nextByte == HttpConstants.LF) {
                return false;
            }

            if (++ size > maxLength) {
                // TODO: Respond with Bad Request and discard the traffic
                //    or close the connection.
                //       No need to notify the upstream handlers - just log.
                //       If decoding a response, just throw an exception.
                throw newException(maxLength);
            }
            //append the bytes one by one to the character sequence
            seq.append(nextByte);
            return true;
        }

        protected TooLongFrameException newException(int maxLength) {
            return new TooLongFrameException("HTTP header is larger than " + maxLength + " bytes.");
        }
    }

Each header line is terminated by CRLF. The key call is buffer.forEachByte(this): forEachByte accepts any implementation of ByteProcessor, takes one byte at a time from the ByteBuf and passes it to the processor's process method (here HeaderParser's process, via this). Iteration repeats until process returns false. Note that on CR the method returns true but does not append the character to the output sequence (seq); only LF ends the iteration, after which seq is returned.
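That byte walk can be sketched with plain JDK code (parseLine and the int[] readerIndex holder are my own stand-ins for forEachByte and the ByteBuf reader index, and the maxLength guard is omitted): append each byte to the sequence, skip CR, stop at LF, and return null when no complete line is buffered yet.

```java
import java.nio.charset.StandardCharsets;

// JDK-only stand-in for HeaderParser's CRLF scan (illustrative, not Netty code).
public class LineScanDemo {
    // readerIndex[0] plays the role of the ByteBuf reader index.
    static String parseLine(byte[] buf, int[] readerIndex) {
        StringBuilder seq = new StringBuilder();
        for (int i = readerIndex[0]; i < buf.length; i++) {
            char c = (char) (buf[i] & 0xFF);
            if (c == '\r') {
                continue;                 // CR: accepted but never appended
            }
            if (c == '\n') {
                readerIndex[0] = i + 1;   // like buffer.readerIndex(i + 1)
                return seq.toString();    // LF: the line is complete
            }
            seq.append(c);
        }
        return null;                      // no LF yet: wait for more bytes
    }

    public static void main(String[] args) {
        byte[] buf = "Host: a\r\nConte".getBytes(StandardCharsets.US_ASCII);
        int[] idx = {0};
        System.out.println(parseLine(buf, idx)); // Host: a
        System.out.println(parseLine(buf, idx)); // null (second line incomplete)
    }
}
```

The first call returns "Host: a" and advances the reader index past the LF; the second call returns null because the next line has no terminator yet, mirroring how lineParser.parse returns null on incomplete data.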
