An Analysis of Netty's HTTP Protocol Implementation
2018-11-09 雕兄L
For the fundamentals of the HTTP protocol, see: https://blog.csdn.net/xiangzhihong8/article/details/52029446
Decoding of the HTTP protocol is mainly done by HttpObjectDecoder, which extends ByteToMessageDecoder (encoding is handled separately by HttpObjectEncoder). Let's start the walkthrough from ByteToMessageDecoder. It is a general-purpose decoding base class that implements the overall logic for reassembling split and merged TCP packets, and since it is not specific to HTTP, you have to read through the entire decoding process to understand how the data flows. Moreover, after the request line, headers, and body have been decoded there is still an aggregation step, so you also need to read the aggregation handler's logic to fully connect the dots.
An analysis of HttpObjectAggregator can be found at https://www.jianshu.com/p/9427033ee0c9
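As context for the walkthrough, here is a minimal sketch of the server pipeline these classes typically sit in, assuming Netty 4.1 (the business handler at the end is hypothetical). HttpRequestDecoder extends HttpObjectDecoder, which in turn extends ByteToMessageDecoder:

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast(new HttpRequestDecoder())           // ByteBuf -> HttpRequest / HttpContent parts
          .addLast(new HttpResponseEncoder())          // HttpResponse -> ByteBuf on the way out
          .addLast(new HttpObjectAggregator(1 << 20)); // parts -> FullHttpRequest, 1 MiB cap
          // .addLast(new MyBusinessHandler());        // hypothetical application handler goes here
    }
}
```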
First, let's look at the channelRead implementation in ByteToMessageDecoder:
```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                // cumulation is a field that may still hold data left over from the
                // previous channelRead() that could not be fully decoded.
                // Merge the newly read bytes into cumulation here.
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // Run the actual decoding.
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            // This block only makes sense once you have seen the whole decoding flow.
            // cumulation holds the data this handler had to decode in this round. If it is
            // no longer readable (readerIndex == writerIndex), everything in it has been
            // decoded, so release the buffer.
            // In my debugging sessions the data was always fully decoded; with split or
            // merged packets only part of the data may be decoded, and then the else branch
            // below is taken - for example when a header arrives incomplete.
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                // E.g. with writerIndex = 8192 and readerIndex = 8000 the data is not fully
                // decoded; discard the 8000 bytes already read and keep the remaining 192.
                // This only triggers once several reads in a row still leave bytes behind;
                // discardAfterReads defaults to 16. If every read delivers 8192 * 2 bytes,
                // after 16 reads that is 8192 * 32 = 262144 bytes = 256 KiB - not releasing
                // the already-decoded bytes would waste a lot of memory.
                discardSomeReadBytes();
            }
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            // Decoding finished; pass the decoded objects to the next handler.
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}
```
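Before moving on, here is a toy illustration of what the cumulate() call boils down to in the merge case (this is my own sketch, not Netty's MERGE_CUMULATOR implementation):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class CumulationDemo {
    public static void main(String[] args) {
        // Leftover from the previous channelRead(): an incomplete request line.
        ByteBuf cumulation = Unpooled.copiedBuffer("GET /inde", CharsetUtil.US_ASCII);
        // Newly read bytes arriving in this channelRead().
        ByteBuf data = Unpooled.copiedBuffer("x.html HTTP/1.1\r\n", CharsetUtil.US_ASCII);
        // What the merge cumulator effectively does: append the new bytes and release them.
        cumulation.writeBytes(data);
        data.release();
        // Decoding can now retry against the complete line.
        System.out.println(cumulation.toString(CharsetUtil.US_ASCII));
        cumulation.release();
    }
}
```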
The logic of callDecode is as follows:
```java
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // If the buffer is not drained in one pass - e.g. in holds 8192 * 2 bytes and one
        // iteration only reads 8192 - this loop runs another round. See the content-parsing
        // part of HttpObjectDecoder.decode() below; the two pieces together make this clear.
        while (in.isReadable()) {
            int outSize = out.size();
            if (outSize > 0) {
                // Pass what has been decoded so far to the next handler,
                // i.e. to HttpObjectAggregator here.
                fireChannelRead(ctx, out, outSize);
                out.clear();
                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            int oldInputLength = in.readableBytes();
            // The subclass performs the actual decoding: it reads data from `in`
            // according to the protocol and advances the readerIndex.
            decodeRemovalReentryProtection(ctx, in, out);
            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }
            // No object was decoded in this iteration.
            if (outSize == out.size()) {
                // Not a single byte was read either: stop decoding. To see why, go back to
                // the channelRead() logic, i.e. how cumulation is handled.
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }
            // An object was decoded, yet no bytes were read - that must be a bug, so throw.
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
```
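The contract that callDecode() relies on is easiest to see with a minimal decoder of one's own (a hypothetical sketch, not part of Netty): if a frame is incomplete, decode() reads nothing and adds nothing, so the oldInputLength == in.readableBytes() check breaks the loop and channelRead() keeps the leftover bytes in cumulation for the next read.

```java
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

// Hypothetical example decoder: each frame is a single 4-byte int.
public class FourByteIntDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            // Incomplete frame: read nothing, emit nothing -> callDecode() breaks out
            // and the remaining bytes stay in cumulation until more data arrives.
            return;
        }
        out.add(in.readInt()); // consume 4 bytes, emit one decoded message
    }
}
```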
Next, let's look at the decode method of the subclass, HttpObjectDecoder:
```java
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
    if (resetRequested) {
        resetNow();
    }
    // currentState marks how far parsing has progressed.
    switch (currentState) {
        case SKIP_CONTROL_CHARS: {
            if (!skipControlCharacters(buffer)) {
                return;
            }
            currentState = State.READ_INITIAL;
        }
        case READ_INITIAL: try {
            // Read from the byte stream and parse the request line into a string.
            // AppendableCharSequence is Netty's own String-like utility class.
            // Note that unlike String, line is a mutable object: changes made to it
            // elsewhere are reflected in its content.
            AppendableCharSequence line = lineParser.parse(buffer);
            if (line == null) {
                return;
            }
            String[] initialLine = splitInitialLine(line);
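            // e.g. "GET /index.html HTTP/1.1" splits into
            // { "GET", "/index.html", "HTTP/1.1" } (illustrative example, not from the source).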
            if (initialLine.length < 3) {
                // Invalid initial line - ignore.
                currentState = State.SKIP_CONTROL_CHARS;
                return;
            }
            // Create the HttpMessage here; when decoding a request this is a DefaultHttpRequest.
            message = createMessage(initialLine);
            // Update the state: the request line has been consumed, the header bytes are next.
            currentState = State.READ_HEADER;
            // fall-through
        } catch (Exception e) {
            out.add(invalidMessage(buffer, e));
            return;
        }
        case READ_HEADER: try {
            // Read the message headers and get back the next state.
            State nextState = readHeaders(buffer);
            if (nextState == null) {
                return;
            }
            // Let's assume the next state is READ_FIXED_LENGTH_CONTENT and read on.
            currentState = nextState;
            switch (nextState) {
                case SKIP_CONTROL_CHARS:
                    // fast-path
                    // No content is expected.
                    out.add(message);
                    out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                    resetNow();
                    return;
                case READ_CHUNK_SIZE:
                    if (!chunkedSupported) {
                        throw new IllegalArgumentException("Chunked messages not supported");
                    }
                    // Chunked encoding - generate HttpMessage first. HttpChunks will follow.
                    out.add(message);
                    return;
                default:
                    /**
                     * <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that if a
                     * request does not have either a transfer-encoding or a content-length header then the message body
                     * length is 0. However for a response the body length is the number of octets received prior to the
                     * server closing the connection. So we treat this as variable length chunked encoding.
                     */
                    long contentLength = contentLength();
                    if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
                        out.add(message);
                        out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                        resetNow();
                        return;
                    }
                    assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
                            nextState == State.READ_VARIABLE_LENGTH_CONTENT;
                    // At this point the request line and headers have been parsed;
                    // message is the DefaultHttpRequest (or DefaultHttpResponse).
                    // The body data (DefaultHttpContent) has not been parsed yet.
                    out.add(message);
                    if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
                        // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
                        // Initialize chunkSize: this value is the body length announced in the
                        // Content-Length header. The body parsing below relies on it;
                        // chunkSize is a field.
                        chunkSize = contentLength;
                    }
                    // Force a return here: the body is parsed in a later iteration. The next
                    // time decode() is entered it goes straight to READ_FIXED_LENGTH_CONTENT or
                    // READ_CHUNK_SIZE and skips header parsing, because currentState is a field,
                    // so a second channelRead() still sees this handler's state.
                    // We return here, this forces decode to be called again where we will decode the content
                    return;
            }
        } catch (Exception e) {
            out.add(invalidMessage(buffer, e));
            return;
        }
        case READ_VARIABLE_LENGTH_CONTENT: {
            // Keep reading data as a chunk until the end of connection is reached.
            int toRead = Math.min(buffer.readableBytes(), maxChunkSize);
            if (toRead > 0) {
                ByteBuf content = buffer.readRetainedSlice(toRead);
                out.add(new DefaultHttpContent(content));
            }
            return;
        }
        // Parsing driven by the Content-Length value from the headers.
        case READ_FIXED_LENGTH_CONTENT: {
            int readLimit = buffer.readableBytes();
            // Check if the buffer is readable first as we use the readable byte count
            // to create the HttpChunk. This is needed as otherwise we may end up with
            // create a HttpChunk instance that contains an empty buffer and so is
            // handled like it is the last HttpChunk.
            //
            // See https://github.com/netty/netty/issues/433
            if (readLimit == 0) {
                return;
            }
            // Work out how many bytes to read this round; maxChunkSize defaults to 8192.
            // Debugging shows that with a large body a single channel read can deliver
            // 8192 * 2 bytes. To keep the analysis simple, assume the buffer holds
            // 8192 * 2 bytes, so toRead = maxChunkSize = 8192
            // and the buffer cannot be drained in one pass.
            int toRead = Math.min(readLimit, maxChunkSize);
            if (toRead > chunkSize) {
                toRead = (int) chunkSize;
            }
            // At this point the buffer has readerIndex = 0 and writerIndex = 16384.
            // readRetainedSlice returns an independent retained slice, i.e. content is a
            // buffer with readerIndex = 0 and writerIndex = 8192.
            ByteBuf content = buffer.readRetainedSlice(toRead);
            // The buffer's readerIndex advances to 8192,
            // and chunkSize is reduced by 8192.
            chunkSize -= toRead;
            if (chunkSize == 0) {
                // Read all content.
                // chunkSize == 0: emit a LastHttpContent; during aggregation
                // HttpObjectAggregator uses this object to decide when to finish.
                out.add(new DefaultLastHttpContent(content, validateHeaders));
                resetNow();
            } else {
                // Emit a DefaultHttpContent and pass it downstream. At this point
                // control returns to callDecode() and the analysis continues there.
                out.add(new DefaultHttpContent(content));
            }
            return;
        }
        // The code below runs when the request headers contain Transfer-Encoding: chunked.
        // The chunked protocol itself is not analyzed here; with the protocol spec at hand
        // the code is straightforward to follow.
        /**
         * everything else after this point takes care of reading chunked content. basically, read chunk size,
         * read chunk, read and ignore the CRLF and repeat until 0
         */
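        // Illustrative wire format of a chunked body (example data, not from the source):
        //   4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n
        // READ_CHUNK_SIZE parses "4", READ_CHUNKED_CONTENT emits "Wiki",
        // READ_CHUNK_DELIMITER skips the trailing CRLF, and a size of 0
        // moves the state machine to READ_CHUNK_FOOTER.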
        case READ_CHUNK_SIZE: try {
            AppendableCharSequence line = lineParser.parse(buffer);
            if (line == null) {
                return;
            }
            int chunkSize = getChunkSize(line.toString());
            this.chunkSize = chunkSize;
            if (chunkSize == 0) {
                currentState = State.READ_CHUNK_FOOTER;
                return;
            }
            currentState = State.READ_CHUNKED_CONTENT;
            // fall-through
        } catch (Exception e) {
            out.add(invalidChunk(buffer, e));
            return;
        }
        case READ_CHUNKED_CONTENT: {
            assert chunkSize <= Integer.MAX_VALUE;
            int toRead = Math.min((int) chunkSize, maxChunkSize);
            toRead = Math.min(toRead, buffer.readableBytes());
            if (toRead == 0) {
                return;
            }
            HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
            chunkSize -= toRead;
            out.add(chunk);
            if (chunkSize != 0) {
                return;
            }
            currentState = State.READ_CHUNK_DELIMITER;
            // fall-through
        }
        case READ_CHUNK_DELIMITER: {
            final int wIdx = buffer.writerIndex();
            int rIdx = buffer.readerIndex();
            while (wIdx > rIdx) {
                byte next = buffer.getByte(rIdx++);
                if (next == HttpConstants.LF) {
                    currentState = State.READ_CHUNK_SIZE;
                    break;
                }
            }
            buffer.readerIndex(rIdx);
            return;
        }
        case READ_CHUNK_FOOTER: try {
            LastHttpContent trailer = readTrailingHeaders(buffer);
            if (trailer == null) {
                return;
            }
            out.add(trailer);
            resetNow();
            return;
        } catch (Exception e) {
            out.add(invalidChunk(buffer, e));
            return;
        }
        case BAD_MESSAGE: {
            // Keep discarding until disconnection.
            buffer.skipBytes(buffer.readableBytes());
            break;
        }
        case UPGRADED: {
            int readableBytes = buffer.readableBytes();
            if (readableBytes > 0) {
                // Keep on consuming as otherwise we may trigger an DecoderException,
                // other handler will replace this codec with the upgraded protocol codec to
                // take the traffic over at some point then.
                // See https://github.com/netty/netty/issues/2173
                out.add(buffer.readBytes(readableBytes));
            }
            break;
        }
    }
}
```
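Given what decode() emits, a handler placed after the decoder but without an HttpObjectAggregator receives the message in parts. Here is a sketch of my own (not Netty code) showing the three kinds of objects that arrive:

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;

public class HttpPartsHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        if (msg instanceof HttpRequest) {
            // Request line + headers, i.e. the DefaultHttpRequest created in READ_INITIAL.
            HttpRequest request = (HttpRequest) msg;
            System.out.println(request.method() + " " + request.uri());
        }
        if (msg instanceof HttpContent) {
            // One slice of the body, i.e. a DefaultHttpContent of at most maxChunkSize bytes.
            HttpContent chunk = (HttpContent) msg;
            System.out.println("body bytes: " + chunk.content().readableBytes());
            if (msg instanceof LastHttpContent) {
                // Body complete - the object HttpObjectAggregator waits for to finish aggregating.
                System.out.println("last chunk received");
            }
        }
    }
}
```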
Finally, let's take a separate look at the header-parsing code:
```java
private static class HeaderParser implements ByteProcessor {
    private final AppendableCharSequence seq;
    private final int maxLength;
    private int size;

    HeaderParser(AppendableCharSequence seq, int maxLength) {
        this.seq = seq;
        this.maxLength = maxLength;
    }

    public AppendableCharSequence parse(ByteBuf buffer) {
        final int oldSize = size;
        seq.reset();
        // The key call: scan for the CRLF terminator.
        int i = buffer.forEachByte(this);
        if (i == -1) {
            size = oldSize;
            return null;
        }
        buffer.readerIndex(i + 1);
        return seq;
    }

    public void reset() {
        size = 0;
    }

    @Override
    public boolean process(byte value) throws Exception {
        char nextByte = (char) (value & 0xFF);
        // The key search logic.
        if (nextByte == HttpConstants.CR) {
            return true;
        }
        if (nextByte == HttpConstants.LF) {
            return false;
        }
        if (++ size > maxLength) {
            // TODO: Respond with Bad Request and discard the traffic
            //    or close the connection.
            //       No need to notify the upstream handlers - just log.
            //       If decoding a response, just throw an exception.
            throw newException(maxLength);
        }
        // Append the bytes one by one to the string.
        seq.append(nextByte);
        return true;
    }

    protected TooLongFrameException newException(int maxLength) {
        return new TooLongFrameException("HTTP header is larger than " + maxLength + " bytes.");
    }
}
```
Each header line is terminated by a CRLF. buffer.forEachByte(this) is the core call here: forEachByte accepts any implementation of ByteProcessor, takes one byte at a time out of the ByteBuf, and passes it to the processor's process method - here, via this, to HeaderParser.process. Iteration repeats until process returns false. Note that when a CR is encountered the method returns true but does not append the byte to the output string (seq); when an LF is encountered the iteration ends and seq is returned.
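To see the forEachByte/ByteProcessor pattern in isolation, here is a small standalone sketch using the built-in ByteProcessor.FIND_LF; the sample header bytes are made up:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;

public class ForEachByteDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.copiedBuffer("Host: example.com\r\nAccept: */*\r\n", CharsetUtil.US_ASCII);
        // forEachByte feeds each byte to the processor and returns the index of the
        // first byte for which process() returns false, or -1 if none matched.
        int i = buf.forEachByte(ByteProcessor.FIND_LF);
        if (i != -1) {
            String line = buf.toString(buf.readerIndex(), i - buf.readerIndex(), CharsetUtil.US_ASCII);
            // Advance past the LF, just like HeaderParser.parse() does with readerIndex(i + 1).
            buf.readerIndex(i + 1);
            System.out.println(line.trim()); // prints "Host: example.com" (trim drops the CR)
        }
        buf.release();
    }
}
```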