玩转大数据Flink源码解析

Flink 源码之内存管理

2020-02-27  本文已影响0人  AlienPaul

Flink 系列博客

Flink QuickStart
Flink双流操作
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任务提交操作
Flink配置Prometheus监控
Flink in docker 部署
Flink HA 部署
Flink 常见调优参数总结
Flink 源码之任务提交流程分析
Flink 源码之基本算子
Flink 源码之Trigger
Flink 源码之Evictor
Flink 源码之Window
Flink 源码之WindowOperator
Flink 源码之StreamGraph生成
Flink 源码之JobGraph生成
Flink 源码之两阶段提交
Flink 源码之分布式快照
Flink 源码之时间处理
Flink 源码之节点间通信
Flink 源码之Credit Based反压
Flink 源码之快照
Flink 源码之FlinkKafkaConsumer

MemorySegment概述

MemorySegment是Flink管理的内存片段。该类是一个抽象类。它的实现既可以是堆内存,也可以是堆外内存,甚至是两者同时使用。使用MemorySegment这个类型管理内存,无需知道内存片段是堆内、堆外还是混合,一视同仁。

MemorySegment有如下两个实现类:

关于MemorySegment优化的一些讨论

Java虚拟机内有一个JIT(Just In Time)编译器。JIT是一种即时编译器,能够将Java字节码编译为机器码,对于提高程序执行速度有很大的帮助。

但是JIT不会把所有Java字节码都编译为机器码。因为,大多数的代码只会运行一次,对这些代码即时编译为机器码再执行效率上不如直接在JVM运行字节码。运行编译过后的机器码比JVM解释执行Java字节码要快,但是把字节码编译为机器码在执行的过程就不见得比JVM解释执行字节码快。还有一个需要考虑的问题就是代码膨胀,Java字节码编译为机器码之后占用存储会扩大10倍,甚至20倍以上,非常浪费存储空间。总的来说只运行一次或频率很低的代码,解释执行要比JIT编译执行更快。

对于一些经常执行的代码,例如循环中调用某个方法,或者是频繁调用某个处理逻辑,JIT编译这些代码就显得非常有必要。这种执行频率很高的代码称为热代码。在Java程序运行过程中,JIT会把这种热代码以method为单位,编译为机器码并保存起来。这样以后每次不必在解释执行字节码,直接调用保存的机器码就可以了。

将字节码编译为机器码有很多中优化方式。其中有一种是内联,即调用方法时,将方法内容部分的代码直接搬到方法调用的位置,可以避免依照指针查找方法代码的位置。这里会有一个问题,Java支持方法的重写,使用多态方式调用的时候,需要查找具体实现类对应逻辑。也就是说,如果一个父类具有多个子类,多个子类被加载到JVM的时候,JIT无法确定方法的具体实现,这样很不利于JIT的优化。

这时候大家会想到,如果只用其中一个子类,那么在程序运行的时候,调用哪个方法是确定的。JIT便可以采用内联和去虚化的方式,优化这些方法的调用。

HybridMemorySegment这个类的出现正是为了解决上述问题。HybridMemorySegment一种实现类既能够处理堆内内存又能够处理堆外内存。这样就不存在多种MemorySegment的子类,从而使JIT的性能优化效果最大化。

MemorySegment中的UNSAFE对象

MemorySegment里面有一个UNSAFE对象。Unsafesun.misc包内的一个类,提供了native方式直接操作内存(分配内存,释放内存,复制内存,CAS操作,读写各种Java原生数据类型的数据)的方法。这些方法的执行效率非常高。但由于这些方法使用了类似C语言指针的方式操作内存,如果使用不当,很可能会造成不可预知的问题。因此这个类叫做Unsafe,意为用户去使用这个类是不安全的。

正常情况Unsafe是不允许用户使用的。查看一下它的构造函数:

@CallerSensitive
public static Unsafe getUnsafe() {
    Class<?> caller = Reflection.getCallerClass();
    if (!VM.isSystemDomainLoader(caller.getClassLoader())) {
        throw new SecurityException("Unsafe");
    } else {
        return theUnsafe;
    }
}

Unsafe使用的是单例模式。创建这个类实例的方法位于static代码块中。getUnsafe方法仅仅是取回这个实例。
但是这段代码中有一个条件判断,用来检查调用者的classloader是不是SystemDomainLoader,不清楚是什么意思。所以我们查看下VM.isSystemDomainLoader方法。代码如下所示:

public static boolean isSystemDomainLoader(ClassLoader loader) {
    return loader == null;
}

看到这里就明白了,对于Bootstrap classloader加载的类而言,他们的classloader值为null。而Java的rt.jar是Bootstrap classloader加载的。因此我们可以确定,Unsafe这个类无法在rt.jar之外的class中使用。

那么问题来了,在Flink分配内存中Unsafe是怎么获取到的?我们看一下MemoryUtilsUNSAFE变量。该变量通过getUnsafe方法获取,代码如下所示:

private static sun.misc.Unsafe getUnsafe() {
    try {
        Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
        unsafeField.setAccessible(true);
        return (sun.misc.Unsafe) unsafeField.get(null);
    } catch (SecurityException e) {
        throw new Error("Could not access the sun.misc.Unsafe handle, permission denied by security manager.", e);
    } catch (NoSuchFieldException e) {
        throw new Error("The static handle field in sun.misc.Unsafe was not found.");
    } catch (IllegalArgumentException e) {
        throw new Error("Bug: Illegal argument reflection access for static field.", e);
    } catch (IllegalAccessException e) {
        throw new Error("Access to sun.misc.Unsafe is forbidden by the runtime.", e);
    } catch (Throwable t) {
        throw new Error("Unclassified error while trying to access the sun.misc.Unsafe handle.", t);
    }
}

这里通过一个取巧的方式:使用反射。Unsafe这个class内的单例对象theUnsafe可以通过反射getDeclaredField直接拿到。然后设置这个field为可以访问(setAccessible(true))。这样可以绕过sun的限制,把Unsafe对象拿到rt.jar之外使用。

HybridMemorySegment

HybridMemorySegment既可以是堆内内存也可以是堆外内存。
内部有几个重要的成员变量:

除了操作堆内内存的方法外,还有两个方法:

分别用于读取和写入堆外内存。

MemorySegmentFactory

负责创建出符合要求的MemorySegment
注意:该工厂类创建出来的MemorySegment都是HybridMemorySegment类型,便于JVM使用JIT优化,从而提高性能。

MemorySegmentFactory提供了如下方法:

SpanningRecordSerializer

SpanningRecordSerializer负责将数据流中的元素序列化。调用copyToBufferBuilder,将已经序列化的数据复制到buffer中。
SpanningRecordSerializer有两个重要的方法:

SpanningRecordSerializer内部持有一个DataOutputSerializer类型的对象serializationBufferDataOutputSerializer内部有一个字节数组缓存空间,用来存放当前正在进行序列化操作的数据。SpanningRecordSerializer构造方法中创建了一个DataOutputSerializer,初始缓存大小是128字节。

DataOutputSerializer提供了大量的write重载方法,用于序列化各种类型的数据。在序列化数据的时候,初始的缓存容量有可能不够使用,此时自动调用resize方法扩大缓冲区。DataOutputSerializer还提供了一个pruneBuffer方法,将扩容过的当前缓冲区还原回初始大小。

NetworkBuffer

NetworkBufferMemorySegment的包装类。该类提供了大量的写入和获取buffer中原生类型数据的方法。除此之外还提供了缓存的复制和回收等功能。

NetworkBufferPool

NetworkBufferPool是供网络层使用的,固定大小的缓存池。每个task并非直接从NetworkBufferPool获取内存,而是使用从NetworkBufferPool创建出的LocalBufferPool来分配内存。

NetworkBufferPool有几个比较重要的成员变量:

构造方法

这里我们分析下NetworkBufferPool的构造方法。

public NetworkBufferPool(
        int numberOfSegmentsToAllocate,
        int segmentSize,
        int numberOfSegmentsToRequest,
        Duration requestSegmentsTimeout) {
    // 设置总MemorySegment数量
    this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
    // 设置每个MemorySegment的大小
    this.memorySegmentSize = segmentSize;

    checkArgument(numberOfSegmentsToRequest > 0, "The number of required buffers should be larger than 0.");
    // 设置每批请求的内存数量
    this.numberOfSegmentsToRequest = numberOfSegmentsToRequest;

    Preconditions.checkNotNull(requestSegmentsTimeout);
    checkArgument(requestSegmentsTimeout.toMillis() > 0,
            "The timeout for requesting exclusive buffers should be positive.");
    // 设置请求内存的超时时间
    this.requestSegmentsTimeout = requestSegmentsTimeout;

    // MemorySegment大小转换为long
    final long sizeInLong = (long) segmentSize;

    try {
        // 创建availableMemorySegments队列
        this.availableMemorySegments = new ArrayDeque<>(numberOfSegmentsToAllocate);
    }
    catch (OutOfMemoryError err) {
        throw new OutOfMemoryError("Could not allocate buffer queue of length "
                + numberOfSegmentsToAllocate + " - " + err.getMessage());
    }

    try {
        // 循环调用,创建numberOfSegmentsToAllocate个MemorySegment
        // 这里分配的是堆外内存
        for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
            availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
        }
    }
    catch (OutOfMemoryError err) {
        int allocated = availableMemorySegments.size();

        // free some memory
        availableMemorySegments.clear();

        long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
        long allocatedMb = (sizeInLong * allocated) >> 20;
        long missingMb = requiredMb - allocatedMb;

        throw new OutOfMemoryError("Could not allocate enough memory segments for NetworkBufferPool " +
                "(required (Mb): " + requiredMb +
                ", allocated (Mb): " + allocatedMb +
                ", missing (Mb): " + missingMb + "). Cause: " + err.getMessage());
    }

    // 设置状态为可用
    availabilityHelper.resetAvailable();

    long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

    LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
            allocatedMb, availableMemorySegments.size(), segmentSize);
}

创建LocalBufferPool

基于NetworkBufferPool创建LocalBufferPool有如下两个方法:

@Override
public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
    return internalCreateBufferPool(numRequiredBuffers, maxUsedBuffers, null);
}

@Override
public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, BufferPoolOwner bufferPoolOwner) throws IOException {
    return internalCreateBufferPool(numRequiredBuffers, maxUsedBuffers, bufferPoolOwner);
}

从源代码可知,创建LocalBufferPool需要3个参数:

接下来分析internalCreateBufferPool方法。

private BufferPool internalCreateBufferPool(
        int numRequiredBuffers,
        int maxUsedBuffers,
        @Nullable BufferPoolOwner bufferPoolOwner) throws IOException {

    // It is necessary to use a separate lock from the one used for buffer
    // requests to ensure deadlock freedom for failure cases.
    synchronized (factoryLock) {
        if (isDestroyed) {
            throw new IllegalStateException("Network buffer pool has already been destroyed.");
        }

        // Ensure that the number of required buffers can be satisfied.
        // With dynamic memory management this should become obsolete.
        // 检查内存分配不能超过NetworkBufferPool总容量
        if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
            throw new IOException(String.format("Insufficient number of network buffers: " +
                            "required %d, but only %d available. %s.",
                    numRequiredBuffers,
                    totalNumberOfMemorySegments - numTotalRequiredBuffers,
                    getConfigDescription()));
        }

        // 更新所有LocalBufferPool占用的总内存数量
        this.numTotalRequiredBuffers += numRequiredBuffers;

        // We are good to go, create a new buffer pool and redistribute
        // non-fixed size buffers.
        // 创建一个LocalBufferPool
        LocalBufferPool localBufferPool =
            new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers, bufferPoolOwner);

        // 记录新创建的bufferPool到allBufferPools集合
        allBufferPools.add(localBufferPool);

        try {
            // 重新分配每个LocalBufferPool的内存块,接下来分析
            redistributeBuffers();
        } catch (IOException e) {
            try {
                destroyBufferPool(localBufferPool);
            } catch (IOException inner) {
                e.addSuppressed(inner);
            }
            ExceptionUtils.rethrowIOException(e);
        }

        return localBufferPool;
    }
}

接下来我们分析下比较重要的redistributeBuffers方法。该方法在保证NetworkBufferPool有多余未被使用内存的前提下,尽量为每个bufferPool分配更多额外的内存。

private void redistributeBuffers() throws IOException {
    assert Thread.holdsLock(factoryLock);

    // All buffers, which are not among the required ones
    // 计算出尚未分配的内存块数量
    final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;

    // 如果没有可用的内存
    // 即所有的内存都分配给了各个LocalBufferPool
    // 需要设置每个bufferPool的大小为每个pool要求的最小内存大小
    // 同时归还超出数量的内存
    if (numAvailableMemorySegment == 0) {
        // in this case, we need to redistribute buffers so that every pool gets its minimum
        for (LocalBufferPool bufferPool : allBufferPools) {
            bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
        }
        return;
    }

    /*
     * With buffer pools being potentially limited, let's distribute the available memory
     * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
     * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
     * it may be less. Based on this and the sum of all these values (totalCapacity), we build
     * a ratio that we use to distribute the buffers.
     */

    long totalCapacity = 0; // long to avoid int overflow

    // 统计所有的bufferPool可以被额外分配的内存数量总和
    for (LocalBufferPool bufferPool : allBufferPools) {
        // 计算每个bufferPool可以最多超量使用的内存数
        int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
            bufferPool.getNumberOfRequiredMemorySegments();
        // 如果最多可超量使用的内存数量比numAvailableMemorySegment还多
        // 肯定无法满足这个需求
        // 按照numAvailableMemorySegment来统计
        totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
    }

    // no capacity to receive additional buffers?
    // 如果总容量为0,直接返回
    if (totalCapacity == 0) {
        return; // necessary to avoid div by zero when nothing to re-distribute
    }

    // since one of the arguments of 'min(a,b)' is a positive int, this is actually
    // guaranteed to be within the 'int' domain
    // (we use a checked downCast to handle possible bugs more gracefully).
    // 计算可以重分配的内存数量
    // MathUtils.checkedDownCast用来确保结果转换成int不会溢出
    final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
            Math.min(numAvailableMemorySegment, totalCapacity));

    long totalPartsUsed = 0; // of totalCapacity
    int numDistributedMemorySegment = 0;
    // 对每个LocalBufferPool重新设置pool size
    for (LocalBufferPool bufferPool : allBufferPools) {
        // 计算pool的最大可超量使用的内存数
        int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
            bufferPool.getNumberOfRequiredMemorySegments();

        // shortcut
        // 如果可超量分配的内存数量为0,不用进行后续操作
        if (excessMax == 0) {
            continue;
        }

        // 和totalCapacity统计相同
        totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);

        // avoid remaining buffers by looking at the total capacity that should have been
        // re-distributed up until here
        // the downcast will always succeed, because both arguments of the subtraction are in the 'int' domain
        // 计算每个pool可重分配(增加)的内存数量
        // totalPartsUsed / totalCapacity可以计算出已分配的内存占据总可用内存的比例
        // 因为增加totalPartsUsed在实际分配内存之前,所以这个比例包含了即将分配内存的这个LocalBufferPool的占比
        // memorySegmentsToDistribute * totalPartsUsed / totalCapacity可计算出已分配的内存数量(包含即将分配buffer的这个LocalBufferPool)
        // 这个结果再减去numDistributedMemorySegment(已分配内存数量),最终得到需要分配给此bufferPool的内存数量
        final int mySize = MathUtils.checkedDownCast(
                memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);

        // 更新已分配内存数量的统计
        numDistributedMemorySegment += mySize;
        // 重新设置bufferPool的大小为新的值
        bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
    }

    assert (totalPartsUsed == totalCapacity);
    assert (numDistributedMemorySegment == memorySegmentsToDistribute);
}

请求内存

NetworkBufferPool有两个请求内存的方法,分别为请求单个MemorySegment和批量请求MemorySegment。

请求单个MemorySegment的方法为requestMemorySegment,代码如下:

@Nullable
public MemorySegment requestMemorySegment() {
    synchronized (availableMemorySegments) {
        // 调用internalRequestMemorySegment
        return internalRequestMemorySegment();
    }
}

@Nullable
private MemorySegment internalRequestMemorySegment() {
    assert Thread.holdsLock(availableMemorySegments);

    // 从availableMemorySegments获取一个内存片段
    final MemorySegment segment = availableMemorySegments.poll();
    // 如果获取这个内存成功(不为null)之后availableMemorySegments变为empty,设置状态为不可用
    if (availableMemorySegments.isEmpty() && segment != null) {
        availabilityHelper.resetUnavailable();
    }
    return segment;
}

批量请求内存的方法为requestMemorySegments。代码和分析如下:

@Override
public List<MemorySegment> requestMemorySegments() throws IOException {
    synchronized (factoryLock) {
        if (isDestroyed) {
            throw new IllegalStateException("Network buffer pool has already been destroyed.");
        }
        // 尝试重新分配内存
        tryRedistributeBuffers();
    }

    // 创建容纳请求到的内存的容器
    final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
    try {
        // 统计请求内存操作耗时
        final Deadline deadline = Deadline.fromNow(requestSegmentsTimeout);
        while (true) {
            if (isDestroyed) {
                throw new IllegalStateException("Buffer pool is destroyed.");
            }

            MemorySegment segment;
            synchronized (availableMemorySegments) {
                // 调用之前分析过的internalRequestMemorySegment方法
                // 从availableMemorySegments中获取一个内存片段
                // 如果没有获取成功,等得2000毫秒
                if ((segment = internalRequestMemorySegment()) == null) {
                    availableMemorySegments.wait(2000);
                }
            }
            // 如果获取成功,加入内存到segments容器
            if (segment != null) {
                segments.add(segment);
            }

            // 如果请求到的内存数量超过了单批次请求的数量限制,退出循环
            if (segments.size() >= numberOfSegmentsToRequest) {
                break;
            }

            // 如果申请内存操作超时,抛出异常
            if (!deadline.hasTimeLeft()) {
                throw new IOException(String.format("Timeout triggered when requesting exclusive buffers: %s, " +
                                " or you may increase the timeout which is %dms by setting the key '%s'.",
                        getConfigDescription(),
                        requestSegmentsTimeout.toMillis(),
                        NettyShuffleEnvironmentOptions.NETWORK_EXCLUSIVE_BUFFERS_REQUEST_TIMEOUT_MILLISECONDS.key()));
            }
        }
    } catch (Throwable e) {
        try {
            recycleMemorySegments(segments, numberOfSegmentsToRequest);
        } catch (IOException inner) {
            e.addSuppressed(inner);
        }
        ExceptionUtils.rethrowIOException(e);
    }

    return segments;
}

回收内存

NetworkBufferPool有回收单个和批量回收MemorySegment两种方式。
回收单个MemorySegment的方法为recycle

public void recycle(MemorySegment segment) {
    // Adds the segment back to the queue, which does not immediately free the memory
    // however, since this happens when references to the global pool are also released,
    // making the availableMemorySegments queue and its contained object reclaimable
    internalRecycleMemorySegments(Collections.singleton(checkNotNull(segment)));
}

这里调用了internalRecycleMemorySegments,后续分析。

批量回收内存的方法为recycleMemorySegments

@Override
public void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException {
    recycleMemorySegments(segments, segments.size());
}

private void recycleMemorySegments(Collection<MemorySegment> segments, int size) throws IOException {
    internalRecycleMemorySegments(segments);

    synchronized (factoryLock) {
        numTotalRequiredBuffers -= size;

        // note: if this fails, we're fine for the buffer pool since we already recycled the segments
        // 内存回收成功之后,需要重新分配内存
        redistributeBuffers();
    }
}

最后我们分析下internalRecycleMemorySegments方法。代码如下:

private void internalRecycleMemorySegments(Collection<MemorySegment> segments) {
    CompletableFuture<?> toNotify = null;
    synchronized (availableMemorySegments) {
        // 如果可以成功归还内存,需要设置availabilityHelper为可用状态
        if (availableMemorySegments.isEmpty() && !segments.isEmpty()) {
            toNotify = availabilityHelper.getUnavailableToResetAvailable();
        }
        // 加入归还的内存片段到availableMemorySegments
        availableMemorySegments.addAll(segments);
        // 通知所有等待调用的对象
        availableMemorySegments.notifyAll();
    }

    if (toNotify != null) {
        toNotify.complete(null);
    }
}

LocalBufferPool

LocalBufferPoolNetworkBufferPool的包装。负责分配和回收NetworkBufferPool中的一部分buffer对象。NetworkBufferPool是一个固定大小的缓存池。将一个NetworkBufferPool的可用缓存划分给多个LocalBufferPool使用,避免网络层同时操作NetworkBufferPool造成死锁。同时LocalBufferPool实现了默认的回收机制,确保每一个buffer最终会返回给NetworkBufferPool
LocalBufferPool的大小是不固定的。它具有三个表示pool容量的变量:

buffer申请方法

LocalBufferPool申请buffer的方法有两个:

首先我们分析requestBuffer方法。

@Override
public Buffer requestBuffer() throws IOException {
    return toBuffer(requestMemorySegment());
}

toBuffer方法将MemorySegment封装为NetworkBuffer形式,代码如下:

private Buffer toBuffer(MemorySegment memorySegment) {
    if (memorySegment == null) {
        return null;
    }
    // 设定buffer的回收器(recycle)为localBufferPool自己
    return new NetworkBuffer(memorySegment, this);
}

接下来查看requestMemorySegment方法:

@Nullable
private MemorySegment requestMemorySegment() throws IOException {
    MemorySegment segment = null;
    synchronized (availableMemorySegments) {
        // 如果已请求的内存片段数量超过了localBufferPool的大小
        // 将多出来的内存片段取出
        // 归还给NetworkBufferPool并回收
        returnExcessMemorySegments();

        // 如果可用内存片段队列为空,直接从NetworkBufferPool请求内存
        if (availableMemorySegments.isEmpty()) {
            segment = requestMemorySegmentFromGlobal();
        }
        // segment may have been released by buffer pool owner
        // 否则,从可用内存片段中取出一段内存
        if (segment == null) {
            segment = availableMemorySegments.poll();
        }
        // 如果仍没有可用内存返回,设置当前状态为不可用
        if (segment == null) {
            availabilityHelper.resetUnavailable();
        }
    }
    return segment;
}

requestMemorySegment方法内一些调用需要详细分析。

returnExcessMemorySegments方法的代码如下:

private void returnExcessMemorySegments() {
    assert Thread.holdsLock(availableMemorySegments);

    // 如果已请求的内存数量超过了bufferPool的大小,一直循环
    while (numberOfRequestedMemorySegments > currentPoolSize) {
        // 从可用内存队列中取出一个内存片段
        MemorySegment segment = availableMemorySegments.poll();
        if (segment == null) {
            return;
        }

        // 归还这个内存
        returnMemorySegment(segment);
    }
}

// 归还内存的逻辑
private void returnMemorySegment(MemorySegment segment) {
    assert Thread.holdsLock(availableMemorySegments);

    // 已请求内存数量的技术器减1
    numberOfRequestedMemorySegments--;
    // 调用NetworkBufferPool的recycle方法,回收这一段内存
    networkBufferPool.recycle(segment);
}

requestMemorySegmentFromGlobal代码:

@Nullable
private MemorySegment requestMemorySegmentFromGlobal() throws IOException {
    assert Thread.holdsLock(availableMemorySegments);

    if (isDestroyed) {
        throw new IllegalStateException("Buffer pool is destroyed.");
    }

    // 只有已请求内存数量小于localBufferPool大小的时候才去请求NetworkBufferPool的内存
    if (numberOfRequestedMemorySegments < currentPoolSize) {
        final MemorySegment segment = networkBufferPool.requestMemorySegment();
        if (segment != null) {
            // 如果请求到内存,增加已请求内存数量计数器,并返回内存
            numberOfRequestedMemorySegments++;
            return segment;
        }
    }

    // 调用bufferPool持有者的释放内存方法
    if (bufferPoolOwner != null) {
        bufferPoolOwner.releaseMemory(1);
    }

    return null;
}

另外一个申请内存的方法为requestBufferBuilderBlocking,代码如下:

@Override
public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
    return toBufferBuilder(requestMemorySegmentBlocking());
}

其中toBufferBuilder把MemorySegment封装为BufferBuilder类型,设置内存回收器为LocalBufferPool自身。这里就不再看他的源码。

requestMemorySegmentBlocking方法的源码需要分析下,如下所示:

private MemorySegment requestMemorySegmentBlocking() throws InterruptedException, IOException {
    MemorySegment segment;
    while ((segment = requestMemorySegment()) == null) {
        try {
            // wait until available
            getAvailableFuture().get();
        } catch (ExecutionException e) {
            LOG.error("The available future is completed exceptionally.", e);
            ExceptionUtils.rethrow(e);
        }
    }
    return segment;
}

该方法的逻辑很明确,先通过之前分析的requestMemorySegment获取一段内存,如果没有获取到,则调用availabilityHelperget方法阻塞,直到获取内存成功。有内存被回收的时候,get方法会返回。

buffer回收方法

LocalBufferPool回收内存的方法是recycle

public void recycle(MemorySegment segment) {
    BufferListener listener;
    CompletableFuture<?> toNotify = null;
    // 创建一个默认的NotificationResult,为缓存未使用
    NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
    // 一直循环,确保如果listener返回内存不再使用的时候,再次执行这段逻辑将其回收
    while (!notificationResult.isBufferUsed()) {
        synchronized (availableMemorySegments) {
            // 如果已请求的内存片段数量多于当前pool的大小,需要将内存归还
            // currentPoolSize可能会在运行的过程中调整(NetworkBufferPool的redistributeBuffers方法)
            if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                // 返还内存给networkBufferPool处理
                // numberOfRequestedMemorySegments 减1
                // 调用networkBufferPool的recycle方法
                returnMemorySegment(segment);
                return;
            } else {
                // 取出一个注册的listener
                listener = registeredListeners.poll();
                // 如果没有listener
                if (listener == null) {
                    boolean wasUnavailable = availableMemorySegments.isEmpty();
                    
                    // 将该内存片段放入可用内存片段列表
                    availableMemorySegments.add(segment);
                    if (wasUnavailable) {
                        // 如果回收内存后,availableMemorySegments从空变为非空,设置toNotify为AVAILABLE
                        toNotify = availabilityHelper.getUnavailableToResetAvailable();
                    }
                    break;
                }
            }
        }
        // 如果获取到了listener,调用listener的notifyBufferAvailable方法,告诉listener buffer可用
        notificationResult = fireBufferAvailableNotification(listener, segment);
    }

    mayNotifyAvailable(toNotify);
}

注意:recycle方法调用registeredListeners.poll()获取listener。Listener的作用是当availableMemorySegments耗尽的时候,在bufferPool注册listener。当回收了内存,availableMemorySegments不再为空的时候,bufferPool调用监听器的notifyBufferAvailable方法,告知现在有buffer可用。只有在availableMemorySegments(可用内存段队列)为空并且没有被destroy的时候才能添加listener。addBufferListener方法代码:

@Override
public boolean addBufferListener(BufferListener listener) {
    synchronized (availableMemorySegments) {
        if (!availableMemorySegments.isEmpty() || isDestroyed) {
            return false;
        }

        registeredListeners.add(listener);
        return true;
    }
}

换一种说法,只有当buffer使用者发现bufferPool中无buffer可用的时候,才能去注册listener,用于在有buffer可用的时候收到通知。监听器必须实现BufferListener接口。

BufferListener接口有两个方法:notifyBufferAvailablenotifyBufferDestroyed。前者用来告知有新的buffer可用(可用buffer数量从无到有的时候调用),后者用于告知bufferPool已被销毁。

让我们回到recycle方法。还剩下一个触发buffer可用时候通知的调用,即fireBufferAvailableNotification方法。它的代码如下:

private NotificationResult fireBufferAvailableNotification(BufferListener listener, MemorySegment segment) {
    // We do not know which locks have been acquired before the recycle() or are needed in the
    // notification and which other threads also access them.
    // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
    // 调用listener的notifyBufferAvailable方法
    NotificationResult notificationResult = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    // 如果listener需要更多的buffer
    if (notificationResult.needsMoreBuffers()) {
        synchronized (availableMemorySegments) {
            if (isDestroyed) {
                // cleanup tasks how they would have been done if we only had one synchronized block
                listener.notifyBufferDestroyed();
            } else {
                // 再次把它注册为listener
                registeredListeners.add(listener);
            }
        }
    }
    return notificationResult;
}

重设LocalBufferPool的currentPoolSize

NetworkBufferPoolredistributeBuffers方法可以重设每个LocalBufferPool的大小。这是通过调用LocalBufferPoolsetNumBuffers方法实现。
该方法的代码如下所示:

@Override
public void setNumBuffers(int numBuffers) throws IOException {
    int numExcessBuffers;
    CompletableFuture<?> toNotify = null;
    synchronized (availableMemorySegments) {
        // 检查重新设定的pool size必须要大于或等于numberOfRequiredMemorySegments
        checkArgument(numBuffers >= numberOfRequiredMemorySegments,
                "Buffer pool needs at least %s buffers, but tried to set to %s",
                numberOfRequiredMemorySegments, numBuffers);

        // 设置currentPoolSize,不大于maxNumberOfMemorySegments
        if (numBuffers > maxNumberOfMemorySegments) {
            currentPoolSize = maxNumberOfMemorySegments;
        } else {
            currentPoolSize = numBuffers;
        }

        // 归还超出部分的内存给NetworkBufferPool
        returnExcessMemorySegments();

        numExcessBuffers = numberOfRequestedMemorySegments - currentPoolSize;
        if (numExcessBuffers < 0 && availableMemorySegments.isEmpty() && networkBufferPool.isAvailable()) {
            toNotify = availabilityHelper.getUnavailableToResetUnavailable();
        }
    }

    mayNotifyAvailable(toNotify);

    // If there is a registered owner and we have still requested more buffers than our
    // size, trigger a recycle via the owner.
    if (bufferPoolOwner != null && numExcessBuffers > 0) {
        bufferPoolOwner.releaseMemory(numExcessBuffers);
    }
}

数据接收端的buffer管理

数据接收端主要是指RemoteInputChannel。该channel用来接受和处理从其他节点读取到的数据。和RemoteInputChanel对应的是LocalInputChannel,负责读取本地的subpartition,不需要使用接收端缓存。

Buffer的请求

RemoteInputChannel储存从上游接收到的数据使用的buffer从AvailableBufferQueue中请求。CreditBasedPartitionRequestClientHandlerdecodeBufferOrEvent方法负责处理netty通信接收到的上游数据。调用inputChannelrequestBuffer,获取一个buffer。如果能够获取到buffer,会接着调用inputChannelonBuffer方法,把接收到的数据存入到receivedBuffers中。
decodeBufferOrEvent相关代码如下所示:

// 从inputChannel请求缓存
Buffer buffer = inputChannel.requestBuffer();
if (buffer != null) {
    // 从netty缓存中读取数据到inputChannel的缓存
    nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
    buffer.setCompressed(bufferOrEvent.isCompressed);
    // 通知inputChannel缓存数据已准备就绪
    inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
} else if (inputChannel.isReleased()) {
    cancelRequestFor(bufferOrEvent.receiverId);
} else {
    throw new IllegalStateException("No buffer available in credit-based input channel.");
}

我们看一下requestBuffer方法。该方法从bufferQueue中拿到一个buffer并返回。其中bufferQueue对象为AvailableBufferQueue类型。AvailableBufferQueue负责维护RemoteInputChannel的可用内存,稍后分析。

@Nullable
public Buffer requestBuffer() {
    synchronized (bufferQueue) {
        return bufferQueue.takeBuffer();
    }
}

最后在onBuffer方法中,把已经填充了数据的内存块放入receivedBuffers队列(调用receivedBuffers.add(buffer)方法)。后续的operator可以通过StreamTaskNetworkInput读取InputChannel中缓存的数据。

Buffer的回收

回收buffer的调用位置在StreamTaskNetworkInputemitNext方法。

该方法负责从InputGate中拉取数据,数据被反序列化器反序列化之后发往operator。
负责操作内存的关键部分内容如下所示:

@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {

    while (true) {
        // get the stream element from the deserializer
        if (currentRecordDeserializer != null) {
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
            // 关键点
            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
            }
        }

        // ...
    }
}

我们分析下关键部分的代码。如果buffer中的数据已经被反序列化完毕(result.isBufferConsumed()返回true),调用反序列化器中内存块的recycleBuffer方法。在这里currentRecordDeserializer.getCurrentBuffer()NetworkBuffer类型。

继续跟踪NetworkBuffer类的recycle方法。代码如下:

@Override
public void recycleBuffer() {
    release();
}

跟随release方法。我们查看下AbstractReferenceCountedByteBuf中的相关代码,如下所示:

public boolean release() {
    return this.handleRelease(updater.release(this));
}

private boolean handleRelease(boolean result) {
    if (result) {
        this.deallocate();
    }

    return result;
}

这里有调用了实现类(即NetworkBuffer)的deallocate方法。代码如下:

@Override
protected void deallocate() {
    recycler.recycle(memorySegment);
}

最终我们找到了调用入口,即recyclerrecycle方法。
NetworkBuffer是一个MemorySegment的封装,创建的时候需要指定他们的回收器recycler。Recycler需要实现BufferRecycler接口,该接口仅有一个方法recycle,存放了MemorySegment的回收逻辑。
此处NetworkBuffer解耦了内存回收逻辑的调用。不同类型的buffer回收逻辑是不同的(后面介绍两种类型的buffer)。通过在创建NetworkBuffer的时候,指定它们对应的recycler,上层使用的这些缓存的时候无需再知道具体的缓存类型和回收方式。

稍后具体分析两种类型的buffer,以及它们的创建和回收方法。

还有一个释放RemoteInputChannel所有缓存的地方:AvailableBufferQueuereleaseAll方法。该方法稍后介绍。

AvailableBufferQueue

AvailableBufferQueue负责维护RemoteInputChannel的可用buffer。它的内部维护了两个内存队列,分别为floatingBuffers(浮动buffer)和exclusiveBuffers(专用buffer)。代码如下所示:

/** The current available floating buffers from the fixed buffer pool. */
private final ArrayDeque<Buffer> floatingBuffers;

/** The current available exclusive buffers from the global buffer pool. */
private final ArrayDeque<Buffer> exclusiveBuffers;

其中,专属buffer是在创建InputChannel的时候分配的(assignExclusiveSegments方法,间接调用了NetworkBufferPoolrequestMemorySegments)。专属buffer的大小是固定的,归RemoteInputChannel独享。如果出现专属buffer不够用的情况,会申请浮动buffer。浮动buffer在InputChannel所属的bufferPool中申请。所有属于同一个inputGateinputChannel共享这一个bufferPool。总结来说浮动buffer可以理解为是按需分配,从公有的bufferPool中"借用"的。

获取可用内存

请求内存的逻辑位于takeBuffer方法,如下面所示:

@Nullable
Buffer takeBuffer() {
    if (floatingBuffers.size() > 0) {
        return floatingBuffers.poll();
    } else {
        return exclusiveBuffers.poll();
    }
}

从中不难得出,如果有可用的浮动内存,会优先使用他们。没有可用浮动内存的时候会使用专属内存。

添加专属buffer的逻辑

为队列增加专属buffer的方法为addExclusiveBuffer,代码如下所示:

int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
    exclusiveBuffers.add(buffer);
    if (getAvailableBufferSize() > numRequiredBuffers) {
        Buffer floatingBuffer = floatingBuffers.poll();
        floatingBuffer.recycleBuffer();
        return 0;
    } else {
        return 1;
    }
}

这段代码主要逻辑为:

  1. 添加buffer到专属buffer队列。
  2. 如果可用buffer数(浮动buffer+专属buffer)大于所需内存buffer数,回收一个浮动buffer。
  3. 如果回收了浮动内存,返回0,否则返回1。

分配专属buffer的调用时机

在InputGate的setup阶段为所有的input channel分配专属内存。查看SingleInputGatesetup方法,代码如下:

@Override
public void setup() throws IOException, InterruptedException {
    checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
    // assign exclusive buffers to input channels directly and use the rest for floating buffers
    // 为所有的InputChannel分配专用buffer,剩下的作为浮动buffer
    assignExclusiveSegments();

    // 设置bufferPool,用于分配浮动buffer
    BufferPool bufferPool = bufferPoolFactory.get();
    setBufferPool(bufferPool);

    // 请求各个input channel需要读取的subpartition
    requestPartitions();
}

继续跟踪assignExclusiveSegments方法调用,代码如下:

@VisibleForTesting
public void assignExclusiveSegments() throws IOException {
    synchronized (requestLock) {
        for (InputChannel inputChannel : inputChannels.values()) {
            if (inputChannel instanceof RemoteInputChannel) {
                ((RemoteInputChannel) inputChannel).assignExclusiveSegments();
            }
        }
    }
}

这里逻辑就很清晰了,分别调用SingleInputGate中每个InputChannelassignExclusiveSegments方法。

assignExclusiveSegments方法作用是为RemoteinputChannel分配buffer。
该方法的代码如下:

void assignExclusiveSegments() throws IOException {
    checkState(initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have " +
        "already been set for this input channel.");
    // 使用memorySegmentProvider分配内存,它的类型是NetworkBufferPool
    Collection<MemorySegment> segments = checkNotNull(memorySegmentProvider.requestMemorySegments());
    checkArgument(!segments.isEmpty(), "The number of exclusive buffers per channel should be larger than 0.");

    // 初始化credit,credit用于反压,告诉上游我还有多少个可用buffer
    initialCredit = segments.size();
    numRequiredBuffers = segments.size();

    // 将获取到的内存片段加入到`bufferQueue`
    synchronized (bufferQueue) {
        for (MemorySegment segment : segments) {
            // 此处使用NetworkBuffer封装了memorySegment。因此该内存块回收的时候会调用RemoteInputChannel的recycle方法
            bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
        }
    }
}

回收专有内存

专有内存的回收逻辑位于recycle方法:

@Override
public void recycle(MemorySegment segment) {
    int numAddedBuffers;

    synchronized (bufferQueue) {
        // Similar to notifyBufferAvailable(), make sure that we never add a buffer
        // after releaseAllResources() released all buffers (see below for details).
        // 如果channel已经release(调用了releaseAllResources之后为true)
        if (isReleased.get()) {
            try {
                //使用memorySegmentProvider回收内存
                memorySegmentProvider.recycleMemorySegments(Collections.singletonList(segment));
                return;
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        }
        // 添加内存片段到专属内存
        numAddedBuffers = bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
    }

    // 如果增加的缓存大于0,并且增加缓存前unannouncedCredit为0的话,告诉上游可以接收数据了
    if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(numAddedBuffers) == 0) {
        notifyCreditAvailable();
    }
}

分配浮动buffer

RemoteInputChannelonSenderBacklog方法通过根据backlog(积压的数量)提前分配内存,如果backlog加上初始的credit大于可用buffer数,需要分配浮动buffer。它的代码如下:

void onSenderBacklog(int backlog) throws IOException {
    int numRequestedBuffers = 0;

    synchronized (bufferQueue) {
        // Similar to notifyBufferAvailable(), make sure that we never add a buffer
        // after releaseAllResources() released all buffers (see above for details).
        if (isReleased.get()) {
            return;
        }

        numRequiredBuffers = backlog + initialCredit;
        while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
            Buffer buffer = inputGate.getBufferPool().requestBuffer();
            if (buffer != null) {
                bufferQueue.addFloatingBuffer(buffer);
                numRequestedBuffers++;
            } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                // If the channel has not got enough buffers, register it as listener to wait for more floating buffers.
                isWaitingForFloatingBuffers = true;
                break;
            }
        }
    }

    if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
        notifyCreditAvailable();
    }
}

这个方法在Flink 源码之节点间通信已经分析过了。我们在这里重点关注while循环内的逻辑。

while循环不断的从inputGatebufferPool(即LocalBufferPool)获取可用的缓存块,加入到浮动缓存队列中,同时记录请求的缓存块数量,直到可用缓存数量不再小于需要缓存块的数量时候停止循环。

回收浮动buffer

浮动buffer被回收操作完毕后,会被放回到inputChannel的浮动buffer队列中。浮动buffer的回收逻辑位于LocalBufferPoolrecycle方法。该方法调用RemoteInputChannelnotifyBufferAvailable方法,将回收后的buffer重新加入浮动buffer队列。该recycle方法在之前已经分析过,此处不再赘述。

接下来问题是,为什么浮动buffer的回收使用的是LocalBufferPoolrecycle方法。我们看一下上面onSenderBacklog中分配内存的代码inputGate.getBufferPool().requestBuffer()方法:

@Override
public Buffer requestBuffer() throws IOException {
    return toBuffer(requestMemorySegment());
}

跟踪toBuffer方法,如下所示:

private Buffer toBuffer(MemorySegment memorySegment) {
    if (memorySegment == null) {
        return null;
    }
    return new NetworkBuffer(memorySegment, this);
}

这里使用NetworkBuffer封装了memorySegment。第二个参数的含义为回收器(recycler),传入的是LocalBufferPool自己。因此,浮动内存的回收调用的是LocalBufferPoolrecycle方法。

releaseAll方法

releaseAll方法会回收并释放所有的浮动buffer和专属buffer。代码如下:

void releaseAll(List<MemorySegment> exclusiveSegments) {
    Buffer buffer;
    while ((buffer = floatingBuffers.poll()) != null) {
        buffer.recycleBuffer();
    }
    while ((buffer = exclusiveBuffers.poll()) != null) {
        exclusiveSegments.add(buffer.getMemorySegment());
    }
}

该方法首先回收所有的浮动buffer,再回收所有专用buffer。清空内部exclusiveBuffers队列,将已经回收的专用buffer添加到方法参数的exclusiveSegments队列中。

发送端的buffer管理

数据发送端最重要的角色是RecordWriterResultPartitionRecordWriter负责将数据流中的元素序列化,存入到ResultSubpartition中。一个数据处理流程(Task)对应着一个ResultPartitionResultPartition的数据需要发送数据到多个下游Channel(对应着下游多个Task),ResultPartition中的数据专门为下游不同的接收者做了分组,这个分组叫做ResultSubpartition

RecordWriter具有两个子类:BroadcastRecordWriterChannelSelectorRecordWriter,分别负责广播形式写入数据和根据channel选择器向指定的channel写入数据。这两个子类的emit方法主要逻辑都位于父类RecordWriteremit方法。

写入数据到发送端缓存

RecordWriteremit方法代码如下:

protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
    checkErroneous();

    // 设置序列化器要操作的record
    serializer.serializeRecord(record);

    // Make sure we don't hold onto the large intermediate serialization buffer for too long
    // 序列化record,存入到目标channel的缓存中
    if (copyFromSerializerToTargetChannel(targetChannel)) {
        // 此时代表数据已写入完毕同时正在写入的buffer已经写满,需要执行裁剪serializer操作
        serializer.prune();
    }
}

我们再分析下copyFromSerializerToTargetChannel方法:

protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
    // We should reset the initial position of the intermediate serialization buffer before
    // copying, so the serialization results can be copied to multiple target buffers.
    serializer.reset();

    boolean pruneTriggered = false;
    BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
    SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
    while (result.isFullBuffer()) {
        // 这里很重要,当一个buffer被写满的时候需要标记该buffer的状态为finished
        // 在ResultSubpartition取出缓存数据的时候会对buffer队列中每个buffer的finished状态进行检查
        // 只允许buffer队列中最后一个buffer的状态为没有finished
        finishBufferBuilder(bufferBuilder);

        // If this was a full record, we are done. Not breaking out of the loop at this point
        // will lead to another buffer request before breaking out (that would not be a
        // problem per se, but it can lead to stalls in the pipeline).
        if (result.isFullRecord()) {
            pruneTriggered = true;
            emptyCurrentBufferBuilder(targetChannel);
            break;
        }

        bufferBuilder = requestNewBufferBuilder(targetChannel);
        result = serializer.copyToBufferBuilder(bufferBuilder);
    }
    checkState(!serializer.hasSerializedData(), "All data should be written at once");

    if (flushAlways) {
        flushTargetPartition(targetChannel);
    }
    return pruneTriggered;
}

该方法的主要逻辑如下:

  1. 获取一个BufferBuilder
  2. 写入数据流元素到bufferBuilder
  3. 如果buffer被写满,修改buffer状态为finished,申请新的buffer继续写入
  4. buffer写满同时元素也被完全写入,需要跳出循环,并且执行emptyCurrentBufferBuilder(targetChannel),清除writer中目标channel和BufferBuilder的对应关系。下次使用会申请新的BufferBuilder,同时返回true,提示序列化器需要prune

为何要执行serializer的prune操作?对于serializer而言,他的内部维护了一个buffer,buffer的大小随着数据的写入会逐渐扩容,当buffer的数据完全写入到bufferBuilder的时候,serializer无需再维护扩容之后的buffer,因此调用prune方法。将serializer中的buffer还原为初始的buffer大小,减少内存的占用。

接下来分析下内存是如何获取的。我们查看下getBufferBuilder方法。我们找一个有代表性的常用的子类ChannelSelectorRecordWriter中的getBufferBuilder方法,代码如下所示:

@Override
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    // ChannelSelector内维护了一个bufferBuilders数组,使用targetChannel作为下标,获取和它对应的bufferBuilder
    if (bufferBuilders[targetChannel] != null) {
        return bufferBuilders[targetChannel];
    } else {
        // 如果该channel对应的bufferBuilder不存在,则创建一个bufferBuilder
        return requestNewBufferBuilder(targetChannel);
    }
}

@Override
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());

    // 从targetPartition获取bufferBuilder
    // targetPartition是一个ResultPartitionWriter类型
    // 实现类为RecordPartition
    BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
    // 创建一个BufferConsumer
    targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
    // 已目标channel为下标,存入bufferBuilders数组并返回
    bufferBuilders[targetChannel] = bufferBuilder;
    return bufferBuilder;
}

经过上面的分析我们可以看出来RecordWriterResultPartition获取一个个BufferBuilder,将元素序列化写入BufferBuilder之后,再根据BufferBuilder创建出一个BufferConsumer,再加入到targetPartition中(实际上在ResultSubpartition)。

那么BufferBuilderBufferConsumer分别是负责什么的呢?BufferBuilder是一个辅助类,用于将java.nio.ByteBuffer的内容写入到Flink的MemorySegment中(append方法)。同样,BufferConsumer也是一个辅助类,用于读取MemorySegment的数据(build方法,将数据包装为Buffer类型返回)。

我们继续分析下requestNewBufferBuilder方法中的targetPartition.getBufferBuilder()调用。查看ResultPartitiongetBufferBuilder方法,代码如下:

@Override
public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
    checkInProduceState();

    return bufferPool.requestBufferBuilderBlocking();
}

这里从缓存池bufferPool以阻塞方式获取BufferBuilder。此处的bufferPoolLocalBufferPool类型。

接下来一个关键的地方是ResultPartitionaddBufferConsumer方法。代码如下:

@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
    checkNotNull(bufferConsumer);

    ResultSubpartition subpartition;
    try {
        checkInProduceState();
        subpartition = subpartitions[subpartitionIndex];
    }
    catch (Exception ex) {
        bufferConsumer.close();
        throw ex;
    }

    return subpartition.add(bufferConsumer);
}

这个方法从subpartitions数组中以targetChannel为下标(在这个方法内的形参为subpartitionIndex),获取对应的subpartition,然后调用subpartitionadd方法添加bufferConsumer

ResultSubpartition有两个子类:

对于流处理模式,Flink使用的是PipelinedSubpartition,接下来我们分析这个子类的相关方法。
PipelinedSubpartitionadd方法源码如下:

@Override
public boolean add(BufferConsumer bufferConsumer) {
    // 间接调用了下面的cufferConsumer,设置finished变量为false,不强制通知readerView
    return add(bufferConsumer, false);
}

private boolean add(BufferConsumer bufferConsumer, boolean finish) {
    checkNotNull(bufferConsumer);

    final boolean notifyDataAvailable;
    synchronized (buffers) {
        // 如果buffer已经finished的同时也已被释放,关闭这个bufferConsumer并回收内存
        if (isFinished || isReleased) {
            bufferConsumer.close();
            return false;
        }

        // Add the bufferConsumer and update the stats
        // 把bufferConsumer加入到buffers队列中
        buffers.add(bufferConsumer);
        // 增加总buffer数量计数器
        updateStatistics(bufferConsumer);
        // 增加待处理的buffer数量计数器
        increaseBuffersInBacklog(bufferConsumer);
        // 判断是否需要通知readerView去消费数据
        // 如果buffers队列中只有一个buffer,并且这个buffer已经finish,返回true
        // 如果buffers队列中只有一个buffer,这时又加入一个新的buffer,那么第一个buffer会被认为已经finish,方法返回true
        // 其实在这里判断有多少个buffer已经finished的逻辑如下
        // 1. 如果只有一个buffer,需要判断下这个buffer是否finish
        // 2. 如果有多个buffer,返回buffers的size - 1。默认出最后一个之外,所有的buffer都已经finish
        notifyDataAvailable = shouldNotifyDataAvailable() || finish;

        isFinished |= finish;
    }

    if (notifyDataAvailable) {
        notifyDataAvailable();
    }

    return true;
}

到此为止,我们分析了RecordWriter中buffer的获取,数据的写入和准备subpartition供下游消费的过程。接下来需要关注缓存中数据发往下游的过程。

从发送端缓存读取数据

ResultSubpartition为数据消费端建立一个ResultSubpartitionView。消费者(这里是netty的数据发送server),通过调用ResultSubpartitionViewgetNextBuffer获取buffer中的数据。我们仍然以PipelinedSubpartitionView为例,分析下getNextBuffer方法。

@Nullable
@Override
public BufferAndBacklog getNextBuffer() {
    return parent.pollBuffer();
}

接着查看下PipelinedSubpartitionpollBuffer方法。代码和分析如下所示:

@Nullable
BufferAndBacklog pollBuffer() {
    synchronized (buffers) {
        Buffer buffer = null;

        // 如果buffers队列为空,不需要flush
        if (buffers.isEmpty()) {
            flushRequested = false;
        }

        while (!buffers.isEmpty()) {
            BufferConsumer bufferConsumer = buffers.peek();

            // 转换bufferConsumer为Buffer类型
            buffer = bufferConsumer.build();

            // 检查buffers队列,没有finished的buffer不能在队列头部
            // 只允许buffer队列最后一个buffer的状态为没有finish
            checkState(bufferConsumer.isFinished() || buffers.size() == 1,
                "When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");

            // 如果队列只有一个buffer,不需要flush
            if (buffers.size() == 1) {
                // turn off flushRequested flag if we drained all of the available data
                flushRequested = false;
            }

            // 如果当前bufferConsumer已经finish,回收这个buffer,待处理的buffer数量减1
            if (bufferConsumer.isFinished()) {
                // 注意,回收已经finished的buffer的调用在此
                buffers.pop().close();
                decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
            }

            // 如果可读字节数大于0,跳出循环
            if (buffer.readableBytes() > 0) {
                break;
            }
            // 否则回收内存
            buffer.recycleBuffer();
            buffer = null;
            if (!bufferConsumer.isFinished()) {
                // 运行到这里只有一种可能,buffer队列中只有一个没有finished的buffer,跳出循环
                break;
            }
        }

        if (buffer == null) {
            return null;
        }

        // 更新总字节数计数器
        updateStatistics(buffer);
        // Do not report last remaining buffer on buffers as available to read (assuming it's unfinished).
        // It will be reported for reading either on flush or when the number of buffers in the queue
        // will be 2 or more.
        // 返回BufferAndBacklog对象,该对象包装了包含数据的buffer
        // BufferAndBacklog对象包含4个字段,buffer(缓存数据),isMoreAvailable(是否有更多数据),buffersInBacklog(挤压的buffer数量)和nextBufferIsEvent(下一个buffer是否是保存的是事件)
        return new BufferAndBacklog(
            buffer,
            // 如果需要flush,或者finished的buffer数量大于0,返回true
            isAvailableUnsafe(),
            getBuffersInBacklog(),
            nextBufferIsEventUnsafe());
    }
}

到这里,发送端缓存的申请,数据写入和读取和回收过程已经分析完毕。


本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

上一篇下一篇

猜你喜欢

热点阅读