Flink 源码之内存管理
Flink 系列博客
Flink QuickStart
Flink双流操作
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任务提交操作
Flink配置Prometheus监控
Flink in docker 部署
Flink HA 部署
Flink 常见调优参数总结
Flink 源码之任务提交流程分析
Flink 源码之基本算子
Flink 源码之Trigger
Flink 源码之Evictor
Flink 源码之Window
Flink 源码之WindowOperator
Flink 源码之StreamGraph生成
Flink 源码之JobGraph生成
Flink 源码之两阶段提交
Flink 源码之分布式快照
Flink 源码之时间处理
Flink 源码之节点间通信
Flink 源码之Credit Based反压
Flink 源码之快照
Flink 源码之FlinkKafkaConsumer
MemorySegment概述
MemorySegment
是Flink管理的内存片段。该类是一个抽象类。它的实现既可以是堆内存,也可以是堆外内存,甚至是两者同时使用。使用MemorySegment
这个类型管理内存,无需知道内存片段是堆内、堆外还是混合,一视同仁。
MemorySegment
有如下两个实现类:
- HeapMemorySegment:负责维护堆内内存。使用一个字节数组来存放内存数据。
- HybridMemorySegment:是一种混合类型内存片段。可以使用堆内内存,可以使用堆外内存,或者是同时使用。
关于MemorySegment优化的一些讨论
Java虚拟机内有一个JIT(Just In Time)编译器。JIT是一种即时编译器,能够将Java字节码编译为机器码,对于提高程序执行速度有很大的帮助。
但是JIT不会把所有Java字节码都编译为机器码。因为,大多数的代码只会运行一次,对这些代码即时编译为机器码再执行效率上不如直接在JVM运行字节码。运行编译过后的机器码比JVM解释执行Java字节码要快,但是把字节码编译为机器码在执行的过程就不见得比JVM解释执行字节码快。还有一个需要考虑的问题就是代码膨胀,Java字节码编译为机器码之后占用存储会扩大10倍,甚至20倍以上,非常浪费存储空间。总的来说只运行一次或频率很低的代码,解释执行要比JIT编译执行更快。
对于一些经常执行的代码,例如循环中调用某个方法,或者是频繁调用某个处理逻辑,JIT编译这些代码就显得非常有必要。这种执行频率很高的代码称为热代码。在Java程序运行过程中,JIT会把这种热代码以method为单位,编译为机器码并保存起来。这样以后每次不必在解释执行字节码,直接调用保存的机器码就可以了。
将字节码编译为机器码有很多中优化方式。其中有一种是内联,即调用方法时,将方法内容部分的代码直接搬到方法调用的位置,可以避免依照指针查找方法代码的位置。这里会有一个问题,Java支持方法的重写,使用多态方式调用的时候,需要查找具体实现类对应逻辑。也就是说,如果一个父类具有多个子类,多个子类被加载到JVM的时候,JIT无法确定方法的具体实现,这样很不利于JIT的优化。
这时候大家会想到,如果只用其中一个子类,那么在程序运行的时候,调用哪个方法是确定的。JIT便可以采用内联和去虚化的方式,优化这些方法的调用。
HybridMemorySegment
这个类的出现正是为了解决上述问题。HybridMemorySegment
一种实现类既能够处理堆内内存又能够处理堆外内存。这样就不存在多种MemorySegment
的子类,从而使JIT的性能优化效果最大化。
MemorySegment中的UNSAFE对象
MemorySegment
里面有一个UNSAFE
对象。Unsafe
是sun.misc
包内的一个类,提供了native方式直接操作内存(分配内存,释放内存,复制内存,CAS操作,读写各种Java原生数据类型的数据)的方法。这些方法的执行效率非常高。但由于这些方法使用了类似C语言指针的方式操作内存,如果使用不当,很可能会造成不可预知的问题。因此这个类叫做Unsafe,意为用户去使用这个类是不安全的。
正常情况Unsafe
是不允许用户使用的。查看一下它的构造函数:
@CallerSensitive
public static Unsafe getUnsafe() {
Class<?> caller = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(caller.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
Unsafe
使用的是单例模式。创建这个类实例的方法位于static代码块中。getUnsafe
方法仅仅是取回这个实例。
但是这段代码中有一个条件判断,用来检查调用者的classloader是不是SystemDomainLoader,不清楚是什么意思。所以我们查看下VM.isSystemDomainLoader
方法。代码如下所示:
public static boolean isSystemDomainLoader(ClassLoader loader) {
return loader == null;
}
看到这里就明白了,对于Bootstrap classloader加载的类而言,他们的classloader值为null。而Java的rt.jar是Bootstrap classloader加载的。因此我们可以确定,Unsafe
这个类无法在rt.jar之外的class中使用。
那么问题来了,在Flink分配内存中Unsafe是怎么获取到的?我们看一下MemoryUtils
的UNSAFE
变量。该变量通过getUnsafe
方法获取,代码如下所示:
private static sun.misc.Unsafe getUnsafe() {
try {
Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
unsafeField.setAccessible(true);
return (sun.misc.Unsafe) unsafeField.get(null);
} catch (SecurityException e) {
throw new Error("Could not access the sun.misc.Unsafe handle, permission denied by security manager.", e);
} catch (NoSuchFieldException e) {
throw new Error("The static handle field in sun.misc.Unsafe was not found.");
} catch (IllegalArgumentException e) {
throw new Error("Bug: Illegal argument reflection access for static field.", e);
} catch (IllegalAccessException e) {
throw new Error("Access to sun.misc.Unsafe is forbidden by the runtime.", e);
} catch (Throwable t) {
throw new Error("Unclassified error while trying to access the sun.misc.Unsafe handle.", t);
}
}
这里通过一个取巧的方式:使用反射。Unsafe
这个class内的单例对象theUnsafe
可以通过反射getDeclaredField
直接拿到。然后设置这个field为可以访问(setAccessible(true)
)。这样可以绕过sun的限制,把Unsafe
对象拿到rt.jar之外使用。
HybridMemorySegment
HybridMemorySegment
既可以是堆内内存也可以是堆外内存。
内部有几个重要的成员变量:
-
heapMemory
:字节数组类型,代表堆内内存。该变量从MemorySegment
继承而来。 -
offHeapBuffer
:ByteBuffer类型,代表堆外内存。 -
address
:存放内存的起始地址。如果是堆内内存,值为BYTE_ARRAY_BASE_OFFSET
,即UNSAFE.arrayBaseOffset(byte[].class)
。如果是堆外内存,值为ByteBuffer的起始地址。 -
addressLimit
:堆外内存的结束地址(address + size)
除了操作堆内内存的方法外,还有两个方法:
get(int offset, ByteBuffer target, int numBytes)
put(int offset, ByteBuffer source, int numBytes)
分别用于读取和写入堆外内存。
MemorySegmentFactory
负责创建出符合要求的MemorySegment
。
注意:该工厂类创建出来的MemorySegment
都是HybridMemorySegment
类型,便于JVM使用JIT优化,从而提高性能。
MemorySegmentFactory
提供了如下方法:
-
wrap
:包装字节数组为HybridMemorySegment
。内存位于堆内。由于此Factory返回的都是HybridMemorySegment
类型,后面统一使用MemorySegment
称呼也不会引起混淆。 -
allocateUnpooledSegment
:分配指定字节数的内存,位于堆内。 -
allocateUnpooledOffHeapMemory
:分配指定字节数的内存,位于堆外。返回MemorySegment
包装ByteBuffer.allocateDirect()
方式分配的堆外内存。 -
allocateOffHeapUnsafeMemory
:使用sun.misc.Unsafe
的allocateMemory
方法分配堆外内存。注意,使用这种方式分配的内存不受-XX:MaxDirectMemorySize
这个JVM参数的限制。 -
wrapOffHeapMemory
:包装ByteBuffer
类型堆外内存为MemorySegment
类型。
SpanningRecordSerializer
SpanningRecordSerializer
负责将数据流中的元素序列化。调用copyToBufferBuilder
,将已经序列化的数据复制到buffer中。
SpanningRecordSerializer
有两个重要的方法:
-
serializeRecord
:将一个元素序列化,存入到内部serializationBuffer
中 -
copyToBufferBuilder
:将serializationBuffer
中的数据存入BufferBuilder
中。BufferBuilder
是MemorySegment
的一个封装形式,放在后面分析。
SpanningRecordSerializer
内部持有一个DataOutputSerializer
类型的对象serializationBuffer
。DataOutputSerializer
内部有一个字节数组缓存空间,用来存放当前正在进行序列化操作的数据。SpanningRecordSerializer
构造方法中创建了一个DataOutputSerializer
,初始缓存大小是128字节。
DataOutputSerializer
提供了大量的write
重载方法,用于序列化各种类型的数据。在序列化数据的时候,初始的缓存容量有可能不够使用,此时自动调用resize
方法扩大缓冲区。DataOutputSerializer
还提供了一个pruneBuffer
方法,将扩容过的当前缓冲区还原回初始大小。
NetworkBuffer
NetworkBuffer
是MemorySegment
的包装类。该类提供了大量的写入和获取buffer中原生类型数据的方法。除此之外还提供了缓存的复制和回收等功能。
NetworkBufferPool
NetworkBufferPool
是供网络层使用的,固定大小的缓存池。每个task并非直接从NetworkBufferPool
获取内存,而是使用从NetworkBufferPool
创建出的LocalBufferPool
来分配内存。
NetworkBufferPool
有几个比较重要的成员变量:
-
totalNumberOfMemorySegments
:总MemorySegment数量 -
memorySegmentSize
:每个MemorySegment的大小 -
availableMemorySegments
:一个队列,用来存放可用的MemorySegment -
allBufferPools
:用来存放基于此NetworkBufferPool
创建的LocalBufferPool
-
numTotalRequiredBuffers
:总共需要的buffer数量。统计所有已经分配给LocalBufferPool的buffer数量 -
numberOfSegmentsToRequest
:MemorySegment为批量申请,该变量决定一批次申请的MemorySegment的数量 -
requestSegmentsTimeout
:请求内存的最大等待时间(超时时间)
构造方法
这里我们分析下NetworkBufferPool
的构造方法。
public NetworkBufferPool(
int numberOfSegmentsToAllocate,
int segmentSize,
int numberOfSegmentsToRequest,
Duration requestSegmentsTimeout) {
// 设置总MemorySegment数量
this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
// 设置每个MemorySegment的大小
this.memorySegmentSize = segmentSize;
checkArgument(numberOfSegmentsToRequest > 0, "The number of required buffers should be larger than 0.");
// 设置每批请求的内存数量
this.numberOfSegmentsToRequest = numberOfSegmentsToRequest;
Preconditions.checkNotNull(requestSegmentsTimeout);
checkArgument(requestSegmentsTimeout.toMillis() > 0,
"The timeout for requesting exclusive buffers should be positive.");
// 设置请求内存的超时时间
this.requestSegmentsTimeout = requestSegmentsTimeout;
// MemorySegment大小转换为long
final long sizeInLong = (long) segmentSize;
try {
// 创建availableMemorySegments队列
this.availableMemorySegments = new ArrayDeque<>(numberOfSegmentsToAllocate);
}
catch (OutOfMemoryError err) {
throw new OutOfMemoryError("Could not allocate buffer queue of length "
+ numberOfSegmentsToAllocate + " - " + err.getMessage());
}
try {
// 循环调用,创建numberOfSegmentsToAllocate个MemorySegment
// 这里分配的是堆外内存
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
}
}
catch (OutOfMemoryError err) {
int allocated = availableMemorySegments.size();
// free some memory
availableMemorySegments.clear();
long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
long allocatedMb = (sizeInLong * allocated) >> 20;
long missingMb = requiredMb - allocatedMb;
throw new OutOfMemoryError("Could not allocate enough memory segments for NetworkBufferPool " +
"(required (Mb): " + requiredMb +
", allocated (Mb): " + allocatedMb +
", missing (Mb): " + missingMb + "). Cause: " + err.getMessage());
}
// 设置状态为可用
availabilityHelper.resetAvailable();
long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;
LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
allocatedMb, availableMemorySegments.size(), segmentSize);
}
创建LocalBufferPool
基于NetworkBufferPool
创建LocalBufferPool
有如下两个方法:
@Override
public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
return internalCreateBufferPool(numRequiredBuffers, maxUsedBuffers, null);
}
@Override
public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, BufferPoolOwner bufferPoolOwner) throws IOException {
return internalCreateBufferPool(numRequiredBuffers, maxUsedBuffers, bufferPoolOwner);
}
从源代码可知,创建LocalBufferPool
需要3个参数:
-
numRequiredBuffers
:必须要有的buffer数量 -
maxUsedBuffers
:使用的buffer数量不能超过这个值 -
bufferPoolOwner
:bufferPool所有者,用于回收内存时通知owner
接下来分析internalCreateBufferPool
方法。
private BufferPool internalCreateBufferPool(
int numRequiredBuffers,
int maxUsedBuffers,
@Nullable BufferPoolOwner bufferPoolOwner) throws IOException {
// It is necessary to use a separate lock from the one used for buffer
// requests to ensure deadlock freedom for failure cases.
synchronized (factoryLock) {
if (isDestroyed) {
throw new IllegalStateException("Network buffer pool has already been destroyed.");
}
// Ensure that the number of required buffers can be satisfied.
// With dynamic memory management this should become obsolete.
// 检查内存分配不能超过NetworkBufferPool总容量
if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
throw new IOException(String.format("Insufficient number of network buffers: " +
"required %d, but only %d available. %s.",
numRequiredBuffers,
totalNumberOfMemorySegments - numTotalRequiredBuffers,
getConfigDescription()));
}
// 更新所有LocalBufferPool占用的总内存数量
this.numTotalRequiredBuffers += numRequiredBuffers;
// We are good to go, create a new buffer pool and redistribute
// non-fixed size buffers.
// 创建一个LocalBufferPool
LocalBufferPool localBufferPool =
new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers, bufferPoolOwner);
// 记录新创建的bufferPool到allBufferPools集合
allBufferPools.add(localBufferPool);
try {
// 重新分配每个LocalBufferPool的内存块,接下来分析
redistributeBuffers();
} catch (IOException e) {
try {
destroyBufferPool(localBufferPool);
} catch (IOException inner) {
e.addSuppressed(inner);
}
ExceptionUtils.rethrowIOException(e);
}
return localBufferPool;
}
}
接下来我们分析下比较重要的redistributeBuffers
方法。该方法在保证NetworkBufferPool有多余未被使用内存的前提下,尽量为每个bufferPool分配更多额外的内存。
private void redistributeBuffers() throws IOException {
assert Thread.holdsLock(factoryLock);
// All buffers, which are not among the required ones
// 计算出尚未分配的内存块数量
final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
// 如果没有可用的内存
// 即所有的内存都分配给了各个LocalBufferPool
// 需要设置每个bufferPool的大小为每个pool要求的最小内存大小
// 同时归还超出数量的内存
if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
for (LocalBufferPool bufferPool : allBufferPools) {
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
}
return;
}
/*
* With buffer pools being potentially limited, let's distribute the available memory
* segments based on the capacity of each buffer pool, i.e. the maximum number of segments
* an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
* it may be less. Based on this and the sum of all these values (totalCapacity), we build
* a ratio that we use to distribute the buffers.
*/
long totalCapacity = 0; // long to avoid int overflow
// 统计所有的bufferPool可以被额外分配的内存数量总和
for (LocalBufferPool bufferPool : allBufferPools) {
// 计算每个bufferPool可以最多超量使用的内存数
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
// 如果最多可超量使用的内存数量比numAvailableMemorySegment还多
// 肯定无法满足这个需求
// 按照numAvailableMemorySegment来统计
totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
}
// no capacity to receive additional buffers?
// 如果总容量为0,直接返回
if (totalCapacity == 0) {
return; // necessary to avoid div by zero when nothing to re-distribute
}
// since one of the arguments of 'min(a,b)' is a positive int, this is actually
// guaranteed to be within the 'int' domain
// (we use a checked downCast to handle possible bugs more gracefully).
// 计算可以重分配的内存数量
// MathUtils.checkedDownCast用来确保结果转换成int不会溢出
final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
Math.min(numAvailableMemorySegment, totalCapacity));
long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
// 对每个LocalBufferPool重新设置pool size
for (LocalBufferPool bufferPool : allBufferPools) {
// 计算pool的最大可超量使用的内存数
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
// shortcut
// 如果可超量分配的内存数量为0,不用进行后续操作
if (excessMax == 0) {
continue;
}
// 和totalCapacity统计相同
totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
// avoid remaining buffers by looking at the total capacity that should have been
// re-distributed up until here
// the downcast will always succeed, because both arguments of the subtraction are in the 'int' domain
// 计算每个pool可重分配(增加)的内存数量
// totalPartsUsed / totalCapacity可以计算出已分配的内存占据总可用内存的比例
// 因为增加totalPartsUsed在实际分配内存之前,所以这个比例包含了即将分配内存的这个LocalBufferPool的占比
// memorySegmentsToDistribute * totalPartsUsed / totalCapacity可计算出已分配的内存数量(包含即将分配buffer的这个LocalBufferPool)
// 这个结果再减去numDistributedMemorySegment(已分配内存数量),最终得到需要分配给此bufferPool的内存数量
final int mySize = MathUtils.checkedDownCast(
memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);
// 更新已分配内存数量的统计
numDistributedMemorySegment += mySize;
// 重新设置bufferPool的大小为新的值
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
assert (totalPartsUsed == totalCapacity);
assert (numDistributedMemorySegment == memorySegmentsToDistribute);
}
请求内存
NetworkBufferPool
有两个请求内存的方法,分别为请求单个MemorySegment和批量请求MemorySegment。
请求单个MemorySegment的方法为requestMemorySegment
,代码如下:
@Nullable
public MemorySegment requestMemorySegment() {
synchronized (availableMemorySegments) {
// 调用internalRequestMemorySegment
return internalRequestMemorySegment();
}
}
@Nullable
private MemorySegment internalRequestMemorySegment() {
assert Thread.holdsLock(availableMemorySegments);
// 从availableMemorySegments获取一个内存片段
final MemorySegment segment = availableMemorySegments.poll();
// 如果获取这个内存成功(不为null)之后availableMemorySegments变为empty,设置状态为不可用
if (availableMemorySegments.isEmpty() && segment != null) {
availabilityHelper.resetUnavailable();
}
return segment;
}
批量请求内存的方法为requestMemorySegments
。代码和分析如下:
@Override
public List<MemorySegment> requestMemorySegments() throws IOException {
synchronized (factoryLock) {
if (isDestroyed) {
throw new IllegalStateException("Network buffer pool has already been destroyed.");
}
// 尝试重新分配内存
tryRedistributeBuffers();
}
// 创建容纳请求到的内存的容器
final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
try {
// 统计请求内存操作耗时
final Deadline deadline = Deadline.fromNow(requestSegmentsTimeout);
while (true) {
if (isDestroyed) {
throw new IllegalStateException("Buffer pool is destroyed.");
}
MemorySegment segment;
synchronized (availableMemorySegments) {
// 调用之前分析过的internalRequestMemorySegment方法
// 从availableMemorySegments中获取一个内存片段
// 如果没有获取成功,等得2000毫秒
if ((segment = internalRequestMemorySegment()) == null) {
availableMemorySegments.wait(2000);
}
}
// 如果获取成功,加入内存到segments容器
if (segment != null) {
segments.add(segment);
}
// 如果请求到的内存数量超过了单批次请求的数量限制,退出循环
if (segments.size() >= numberOfSegmentsToRequest) {
break;
}
// 如果申请内存操作超时,抛出异常
if (!deadline.hasTimeLeft()) {
throw new IOException(String.format("Timeout triggered when requesting exclusive buffers: %s, " +
" or you may increase the timeout which is %dms by setting the key '%s'.",
getConfigDescription(),
requestSegmentsTimeout.toMillis(),
NettyShuffleEnvironmentOptions.NETWORK_EXCLUSIVE_BUFFERS_REQUEST_TIMEOUT_MILLISECONDS.key()));
}
}
} catch (Throwable e) {
try {
recycleMemorySegments(segments, numberOfSegmentsToRequest);
} catch (IOException inner) {
e.addSuppressed(inner);
}
ExceptionUtils.rethrowIOException(e);
}
return segments;
}
回收内存
NetworkBufferPool
有回收单个和批量回收MemorySegment
两种方式。
回收单个MemorySegment
的方法为recycle
。
public void recycle(MemorySegment segment) {
// Adds the segment back to the queue, which does not immediately free the memory
// however, since this happens when references to the global pool are also released,
// making the availableMemorySegments queue and its contained object reclaimable
internalRecycleMemorySegments(Collections.singleton(checkNotNull(segment)));
}
这里调用了internalRecycleMemorySegments
,后续分析。
批量回收内存的方法为recycleMemorySegments
。
@Override
public void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException {
recycleMemorySegments(segments, segments.size());
}
private void recycleMemorySegments(Collection<MemorySegment> segments, int size) throws IOException {
internalRecycleMemorySegments(segments);
synchronized (factoryLock) {
numTotalRequiredBuffers -= size;
// note: if this fails, we're fine for the buffer pool since we already recycled the segments
// 内存回收成功之后,需要重新分配内存
redistributeBuffers();
}
}
最后我们分析下internalRecycleMemorySegments
方法。代码如下:
private void internalRecycleMemorySegments(Collection<MemorySegment> segments) {
CompletableFuture<?> toNotify = null;
synchronized (availableMemorySegments) {
// 如果可以成功归还内存,需要设置availabilityHelper为可用状态
if (availableMemorySegments.isEmpty() && !segments.isEmpty()) {
toNotify = availabilityHelper.getUnavailableToResetAvailable();
}
// 加入归还的内存片段到availableMemorySegments
availableMemorySegments.addAll(segments);
// 通知所有等待调用的对象
availableMemorySegments.notifyAll();
}
if (toNotify != null) {
toNotify.complete(null);
}
}
LocalBufferPool
LocalBufferPool
是NetworkBufferPool
的包装。负责分配和回收NetworkBufferPool
中的一部分buffer对象。NetworkBufferPool
是一个固定大小的缓存池。将一个NetworkBufferPool
的可用缓存划分给多个LocalBufferPool
使用,避免网络层同时操作NetworkBufferPool
造成死锁。同时LocalBufferPool
实现了默认的回收机制,确保每一个buffer最终会返回给NetworkBufferPool
。
LocalBufferPool
的大小是不固定的。它具有三个表示pool容量的变量:
-
currentPoolSize
:当前pool的容量,可以被NetworkBufferPool
的重分配内存方法修改 -
maxNumberOfMemorySegments
:内存片段数量的上限值 -
numberOfRequiredMemorySegments
:最小内存片段数量
buffer申请方法
向LocalBufferPool
申请buffer的方法有两个:
- requestBuffer 获取一个buffer
- requestBufferBuilderBlocking 获取一个bufferBuilder。BufferBuilder是buffer的一个封装形式,在后面发送端buffer管理会详细介绍
首先我们分析requestBuffer
方法。
@Override
public Buffer requestBuffer() throws IOException {
return toBuffer(requestMemorySegment());
}
toBuffer方法将MemorySegment封装为NetworkBuffer
形式,代码如下:
private Buffer toBuffer(MemorySegment memorySegment) {
if (memorySegment == null) {
return null;
}
// 设定buffer的回收器(recycle)为localBufferPool自己
return new NetworkBuffer(memorySegment, this);
}
接下来查看requestMemorySegment
方法:
@Nullable
private MemorySegment requestMemorySegment() throws IOException {
MemorySegment segment = null;
synchronized (availableMemorySegments) {
// 如果已请求的内存片段数量超过了localBufferPool的大小
// 将多出来的内存片段取出
// 归还给NetworkBufferPool并回收
returnExcessMemorySegments();
// 如果可用内存片段队列为空,直接从NetworkBufferPool请求内存
if (availableMemorySegments.isEmpty()) {
segment = requestMemorySegmentFromGlobal();
}
// segment may have been released by buffer pool owner
// 否则,从可用内存片段中取出一段内存
if (segment == null) {
segment = availableMemorySegments.poll();
}
// 如果仍没有可用内存返回,设置当前状态为不可用
if (segment == null) {
availabilityHelper.resetUnavailable();
}
}
return segment;
}
requestMemorySegment
方法内一些调用需要详细分析。
returnExcessMemorySegments
方法的代码如下:
private void returnExcessMemorySegments() {
assert Thread.holdsLock(availableMemorySegments);
// 如果已请求的内存数量超过了bufferPool的大小,一直循环
while (numberOfRequestedMemorySegments > currentPoolSize) {
// 从可用内存队列中取出一个内存片段
MemorySegment segment = availableMemorySegments.poll();
if (segment == null) {
return;
}
// 归还这个内存
returnMemorySegment(segment);
}
}
// 归还内存的逻辑
private void returnMemorySegment(MemorySegment segment) {
assert Thread.holdsLock(availableMemorySegments);
// 已请求内存数量的技术器减1
numberOfRequestedMemorySegments--;
// 调用NetworkBufferPool的recycle方法,回收这一段内存
networkBufferPool.recycle(segment);
}
requestMemorySegmentFromGlobal
代码:
@Nullable
private MemorySegment requestMemorySegmentFromGlobal() throws IOException {
assert Thread.holdsLock(availableMemorySegments);
if (isDestroyed) {
throw new IllegalStateException("Buffer pool is destroyed.");
}
// 只有已请求内存数量小于localBufferPool大小的时候才去请求NetworkBufferPool的内存
if (numberOfRequestedMemorySegments < currentPoolSize) {
final MemorySegment segment = networkBufferPool.requestMemorySegment();
if (segment != null) {
// 如果请求到内存,增加已请求内存数量计数器,并返回内存
numberOfRequestedMemorySegments++;
return segment;
}
}
// 调用bufferPool持有者的释放内存方法
if (bufferPoolOwner != null) {
bufferPoolOwner.releaseMemory(1);
}
return null;
}
另外一个申请内存的方法为requestBufferBuilderBlocking
,代码如下:
@Override
public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
return toBufferBuilder(requestMemorySegmentBlocking());
}
其中toBufferBuilder
把MemorySegment封装为BufferBuilder
类型,设置内存回收器为LocalBufferPool
自身。这里就不再看他的源码。
requestMemorySegmentBlocking
方法的源码需要分析下,如下所示:
private MemorySegment requestMemorySegmentBlocking() throws InterruptedException, IOException {
MemorySegment segment;
while ((segment = requestMemorySegment()) == null) {
try {
// wait until available
getAvailableFuture().get();
} catch (ExecutionException e) {
LOG.error("The available future is completed exceptionally.", e);
ExceptionUtils.rethrow(e);
}
}
return segment;
}
该方法的逻辑很明确,先通过之前分析的requestMemorySegment
获取一段内存,如果没有获取到,则调用availabilityHelper
的get
方法阻塞,直到获取内存成功。有内存被回收的时候,get
方法会返回。
buffer回收方法
LocalBufferPool
回收内存的方法是recycle
。
public void recycle(MemorySegment segment) {
BufferListener listener;
CompletableFuture<?> toNotify = null;
// 创建一个默认的NotificationResult,为缓存未使用
NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
// 一直循环,确保如果listener返回内存不再使用的时候,再次执行这段逻辑将其回收
while (!notificationResult.isBufferUsed()) {
synchronized (availableMemorySegments) {
// 如果已请求的内存片段数量多于当前pool的大小,需要将内存归还
// currentPoolSize可能会在运行的过程中调整(NetworkBufferPool的redistributeBuffers方法)
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
// 返还内存给networkBufferPool处理
// numberOfRequestedMemorySegments 减1
// 调用networkBufferPool的recycle方法
returnMemorySegment(segment);
return;
} else {
// 取出一个注册的listener
listener = registeredListeners.poll();
// 如果没有listener
if (listener == null) {
boolean wasUnavailable = availableMemorySegments.isEmpty();
// 将该内存片段放入可用内存片段列表
availableMemorySegments.add(segment);
if (wasUnavailable) {
// 如果回收内存后,availableMemorySegments从空变为非空,设置toNotify为AVAILABLE
toNotify = availabilityHelper.getUnavailableToResetAvailable();
}
break;
}
}
}
// 如果获取到了listener,调用listener的notifyBufferAvailable方法,告诉listener buffer可用
notificationResult = fireBufferAvailableNotification(listener, segment);
}
mayNotifyAvailable(toNotify);
}
注意:recycle
方法调用registeredListeners.poll()
获取listener。Listener的作用是当availableMemorySegments耗尽的时候,在bufferPool注册listener。当回收了内存,availableMemorySegments
不再为空的时候,bufferPool调用监听器的notifyBufferAvailable
方法,告知现在有buffer可用。只有在availableMemorySegments
(可用内存段队列)为空并且没有被destroy的时候才能添加listener。addBufferListener
方法代码:
@Override
public boolean addBufferListener(BufferListener listener) {
synchronized (availableMemorySegments) {
if (!availableMemorySegments.isEmpty() || isDestroyed) {
return false;
}
registeredListeners.add(listener);
return true;
}
}
换一种说法,只有当buffer使用者发现bufferPool中无buffer可用的时候,才能去注册listener,用于在有buffer可用的时候收到通知。监听器必须实现BufferListener
接口。
BufferListener
接口有两个方法:notifyBufferAvailable
和notifyBufferDestroyed
。前者用来告知有新的buffer可用(可用buffer数量从无到有的时候调用),后者用于告知bufferPool已被销毁。
让我们回到recycle
方法。还剩下一个触发buffer可用时候通知的调用,即fireBufferAvailableNotification
方法。它的代码如下:
private NotificationResult fireBufferAvailableNotification(BufferListener listener, MemorySegment segment) {
// We do not know which locks have been acquired before the recycle() or are needed in the
// notification and which other threads also access them.
// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
// 调用listener的notifyBufferAvailable方法
NotificationResult notificationResult = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
// 如果listener需要更多的buffer
if (notificationResult.needsMoreBuffers()) {
synchronized (availableMemorySegments) {
if (isDestroyed) {
// cleanup tasks how they would have been done if we only had one synchronized block
listener.notifyBufferDestroyed();
} else {
// 再次把它注册为listener
registeredListeners.add(listener);
}
}
}
return notificationResult;
}
重设LocalBufferPool的currentPoolSize
NetworkBufferPool
的redistributeBuffers
方法可以重设每个LocalBufferPool
的大小。这是通过调用LocalBufferPool
的setNumBuffers
方法实现。
该方法的代码如下所示:
@Override
public void setNumBuffers(int numBuffers) throws IOException {
int numExcessBuffers;
CompletableFuture<?> toNotify = null;
synchronized (availableMemorySegments) {
// 检查重新设定的pool size必须要大于或等于numberOfRequiredMemorySegments
checkArgument(numBuffers >= numberOfRequiredMemorySegments,
"Buffer pool needs at least %s buffers, but tried to set to %s",
numberOfRequiredMemorySegments, numBuffers);
// 设置currentPoolSize,不大于maxNumberOfMemorySegments
if (numBuffers > maxNumberOfMemorySegments) {
currentPoolSize = maxNumberOfMemorySegments;
} else {
currentPoolSize = numBuffers;
}
// 归还超出部分的内存给NetworkBufferPool
returnExcessMemorySegments();
numExcessBuffers = numberOfRequestedMemorySegments - currentPoolSize;
if (numExcessBuffers < 0 && availableMemorySegments.isEmpty() && networkBufferPool.isAvailable()) {
toNotify = availabilityHelper.getUnavailableToResetUnavailable();
}
}
mayNotifyAvailable(toNotify);
// If there is a registered owner and we have still requested more buffers than our
// size, trigger a recycle via the owner.
if (bufferPoolOwner != null && numExcessBuffers > 0) {
bufferPoolOwner.releaseMemory(numExcessBuffers);
}
}
数据接收端的buffer管理
数据接收端主要是指RemoteInputChannel
。该channel用来接受和处理从其他节点读取到的数据。和RemoteInputChanel
对应的是LocalInputChannel
,负责读取本地的subpartition,不需要使用接收端缓存。
Buffer的请求
RemoteInputChannel
储存从上游接收到的数据使用的buffer从AvailableBufferQueue
中请求。CreditBasedPartitionRequestClientHandler
的decodeBufferOrEvent
方法负责处理netty通信接收到的上游数据。调用inputChannel
的requestBuffer
,获取一个buffer。如果能够获取到buffer,会接着调用inputChannel
的onBuffer
方法,把接收到的数据存入到receivedBuffers
中。
decodeBufferOrEvent
相关代码如下所示:
// 从inputChannel请求缓存
Buffer buffer = inputChannel.requestBuffer();
if (buffer != null) {
// 从netty缓存中读取数据到inputChannel的缓存
nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
buffer.setCompressed(bufferOrEvent.isCompressed);
// 通知inputChannel缓存数据已准备就绪
inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
} else if (inputChannel.isReleased()) {
cancelRequestFor(bufferOrEvent.receiverId);
} else {
throw new IllegalStateException("No buffer available in credit-based input channel.");
}
我们看一下requestBuffer
方法。该方法从bufferQueue
中拿到一个buffer并返回。其中bufferQueue
对象为AvailableBufferQueue
类型。AvailableBufferQueue
负责维护RemoteInputChannel
的可用内存,稍后分析。
@Nullable
public Buffer requestBuffer() {
synchronized (bufferQueue) {
return bufferQueue.takeBuffer();
}
}
最后在onBuffer
方法中,把已经填充了数据的内存块放入receivedBuffers
队列(调用receivedBuffers.add(buffer)
方法)。后续的operator可以通过StreamTaskNetworkInput
读取InputChannel
中缓存的数据。
Buffer的回收
回收buffer的调用位置在StreamTaskNetworkInput
的emitNext
方法。
该方法负责从InputGate中拉取数据,数据被反序列化器反序列化之后发往operator。
负责操作内存的关键部分内容如下所示:
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
while (true) {
// get the stream element from the deserializer
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
// 关键点
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
processElement(deserializationDelegate.getInstance(), output);
return InputStatus.MORE_AVAILABLE;
}
}
// ...
}
}
我们分析下关键部分的代码。如果buffer中的数据已经被反序列化完毕(result.isBufferConsumed()
返回true),调用反序列化器中内存块的recycleBuffer
方法。在这里currentRecordDeserializer.getCurrentBuffer()
是NetworkBuffer
类型。
继续跟踪NetworkBuffer
类的recycle
方法。代码如下:
@Override
public void recycleBuffer() {
release();
}
跟随release方法。我们查看下AbstractReferenceCountedByteBuf
中的相关代码,如下所示:
public boolean release() {
return this.handleRelease(updater.release(this));
}
private boolean handleRelease(boolean result) {
if (result) {
this.deallocate();
}
return result;
}
这里有调用了实现类(即NetworkBuffer
)的deallocate
方法。代码如下:
@Override
protected void deallocate() {
recycler.recycle(memorySegment);
}
最终我们找到了调用入口,即recycler
的recycle
方法。
NetworkBuffer
是一个MemorySegment
的封装,创建的时候需要指定他们的回收器recycler
。Recycler需要实现BufferRecycler
接口,该接口仅有一个方法recycle
,存放了MemorySegment
的回收逻辑。
此处NetworkBuffer
解耦了内存回收逻辑的调用。不同类型的buffer回收逻辑是不同的(后面介绍两种类型的buffer)。通过在创建NetworkBuffer
的时候,指定它们对应的recycler
,上层使用的这些缓存的时候无需再知道具体的缓存类型和回收方式。
稍后具体分析两种类型的buffer,以及它们的创建和回收方法。
还有一个释放RemoteInputChannel
所有缓存的地方:AvailableBufferQueue
的releaseAll
方法。该方法稍后介绍。
AvailableBufferQueue
AvailableBufferQueue
负责维护RemoteInputChannel
的可用buffer。它的内部维护了两个内存队列,分别为floatingBuffers
(浮动buffer)和exclusiveBuffers
(专用buffer)。代码如下所示:
/** The current available floating buffers from the fixed buffer pool. */
private final ArrayDeque<Buffer> floatingBuffers;
/** The current available exclusive buffers from the global buffer pool. */
private final ArrayDeque<Buffer> exclusiveBuffers;
其中,专属buffer是在创建InputChannel
的时候分配的(assignExclusiveSegments
方法,间接调用了NetworkBufferPool
的requestMemorySegments
)。专属buffer的大小是固定的,归RemoteInputChannel
独享。如果出现专属buffer不够用的情况,会申请浮动buffer。浮动buffer在InputChannel
所属的bufferPool
中申请。所有属于同一个inputGate
的inputChannel
共享这一个bufferPool
。总结来说浮动buffer可以理解为是按需分配,从公有的bufferPool
中"借用"的。
获取可用内存
请求内存的逻辑位于takeBuffer
方法,如下面所示:
@Nullable
Buffer takeBuffer() {
if (floatingBuffers.size() > 0) {
return floatingBuffers.poll();
} else {
return exclusiveBuffers.poll();
}
}
从中不难得出,如果有可用的浮动内存,会优先使用他们。没有可用浮动内存的时候会使用专属内存。
添加专属buffer的逻辑
为队列增加专属buffer的方法为addExclusiveBuffer
,代码如下所示:
int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
exclusiveBuffers.add(buffer);
if (getAvailableBufferSize() > numRequiredBuffers) {
Buffer floatingBuffer = floatingBuffers.poll();
floatingBuffer.recycleBuffer();
return 0;
} else {
return 1;
}
}
这段代码主要逻辑为:
- 添加buffer到专属buffer队列。
- 如果可用buffer数(浮动buffer+专属buffer)大于所需内存buffer数,回收一个浮动buffer。
- 如果回收了浮动内存,返回0,否则返回1。
分配专属buffer的调用时机
在InputGate的setup阶段为所有的input channel分配专属内存。查看SingleInputGate
的setup
方法,代码如下:
@Override
public void setup() throws IOException, InterruptedException {
checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
// assign exclusive buffers to input channels directly and use the rest for floating buffers
// 为所有的InputChannel分配专用buffer,剩下的作为浮动buffer
assignExclusiveSegments();
// 设置bufferPool,用于分配浮动buffer
BufferPool bufferPool = bufferPoolFactory.get();
setBufferPool(bufferPool);
// 请求各个input channel需要读取的subpartition
requestPartitions();
}
继续跟踪assignExclusiveSegments
方法调用,代码如下:
@VisibleForTesting
public void assignExclusiveSegments() throws IOException {
synchronized (requestLock) {
for (InputChannel inputChannel : inputChannels.values()) {
if (inputChannel instanceof RemoteInputChannel) {
((RemoteInputChannel) inputChannel).assignExclusiveSegments();
}
}
}
}
这里逻辑就很清晰了,分别调用SingleInputGate
中每个InputChannel
的assignExclusiveSegments
方法。
assignExclusiveSegments
方法作用是为RemoteinputChannel
分配buffer。
该方法的代码如下:
void assignExclusiveSegments() throws IOException {
checkState(initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have " +
"already been set for this input channel.");
// 使用memorySegmentProvider分配内存,它的类型是NetworkBufferPool
Collection<MemorySegment> segments = checkNotNull(memorySegmentProvider.requestMemorySegments());
checkArgument(!segments.isEmpty(), "The number of exclusive buffers per channel should be larger than 0.");
// 初始化credit,credit用于反压,告诉上游我还有多少个可用buffer
initialCredit = segments.size();
numRequiredBuffers = segments.size();
// 将获取到的内存片段加入到`bufferQueue`
synchronized (bufferQueue) {
for (MemorySegment segment : segments) {
// 此处使用NetworkBuffer封装了memorySegment。因此该内存块回收的时候会调用RemoteInputChannel的recycle方法
bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
}
}
}
回收专有内存
专有内存的回收逻辑位于recycle
方法:
@Override
public void recycle(MemorySegment segment) {
int numAddedBuffers;
synchronized (bufferQueue) {
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after releaseAllResources() released all buffers (see below for details).
// 如果channel已经release(调用了releaseAllResources之后为true)
if (isReleased.get()) {
try {
//使用memorySegmentProvider回收内存
memorySegmentProvider.recycleMemorySegments(Collections.singletonList(segment));
return;
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
// 添加内存片段到专属内存
numAddedBuffers = bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
}
// 如果增加的缓存大于0,并且增加缓存前unannouncedCredit为0的话,告诉上游可以接收数据了
if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(numAddedBuffers) == 0) {
notifyCreditAvailable();
}
}
分配浮动buffer
RemoteInputChannel
的onSenderBacklog
方法通过根据backlog(积压的数量)提前分配内存,如果backlog加上初始的credit大于可用buffer数,需要分配浮动buffer。它的代码如下:
void onSenderBacklog(int backlog) throws IOException {
int numRequestedBuffers = 0;
synchronized (bufferQueue) {
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after releaseAllResources() released all buffers (see above for details).
if (isReleased.get()) {
return;
}
numRequiredBuffers = backlog + initialCredit;
while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
Buffer buffer = inputGate.getBufferPool().requestBuffer();
if (buffer != null) {
bufferQueue.addFloatingBuffer(buffer);
numRequestedBuffers++;
} else if (inputGate.getBufferProvider().addBufferListener(this)) {
// If the channel has not got enough buffers, register it as listener to wait for more floating buffers.
isWaitingForFloatingBuffers = true;
break;
}
}
}
if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
notifyCreditAvailable();
}
}
这个方法在Flink 源码之节点间通信已经分析过了。我们在这里重点关注while
循环内的逻辑。
while
循环不断的从inputGate
的bufferPool
(即LocalBufferPool
)获取可用的缓存块,加入到浮动缓存队列中,同时记录请求的缓存块数量,直到可用缓存数量不再小于需要缓存块的数量时候停止循环。
回收浮动buffer
浮动buffer被回收操作完毕后,会被放回到inputChannel
的浮动buffer队列中。浮动buffer的回收逻辑位于LocalBufferPool
的recycle
方法。该方法调用RemoteInputChannel
的notifyBufferAvailable
方法,将回收后的buffer重新加入浮动buffer队列。该recycle
方法在之前已经分析过,此处不再赘述。
接下来问题是,为什么浮动buffer的回收使用的是LocalBufferPool
的recycle
方法。我们看一下上面onSenderBacklog
中分配内存的代码inputGate.getBufferPool().requestBuffer()
方法:
@Override
public Buffer requestBuffer() throws IOException {
return toBuffer(requestMemorySegment());
}
跟踪toBuffer
方法,如下所示:
private Buffer toBuffer(MemorySegment memorySegment) {
if (memorySegment == null) {
return null;
}
return new NetworkBuffer(memorySegment, this);
}
这里使用NetworkBuffer
封装了memorySegment
。第二个参数的含义为回收器(recycler),传入的是LocalBufferPool
自己。因此,浮动内存的回收调用的是LocalBufferPool
的recycle
方法。
releaseAll方法
releaseAll
方法会回收并释放所有的浮动buffer和专属buffer。代码如下:
void releaseAll(List<MemorySegment> exclusiveSegments) {
Buffer buffer;
while ((buffer = floatingBuffers.poll()) != null) {
buffer.recycleBuffer();
}
while ((buffer = exclusiveBuffers.poll()) != null) {
exclusiveSegments.add(buffer.getMemorySegment());
}
}
该方法首先回收所有的浮动buffer,再回收所有专用buffer。清空内部exclusiveBuffers
队列,将已经回收的专用buffer添加到方法参数的exclusiveSegments
队列中。
发送端的buffer管理
数据发送端最重要的角色是RecordWriter
和ResultPartition
。RecordWriter
负责将数据流中的元素序列化,存入到ResultSubpartition
中。一个数据处理流程(Task
)对应着一个ResultPartition
,ResultPartition
的数据需要发送数据到多个下游Channel(对应着下游多个Task),ResultPartition
中的数据专门为下游不同的接收者做了分组,这个分组叫做ResultSubpartition
。
RecordWriter
具有两个子类:BroadcastRecordWriter
和ChannelSelectorRecordWriter
,分别负责广播形式写入数据和根据channel选择器向指定的channel写入数据。这两个子类的emit
方法主要逻辑都位于父类RecordWriter
的emit
方法。
写入数据到发送端缓存
RecordWriter
的emit
方法代码如下:
protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
checkErroneous();
// 设置序列化器要操作的record
serializer.serializeRecord(record);
// Make sure we don't hold onto the large intermediate serialization buffer for too long
// 序列化record,存入到目标channel的缓存中
if (copyFromSerializerToTargetChannel(targetChannel)) {
// 此时代表数据已写入完毕同时正在写入的buffer已经写满,需要执行裁剪serializer操作
serializer.prune();
}
}
我们再分析下copyFromSerializerToTargetChannel
方法:
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
// We should reset the initial position of the intermediate serialization buffer before
// copying, so the serialization results can be copied to multiple target buffers.
serializer.reset();
boolean pruneTriggered = false;
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
// 这里很重要,当一个buffer被写满的时候需要标记该buffer的状态为finished
// 在ResultSubpartition取出缓存数据的时候会对buffer队列中每个buffer的finished状态进行检查
// 只允许buffer队列中最后一个buffer的状态为没有finished
finishBufferBuilder(bufferBuilder);
// If this was a full record, we are done. Not breaking out of the loop at this point
// will lead to another buffer request before breaking out (that would not be a
// problem per se, but it can lead to stalls in the pipeline).
if (result.isFullRecord()) {
pruneTriggered = true;
emptyCurrentBufferBuilder(targetChannel);
break;
}
bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.copyToBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
flushTargetPartition(targetChannel);
}
return pruneTriggered;
}
该方法的主要逻辑如下:
- 获取一个BufferBuilder
- 写入数据流元素到bufferBuilder
- 如果buffer被写满,修改buffer状态为finished,申请新的buffer继续写入
- buffer写满同时元素也被完全写入,需要跳出循环,并且执行
emptyCurrentBufferBuilder(targetChannel)
,清除writer中目标channel和BufferBuilder的对应关系。下次使用会申请新的BufferBuilder,同时返回true,提示序列化器需要prune
为何要执行serializer的prune操作?对于serializer而言,他的内部维护了一个buffer,buffer的大小随着数据的写入会逐渐扩容,当buffer的数据完全写入到bufferBuilder的时候,serializer无需再维护扩容之后的buffer,因此调用prune方法。将serializer中的buffer还原为初始的buffer大小,减少内存的占用。
接下来分析下内存是如何获取的。我们查看下getBufferBuilder
方法。我们找一个有代表性的常用的子类ChannelSelectorRecordWriter
中的getBufferBuilder
方法,代码如下所示:
@Override
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
// ChannelSelector内维护了一个bufferBuilders数组,使用targetChannel作为下标,获取和它对应的bufferBuilder
if (bufferBuilders[targetChannel] != null) {
return bufferBuilders[targetChannel];
} else {
// 如果该channel对应的bufferBuilder不存在,则创建一个bufferBuilder
return requestNewBufferBuilder(targetChannel);
}
}
@Override
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());
// 从targetPartition获取bufferBuilder
// targetPartition是一个ResultPartitionWriter类型
// 实现类为RecordPartition
BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
// 创建一个BufferConsumer
targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
// 已目标channel为下标,存入bufferBuilders数组并返回
bufferBuilders[targetChannel] = bufferBuilder;
return bufferBuilder;
}
经过上面的分析我们可以看出来RecordWriter
从ResultPartition
获取一个个BufferBuilder
,将元素序列化写入BufferBuilder
之后,再根据BufferBuilder
创建出一个BufferConsumer
,再加入到targetPartition中(实际上在ResultSubpartition
)。
那么BufferBuilder
和BufferConsumer
分别是负责什么的呢?BufferBuilder
是一个辅助类,用于将java.nio.ByteBuffer
的内容写入到Flink的MemorySegment
中(append
方法)。同样,BufferConsumer
也是一个辅助类,用于读取MemorySegment
的数据(build
方法,将数据包装为Buffer
类型返回)。
我们继续分析下requestNewBufferBuilder
方法中的targetPartition.getBufferBuilder()
调用。查看ResultPartition
的getBufferBuilder
方法,代码如下:
@Override
public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
checkInProduceState();
return bufferPool.requestBufferBuilderBlocking();
}
这里从缓存池bufferPool
以阻塞方式获取BufferBuilder
。此处的bufferPool
是LocalBufferPool
类型。
接下来一个关键的地方是ResultPartition
的addBufferConsumer
方法。代码如下:
@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
checkNotNull(bufferConsumer);
ResultSubpartition subpartition;
try {
checkInProduceState();
subpartition = subpartitions[subpartitionIndex];
}
catch (Exception ex) {
bufferConsumer.close();
throw ex;
}
return subpartition.add(bufferConsumer);
}
这个方法从subpartitions
数组中以targetChannel
为下标(在这个方法内的形参为subpartitionIndex
),获取对应的subpartition,然后调用subpartition
的add
方法添加bufferConsumer
。
ResultSubpartition
有两个子类:
- BoundedBlockingSubpartition:数据以阻塞的方式传输,数据先写入,然后再消费,支持数据的多次消费。数据可以存放在文件或者是内存中。数据存储在BoundedData中。BoundedData有三个类型的子类,分别为
MemoryMapBoundedData
,FileChannelBoundedData
和FileChannelMemoryMappedBoundedData
。分别对应内存中储存的数据,文件中储存的数据和同时在内存和文件中储存的数据。 - PipelinedSubpartition:数据可以持续的写入和消费。只能将数据储存在内存中,数据可以被消费一次。缓存的数据储存在
BufferConsumers
队列中。数据的消费通过PipelinedSubpartition
内的PipelinedSubpartitionView
进行(通过createReaderView
方法创建)。一旦BufferConsumers
中finish状态的buffer数量为1时(通常是缓存消费完,消费者进入到等待状态之后,subpartition有新的数据写入),会通知readerView
有新的数据可以消费。
对于流处理模式,Flink使用的是PipelinedSubpartition
,接下来我们分析这个子类的相关方法。
PipelinedSubpartition
的add
方法源码如下:
@Override
public boolean add(BufferConsumer bufferConsumer) {
// 间接调用了下面的cufferConsumer,设置finished变量为false,不强制通知readerView
return add(bufferConsumer, false);
}
private boolean add(BufferConsumer bufferConsumer, boolean finish) {
checkNotNull(bufferConsumer);
final boolean notifyDataAvailable;
synchronized (buffers) {
// 如果buffer已经finished的同时也已被释放,关闭这个bufferConsumer并回收内存
if (isFinished || isReleased) {
bufferConsumer.close();
return false;
}
// Add the bufferConsumer and update the stats
// 把bufferConsumer加入到buffers队列中
buffers.add(bufferConsumer);
// 增加总buffer数量计数器
updateStatistics(bufferConsumer);
// 增加待处理的buffer数量计数器
increaseBuffersInBacklog(bufferConsumer);
// 判断是否需要通知readerView去消费数据
// 如果buffers队列中只有一个buffer,并且这个buffer已经finish,返回true
// 如果buffers队列中只有一个buffer,这时又加入一个新的buffer,那么第一个buffer会被认为已经finish,方法返回true
// 其实在这里判断有多少个buffer已经finished的逻辑如下
// 1. 如果只有一个buffer,需要判断下这个buffer是否finish
// 2. 如果有多个buffer,返回buffers的size - 1。默认出最后一个之外,所有的buffer都已经finish
notifyDataAvailable = shouldNotifyDataAvailable() || finish;
isFinished |= finish;
}
if (notifyDataAvailable) {
notifyDataAvailable();
}
return true;
}
到此为止,我们分析了RecordWriter中buffer的获取,数据的写入和准备subpartition供下游消费的过程。接下来需要关注缓存中数据发往下游的过程。
从发送端缓存读取数据
ResultSubpartition
为数据消费端建立一个ResultSubpartitionView
。消费者(这里是netty的数据发送server),通过调用ResultSubpartitionView
的getNextBuffer
获取buffer中的数据。我们仍然以PipelinedSubpartitionView
为例,分析下getNextBuffer
方法。
@Nullable
@Override
public BufferAndBacklog getNextBuffer() {
return parent.pollBuffer();
}
接着查看下PipelinedSubpartition
的pollBuffer
方法。代码和分析如下所示:
@Nullable
BufferAndBacklog pollBuffer() {
synchronized (buffers) {
Buffer buffer = null;
// 如果buffers队列为空,不需要flush
if (buffers.isEmpty()) {
flushRequested = false;
}
while (!buffers.isEmpty()) {
BufferConsumer bufferConsumer = buffers.peek();
// 转换bufferConsumer为Buffer类型
buffer = bufferConsumer.build();
// 检查buffers队列,没有finished的buffer不能在队列头部
// 只允许buffer队列最后一个buffer的状态为没有finish
checkState(bufferConsumer.isFinished() || buffers.size() == 1,
"When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");
// 如果队列只有一个buffer,不需要flush
if (buffers.size() == 1) {
// turn off flushRequested flag if we drained all of the available data
flushRequested = false;
}
// 如果当前bufferConsumer已经finish,回收这个buffer,待处理的buffer数量减1
if (bufferConsumer.isFinished()) {
// 注意,回收已经finished的buffer的调用在此
buffers.pop().close();
decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
}
// 如果可读字节数大于0,跳出循环
if (buffer.readableBytes() > 0) {
break;
}
// 否则回收内存
buffer.recycleBuffer();
buffer = null;
if (!bufferConsumer.isFinished()) {
// 运行到这里只有一种可能,buffer队列中只有一个没有finished的buffer,跳出循环
break;
}
}
if (buffer == null) {
return null;
}
// 更新总字节数计数器
updateStatistics(buffer);
// Do not report last remaining buffer on buffers as available to read (assuming it's unfinished).
// It will be reported for reading either on flush or when the number of buffers in the queue
// will be 2 or more.
// 返回BufferAndBacklog对象,该对象包装了包含数据的buffer
// BufferAndBacklog对象包含4个字段,buffer(缓存数据),isMoreAvailable(是否有更多数据),buffersInBacklog(挤压的buffer数量)和nextBufferIsEvent(下一个buffer是否是保存的是事件)
return new BufferAndBacklog(
buffer,
// 如果需要flush,或者finished的buffer数量大于0,返回true
isAvailableUnsafe(),
getBuffersInBacklog(),
nextBufferIsEventUnsafe());
}
}
到这里,发送端缓存的申请,数据写入和读取和回收过程已经分析完毕。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。