2018-07-12
1. Source code flow
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        // See
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
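- ChannelConfig: configuration parameters for the Channel
- ChannelPipeline: handles event propagation
- ByteBufAllocator: the ByteBuf allocator
- RecvByteBufAllocator: allocator for receive buffers; its Handle predicts a suitable buffer capacity (see section 3)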
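The control flow of read() can be modeled without Netty. The sketch below is a simplified stand-in, not Netty's implementation: ReadLoopSketch, Result, and readLoop are hypothetical names, a plain java.nio ReadableByteChannel takes the place of the socket, and the continue condition is reduced to "the last read filled the buffer and fewer than maxMessagesPerRead chunks were read".

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ReadLoopSketch {
    /** Outcome of one read loop: the chunks passed downstream and whether EOF was seen. */
    static final class Result {
        final List<byte[]> messages = new ArrayList<>();
        boolean close;
    }

    /** Hypothetical, simplified model of the do/while loop in read(). */
    static Result readLoop(ReadableByteChannel ch, int bufferSize, int maxMessagesPerRead)
            throws IOException {
        Result result = new Result();
        int totalMessages = 0;
        int lastBytesRead;
        do {
            ByteBuffer buf = ByteBuffer.allocate(bufferSize); // stands in for allocHandle.allocate(allocator)
            lastBytesRead = ch.read(buf);                     // stands in for doReadBytes(byteBuf)
            if (lastBytesRead <= 0) {
                result.close = lastBytesRead < 0;             // -1 signals end of stream
                break;
            }
            totalMessages++;                                  // stands in for incMessagesRead(1)
            buf.flip();
            byte[] chunk = new byte[buf.remaining()];
            buf.get(chunk);
            result.messages.add(chunk);                       // stands in for fireChannelRead(byteBuf)
        } while (lastBytesRead == bufferSize && totalMessages < maxMessagesPerRead);
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "Netty rocks!".getBytes(StandardCharsets.US_ASCII);
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(data));

        Result first = readLoop(ch, 5, 16);
        // prints: 3 chunks, close=false
        System.out.println(first.messages.size() + " chunks, close=" + first.close);

        Result second = readLoop(ch, 5, 16);
        // prints: 0 chunks, close=true
        System.out.println(second.messages.size() + " chunks, close=" + second.close);
    }
}
```

With a 5-byte buffer the 12-byte payload arrives as chunks of 5, 5, and 2 bytes. The third read does not fill the buffer, so the loop stops; the next loop sees end of stream and sets close.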
3. RecvByteBufAllocator
/**
 * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
 * not to waste its space.
 */
public interface RecvByteBufAllocator {
    /**
     * Creates a new handle. The handle provides the actual operations and keeps the internal information which is
     * required for predicting an optimal buffer capacity.
     */
    Handle newHandle();

    /**
     * @deprecated Use {@link ExtendedHandle}.
     */
    interface Handle {
        /**
         * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
         * enough not to waste its space.
         */
        ByteBuf allocate(ByteBufAllocator alloc);

        /**
         * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
         * capacity.
         */
        int guess();

        /**
         * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
         * read loop.
         * <p>
         * This may be used by {@link #continueReading()} to determine if the read operation should complete.
         * </p>
         * This is only ever a hint and may be ignored by the implementation.
         * @param config The channel configuration which may impact this object's behavior.
         */
        void reset(ChannelConfig config);

        /**
         * Increment the number of messages that have been read for the current read loop.
         * @param numMessages The amount to increment by.
         */
        void incMessagesRead(int numMessages);

        /**
         * Set the bytes that have been read for the last read operation.
         * This may be used to increment the number of bytes that have been read.
         * @param bytes The number of bytes from the previous read operation. This may be negative if a read error
         * occurs. If a negative value is seen it is expected to be returned on the next call to
         * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
         * to this class and is not required to be enforced in {@link #continueReading()}.
         */
        void lastBytesRead(int bytes);

        /**
         * Get the amount of bytes for the previous read operation.
         * @return The amount of bytes for the previous read operation.
         */
        int lastBytesRead();

        /**
         * Set how many bytes the read operation will (or did) attempt to read.
         * @param bytes How many bytes the read operation will (or did) attempt to read.
         */
        void attemptedBytesRead(int bytes);

        /**
         * Get how many bytes the read operation will (or did) attempt to read.
         * @return How many bytes the read operation will (or did) attempt to read.
         */
        int attemptedBytesRead();

        /**
         * Determine if the current read loop should continue.
         * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
         */
        boolean continueReading();

        /**
         * The read has completed.
         */
        void readComplete();
    }
}
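3.1 The lastBytesRead method
For example, "Netty rocks!" is 12 bytes long, so a read that receives the whole string reports a lastBytesRead of 12.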
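The byte count can be checked with plain JDK classes. In this minimal sketch, LastBytesReadDemo and readOnce are hypothetical names and an in-memory channel stands in for the socket; the values returned by the two reads are what read() would pass to allocHandle.lastBytesRead(...).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

final class LastBytesReadDemo {
    /** Performs a single read into a fresh buffer and returns the reported byte count. */
    static int readOnce(ReadableByteChannel ch, int capacity) throws IOException {
        return ch.read(ByteBuffer.allocate(capacity));
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "Netty rocks!".getBytes(StandardCharsets.UTF_8);
        System.out.println(payload.length);       // prints 12

        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(payload));
        System.out.println(readOnce(ch, 1024));   // prints 12: data was read
        System.out.println(readOnce(ch, 1024));   // prints -1: end of stream, read() would set close
    }
}
```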
3.2 The incMessagesRead method
3.3 The continueReading method
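The decision can be written as a pure function. The sketch below copies the four conditions from MaxMessageHandle.continueReading as listed in section 3.4 and takes every input as a parameter; ContinueReadingSketch is a hypothetical name and the sample numbers are made up for illustration.

```java
final class ContinueReadingSketch {
    /** Returns true only if all four conditions from MaxMessageHandle hold. */
    static boolean continueReading(boolean autoRead, int attemptedBytesRead, int lastBytesRead,
                                   int totalMessages, int maxMessagesPerRead, int totalBytesRead) {
        return autoRead
                && attemptedBytesRead == lastBytesRead   // buffer was filled, more data may be waiting
                && totalMessages < maxMessagesPerRead    // per-loop message budget not yet used up
                && totalBytesRead > 0;                   // something was read in this loop
    }

    public static void main(String[] args) {
        // Buffer filled on the first read: keep going.
        System.out.println(continueReading(true, 1024, 1024, 1, 16, 1024));    // prints true
        // Only 300 of 1024 bytes arrived: the socket is probably drained.
        System.out.println(continueReading(true, 1024, 300, 2, 16, 1324));     // prints false
        // 16 messages already read with a limit of 16: stop and yield.
        System.out.println(continueReading(true, 1024, 1024, 16, 16, 16384));  // prints false
        // autoRead disabled: never continue.
        System.out.println(continueReading(false, 1024, 1024, 1, 16, 1024));   // prints false
    }
}
```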
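3.4 Relevant Handle source code
- attemptedBytesRead is the number of bytes the read attempts to read, for example 1024 bytes
- lastBytesRead is the number of bytes actually read
- totalBytesRead is the cumulative number of bytes read so far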
public abstract class MaxMessageHandle implements ExtendedHandle {
    private ChannelConfig config;
    private int maxMessagePerRead;
    private int totalMessages;
    private int totalBytesRead;
    private int attemptedBytesRead;
    private int lastBytesRead;
    private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
        public boolean get() {
            // A completely filled buffer suggests that more data may be waiting.
            return attemptedBytesRead == lastBytesRead;
        }
    };

    /**
     * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
     */
    public void reset(ChannelConfig config) {
        this.config = config;
        maxMessagePerRead = maxMessagesPerRead();
        totalMessages = totalBytesRead = 0;
    }

    public ByteBuf allocate(ByteBufAllocator alloc) {
        return alloc.ioBuffer(guess());
    }

    public final void incMessagesRead(int amt) {
        totalMessages += amt;
    }

    public final void lastBytesRead(int bytes) {
        lastBytesRead = bytes;
        if (bytes > 0) {
            totalBytesRead += bytes;
        }
    }

    public final int lastBytesRead() {
        return lastBytesRead;
    }

    public boolean continueReading() {
        return continueReading(defaultMaybeMoreSupplier);
    }

    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
        return config.isAutoRead() &&
               maybeMoreDataSupplier.get() &&
               totalMessages < maxMessagePerRead &&
               totalBytesRead > 0;
    }

    public void readComplete() {
    }

    public int attemptedBytesRead() {
        return attemptedBytesRead;
    }

    public void attemptedBytesRead(int bytes) {
        attemptedBytesRead = bytes;
    }

    protected final int totalBytesRead() {
        // A negative total means the int counter overflowed.
        return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
    }
}
- read() calls doReadBytes to perform the actual read
- doReadBytes calls the writeBytes method of ByteBuf
- writeBytes in turn calls the read method of ScatteringByteChannel
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    checkIndex(index, length);
    ByteBuffer tmpBuf = internalNioBuffer();
    index = idx(index);
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}
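The windowing trick in setBytes (position the NIO buffer at index, cap it at index + length, then let the channel fill it) can be tried with the JDK alone. The following is a rough sketch under stated assumptions: SetBytesSketch and demo are hypothetical names, a ByteBuffer.duplicate() view stands in for Netty's internalNioBuffer(), and a temporary file read through FileChannel (which implements ScatteringByteChannel) stands in for the socket channel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class SetBytesSketch {
    /** Reads up to length bytes from in into buf starting at index; returns the channel's result. */
    static int setBytes(ByteBuffer buf, int index, ScatteringByteChannel in, int length)
            throws IOException {
        ByteBuffer tmpBuf = buf.duplicate();   // shares the bytes, has its own position and limit
        tmpBuf.clear();
        tmpBuf.position(index);
        tmpBuf.limit(index + length);          // the channel may write only inside [index, index + length)
        try {
            return in.read(tmpBuf);
        } catch (ClosedChannelException ignored) {
            return -1;                         // a closed channel is reported like end of stream
        }
    }

    /** Reads a 12-byte temp file twice and returns both results. */
    static int[] demo() throws IOException {
        Path tmp = Files.createTempFile("setbytes-sketch", ".bin");
        try {
            Files.write(tmp, "Netty rocks!".getBytes(StandardCharsets.US_ASCII));
            try (FileChannel in = FileChannel.open(tmp, StandardOpenOption.READ)) {
                ByteBuffer buf = ByteBuffer.allocate(64);
                int writerIndex = 4;                              // arbitrary starting offset
                int first = setBytes(buf, writerIndex, in, 32);
                writerIndex += first;                             // what writeBytes does when the result is > 0
                int second = setBytes(buf, writerIndex, in, 32);
                return new int[] {first, second};
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        int[] results = demo();
        System.out.println(results[0]);   // prints 12
        System.out.println(results[1]);   // prints -1
    }
}
```

The first call asks for up to 32 bytes but only 12 are available, which is the attemptedBytesRead versus lastBytesRead distinction from section 3.4. The second call hits end of file and returns -1, the value that makes read() set close.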