Netty http2 写请求之Header编码

2018-10-28  本文已影响0人  绝尘驹

我们知道,HTTP2.0使用header桢表达HTTP header+request line,data frame表达Body。header桢和data桢使用相同的stream id组成一个完整的HTTP请求/响应包,这里先描述netty是怎么把httprequest请求创建一个header frame的。

http2 Stream 和 header frame

每一次http2请求都会对应的创建一个stream,发送header frame,即header field 的数据,如果field数据大于frame的最大值,则需要通过一个header frame + 多个CONTINUATION Frame 来发送,最后一个CONTINUATION Frame 是通过把endOfHeaders设置为true来告诉服务端,说header 部分已经发完了, 下面我们看下这个大概的实现。

Http2 是通过Frame的格式来和服务端交互的,我们在写代码时还时可以用http1的格式来发生http2的请求,这是因为netty自动对http1的httpRequest 做了编码转换。

Netty http2 写请求由如下handler实现,

HttpToHttp2ConnectionHandler->DefaultHttp2ConnectionEncoder->DefaultHttp2FrameWriter

http 转换为http2 Frame

http request 转换为http2 Frame 主要由
HttpToHttp2ConnectionHandler 实现,也是pipleline里的第一个handler,它的write方法如下:

public void write(ChannelHandlerContext ctx, Object msg,      ChannelPromise promise) {
     //这里是判断请求是否为Http请求
    if (!(msg instanceof HttpMessage || msg instanceof HttpContent)) {
        ctx.write(msg, promise);
        return;
    }

    boolean release = true;
    SimpleChannelPromiseAggregator promiseAggregator =
            new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());
    try {
        Http2ConnectionEncoder encoder = encoder();
        boolean endStream = false;

        //每一个httprquest都是HttpMessage,
        if (msg instanceof HttpMessage) {
            final HttpMessage httpMsg = (HttpMessage) msg;
            //streamId,我们可以自己在header里指定,没有指定的话,会自动生成,客户端是基数递增的格式
            // Provide the user the opportunity to specify the streamId
            currentStreamId = getStreamId(httpMsg.headers());
            //对headers编码为http2 header frame,同时发送给服务端,即一个请求,服务端先收到的是header frame,后面才是数据frame
            // Convert and write the headers,
            Http2Headers http2Headers = HttpConversionUtil.toHttp2Headers(httpMsg, validateHeaders);
              //如果没有请求body,那endStream为true,即该请求读完header frame就结束了,即比如get,head等endStream都为true。
            endStream = msg instanceof FullHttpMessage && !((FullHttpMessage) msg).content().isReadable();
            //发送header frame
            writeHeaders(ctx, encoder, currentStreamId, httpMsg.headers(), http2Headers,
                    endStream, promiseAggregator);
        }

        if (!endStream && msg instanceof HttpContent) {
            boolean isLastContent = false;
            HttpHeaders trailers = EmptyHttpHeaders.INSTANCE;
            Http2Headers http2Trailers = EmptyHttp2Headers.INSTANCE;
            if (msg instanceof LastHttpContent) {
                isLastContent = true;

                // Convert any trailing headers.
                final LastHttpContent lastContent = (LastHttpContent) msg;
                trailers = lastContent.trailingHeaders();
                http2Trailers = HttpConversionUtil.toHttp2Headers(trailers, validateHeaders);
            }

            // Write the data,发送数据frame
            final ByteBuf content = ((HttpContent) msg).content();
           
            endStream = isLastContent && trailers.isEmpty();
            release = false;
            encoder.writeData(ctx, currentStreamId, content, 0, endStream, promiseAggregator.newPromise());

            if (!trailers.isEmpty()) {
                // Write trailing headers.
                writeHeaders(ctx, encoder, currentStreamId, trailers, http2Trailers, true, promiseAggregator);
            }
        }
    } catch (Throwable t) {
        onError(ctx, true, t);
        promiseAggregator.setFailure(t);
    } finally {
        if (release) {
            ReferenceCountUtil.release(msg);
        }
        promiseAggregator.doneAllocatingPromises();
    }

通过上面我们可以看出,一次http request 请求,http2 会分别对http header 和body编码,header 编码为http2 header frame,body 编码为http2 data frame,并先发送header frame下面我们看下http2 header 的编码。

创建http2Header

public static Http2Headers toHttp2Headers(HttpMessage in, boolean validateHeaders) {
    HttpHeaders inHeaders = in.headers();
    final Http2Headers out = new DefaultHttp2Headers(validateHeaders, inHeaders.size());
    if (in instanceof HttpRequest) {
        HttpRequest request = (HttpRequest) in;
        URI requestTargetUri = URI.create(request.uri());
        out.path(toHttp2Path(requestTargetUri));
        out.method(request.method().asciiName());
        setHttp2Scheme(inHeaders, requestTargetUri, out);

        if (!isOriginForm(requestTargetUri) && !isAsteriskForm(requestTargetUri)) {
            // Attempt to take from HOST header before taking from the request-line
            String host = inHeaders.getAsString(HttpHeaderNames.HOST);
            setHttp2Authority((host == null || host.isEmpty()) ? requestTargetUri.getAuthority() : host, out);
        }
    } else if (in instanceof HttpResponse) {
        HttpResponse response = (HttpResponse) in;
        out.status(response.status().codeAsText());
    }

    // Add the HTTP headers which have not been consumed above
    toHttp2Headers(inHeaders, out);
    return out;
}

DefaultHttp2Headers 里面有一个HeaderEntry 数组,对每个header的name取hash,再和数组的长度进行&运算得到数组的索引下标,类似hashmap的原理,顺便说下,http1的请求行信息,url,method,status也做为key,value写到header里,通过以:开头,格式如下:
:method,:path 这样的。

编码 http2Header

http2 编码header时,会检查是否有对应的http2Stream,http2时多个请求跑在一个底层物理链接上,通过streamid 来区分不同的请求,所以在发请求时,会检查stream,这里以没有压缩的请求分析,压缩的以后单独分析,代码如下:

       
        Http2Stream stream = connection.stream(streamId);
        //写http header frame时,stream应该是空的,为啥事空的呢,因为每次请求streamid都是递增的,即会创建,新建的stream是open状态的。
        if (stream == null) {
                try {
                    //一般的请求,发送header frame时创建的stream事open状态,除非
                    stream = connection.local().createStream(streamId, endOfStream);
                } catch (Http2Exception cause) {
                    if (connection.remote().mayHaveCreatedStream(streamId)) {
                        promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause));
                        return promise;
                    }
                    throw cause;
                }
            } else {
                switch (stream.state()) {
                    case RESERVED_LOCAL:
                        stream.open(endOfStream);
                        break;
                    case OPEN:
                    case HALF_CLOSED_REMOTE:
                        // Allowed sending headers in these states.
                        break;
                    default:
                        throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
                                                        stream.state());
           }
      }

发送header

        Http2RemoteFlowController flowController = flowController();
         //如果是post请求,endOfStream是false,这里只是发header,如果是get请求,endOfStream是true,则检查流控是否有
        // 等待需要发的,如果有等待,则需要添加到队列,如果没有
        // 则直接发。
        if (!endOfStream || !flowController.hasFlowControlled(stream)) {
            boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
            if (endOfStream) {
                final Http2Stream finalStream = stream;
                final ChannelFutureListener closeStreamLocalListener = new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        lifecycleManager.closeStreamLocal(finalStream, future);
                    }
                };
                promise = promise.unvoid().addListener(closeStreamLocalListener);
            }

             // 这里是真正的codec
            ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
                                                            weight, exclusive, padding, endOfStream, promise);
            // Writing headers may fail during the encode state if they violate HPACK limits.
            Throwable failureCause = future.cause();
            if (failureCause == null) {
                // Synchronously set the headersSent flag to ensure that we do not subsequently write
                // other headers containing pseudo-header fields.
                //
                // This just sets internal stream state which is used elsewhere in the codec and doesn't
                // necessarily mean the write will complete successfully.
                stream.headersSent(isInformational);

                if (!future.isSuccess()) {
                    // Either the future is not done or failed in the meantime.
                    notifyLifecycleManagerOnError(future, ctx);
                }
            } else {
                lifecycleManager.onError(ctx, true, failureCause);
            }

            return future;
        } else {
            // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
            // 如果流控队列还有数据在等,则继续添加到等待队列。
            flowController.addFlowControlled(stream,
                    new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding,
                                              true, promise));
            return promise;
        }

流控等待队列,如果是发送请求的header frame部分,而且有body部分,则不需要进入流控队列,header frame 可以直接发出去,如果进入队列了,那写就到这里结束了,后面通过流控队列里取出来发出去的。

codec http2 header 就时按照http2 的header frame的格式编码。

        verifyStreamId(streamId, STREAM_ID);
        if (hasPriority) {
            verifyStreamOrConnectionId(streamDependency, STREAM_DEPENDENCY);
            verifyPadding(padding);
            verifyWeight(weight);
        }

        // Encode the entire header block.
        headerBlock = ctx.alloc().buffer();
        headersEncoder.encodeHeaders(streamId, headers, headerBlock);

        Http2Flags flags =
                new Http2Flags().endOfStream(endStream).priorityPresent(hasPriority).paddingPresent(padding > 0);

        // Read the first fragment (possibly everything).
        int nonFragmentBytes = padding + flags.getNumPriorityBytes();
        int maxFragmentLength = maxFrameSize - nonFragmentBytes;
        ByteBuf fragment = headerBlock.readRetainedSlice(min(headerBlock.readableBytes(), maxFragmentLength));

        // Set the end of headers flag for the first frame.
        flags.endOfHeaders(!headerBlock.isReadable());

        int payloadLength = fragment.readableBytes() + nonFragmentBytes;
        ByteBuf buf = ctx.alloc().buffer(HEADERS_FRAME_HEADER_LENGTH);
        writeFrameHeaderInternal(buf, payloadLength, HEADERS, flags, streamId);
        writePaddingLength(buf, padding);

        if (hasPriority) {
            buf.writeInt(exclusive ? (int) (0x80000000L | streamDependency) : streamDependency);

            // Adjust the weight so that it fits into a single byte on the wire.
            buf.writeByte(weight - 1);
        }
        ctx.write(buf, promiseAggregator.newPromise());

        // Write the first fragment.
        ctx.write(fragment, promiseAggregator.newPromise());

        // Write out the padding, if any.
        if (paddingBytes(padding) > 0) {
            ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding)), promiseAggregator.newPromise());
        }

        if (!flags.endOfHeaders()) {
            writeContinuationFrames(ctx, streamId, headerBlock, padding, promiseAggregator);
        }

编码header

public void encodeHeaders(int streamId, Http2Headers headers,   ByteBuf buffer) throws Http2Exception {
    try {
        // If there was a change in the table size, serialize the output from the hpackEncoder
        // resulting from that change.
        if (tableSizeChangeOutput.isReadable()) {
            buffer.writeBytes(tableSizeChangeOutput);
            tableSizeChangeOutput.clear();
        }
        //hpackEncoder 主要是对header压缩
        hpackEncoder.encodeHeaders(streamId, buffer, headers, sensitivityDetector);
    } catch (Http2Exception e) {
        throw e;
    } catch (Throwable t) {
        throw connectionError(COMPRESSION_ERROR, t, "Failed encoding headers block: %s", t.getMessage());
    }
}

hpackEncoder 对header的编码主要作用是减少header头的传输,我们知道http1每次请求都带了很多头,即使是没有用的,每次都传,很浪费带宽,所以http2对对header传输这快做了彻底的优化,优化的措施是,hpackEncoder 提供两张表,一张静态的HpackStaticTable,一张动态的。静态的表包含了http 61个常用的http header 头,每个都一个固定的索引值,要传输的header在静态表里,而且对应value也相等,那么传输的只是一个对应的索引。如果静态表里没有对应的key,比如我们自己定义的key,这时就用上动态表了,先到动态表里找,找到也是用索引,如果没有,则把对应的key和value添加到动态表,本次还是发具体的key和value,待下一次请求就可以用上了。

发现写了这么多,还只是把header 部分的编码写完,写太长没有人看,发送数据部分下次单独做为一篇文章写。

上一篇 下一篇

猜你喜欢

热点阅读