Flink TaskManager Memory Model (Part 2)

2020-04-30  邵红晓

MemorySegment

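Heap memory (addressed relative to the array): protected final byte[] heapMemory;
Address of the data: protected long address; for a heap segment this is the offset relative to heapMemory, and when heapMemory is null it is an absolute memory address outside the heap.
MemorySegment has two implementations: HeapMemorySegment (unused) and HybridMemorySegment.
If more than one implementation is loaded, the JIT can no longer de-virtualize and inline calls on MemorySegment, and those optimizations make access about 2.7x faster. Flink therefore uses a single implementation, HybridMemorySegment, which can operate on both heap and off-heap memory. This is made possible by sun.misc.Unsafe.getLong(Object reference, long offset): if reference is null, offset is treated as an absolute memory address; otherwise it is an offset relative to the object.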
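
To make the single-implementation idea concrete, here is a minimal sketch under stated assumptions. HybridSegmentDemo, HybridSegment, putLong and getLong are hypothetical names, not Flink's API; only 8-byte accesses are shown; and sun.misc.Unsafe is obtained reflectively through its theUnsafe field, which is an internal, unsupported JDK API. The point is that the same call, UNSAFE.getLong(heapMemory, address + index), serves both heap and off-heap segments, so call sites only ever see one class.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Hypothetical sketch of the HybridMemorySegment idea (not Flink's code):
// ONE class addresses either a heap byte[] or an off-heap block.
public class HybridSegmentDemo {

    static final Unsafe UNSAFE = loadUnsafe();
    // Offset of element 0 inside a byte[] object
    static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

    static Unsafe loadUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe not available", e);
        }
    }

    static final class HybridSegment {
        final byte[] heapMemory; // non-null for heap segments, null for off-heap
        final long address;      // offset into heapMemory, or absolute off-heap address
        final int size;

        HybridSegment(byte[] buffer) {                 // heap segment
            this.heapMemory = buffer;
            this.address = BYTE_ARRAY_BASE_OFFSET;
            this.size = buffer.length;
        }

        HybridSegment(long offHeapAddress, int size) { // off-heap segment
            this.heapMemory = null;
            this.address = offHeapAddress;
            this.size = size;
        }

        void putLong(int index, long value) {
            checkIndex(index);
            // heapMemory == null -> absolute address; otherwise relative to the array
            UNSAFE.putLong(heapMemory, address + index, value);
        }

        long getLong(int index) {
            checkIndex(index);
            return UNSAFE.getLong(heapMemory, address + index);
        }

        private void checkIndex(int index) {
            if (index < 0 || index > size - 8) {
                throw new IndexOutOfBoundsException("index " + index);
            }
        }
    }

    public static void main(String[] args) {
        HybridSegment heap = new HybridSegment(new byte[64]);
        heap.putLong(8, 42L);
        System.out.println("heap:     " + heap.getLong(8));

        long addr = UNSAFE.allocateMemory(64);
        try {
            HybridSegment offHeap = new HybridSegment(addr, 64);
            offHeap.putLong(8, 42L);
            System.out.println("off-heap: " + offHeap.getLong(8));
        } finally {
            UNSAFE.freeMemory(addr); // off-heap memory is not garbage collected
        }
    }
}
```

Because both constructors produce the same class, the JIT sees a single receiver type at every getLong/putLong call site, which is what allows de-virtualization and inlining.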

Advantages of Netty's ByteBuf, on which Flink's NetworkBuffer is built

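1. No preparation is needed when switching between reading and writing (unlike flip() on a java.nio.ByteBuffer): the buffer keeps separate reader and writer indices, and reads and writes simply move them. discardReadBytes() can be called to reuse the region that has already been read (invariant: 0 <= readerIndex <= writerIndex <= capacity). clear() resets the indices to their initial state, readerIndex = writerIndex = 0, without clearing the content.
2. When the reference count refCnt drops to 0, the buffer is automatically returned to the pool rather than actually freed.
Because Netty is an NIO network framework, backing buffers with direct memory (DirectBuffer) greatly improves I/O efficiency. Besides this I/O advantage, however, DirectBuffer has an inherent drawback compared with HeapBuffer: it is more expensive to allocate. Netty therefore combines reference counting with pooling (PooledByteBuf): when the reference count reaches 0, Netty returns the buffer to the pool, where a later allocation can reuse it. Flink uses the same approach.
3. The built-in composite buffer type (CompositeByteBuf) enables zero-copy: several buffers are combined into one logical buffer without copying their bytes.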
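
The reader/writer-index scheme in item 1 can be sketched as a tiny byte buffer. This is a hypothetical, simplified class (IndexedBuffer is not Netty's ByteBuf); it only shows how two indices replace flip(), how discardReadBytes() compacts unread bytes to the front, and how clear() merely resets the indices.

```java
// Hypothetical illustration of the readerIndex/writerIndex scheme (not Netty's code).
// Invariant: 0 <= readerIndex <= writerIndex <= capacity
public class IndexedBuffer {
    private final byte[] data;
    private int readerIndex;
    private int writerIndex;

    public IndexedBuffer(int capacity) {
        this.data = new byte[capacity];
    }

    public void writeByte(byte b) {
        if (writerIndex == data.length) {
            throw new IndexOutOfBoundsException("buffer full");
        }
        data[writerIndex++] = b;
    }

    public byte readByte() {
        if (readerIndex == writerIndex) {
            throw new IndexOutOfBoundsException("nothing to read");
        }
        return data[readerIndex++];
    }

    public int readableBytes() { return writerIndex - readerIndex; }
    public int writableBytes() { return data.length - writerIndex; }
    public int readerIndex() { return readerIndex; }
    public int writerIndex() { return writerIndex; }

    // Move unread bytes [readerIndex, writerIndex) to the front so the
    // already-read region becomes writable again.
    public void discardReadBytes() {
        int readable = readableBytes();
        System.arraycopy(data, readerIndex, data, 0, readable);
        readerIndex = 0;
        writerIndex = readable;
    }

    // Reset both indices; the bytes themselves are left untouched.
    public void clear() {
        readerIndex = 0;
        writerIndex = 0;
    }

    public static void main(String[] args) {
        IndexedBuffer buf = new IndexedBuffer(4);
        for (byte b = 1; b <= 4; b++) buf.writeByte(b); // write, no flip()
        buf.readByte();
        buf.readByte();                                 // read, no flip()
        System.out.println("writable before discard: " + buf.writableBytes()); // 0
        buf.discardReadBytes();
        System.out.println("writable after discard:  " + buf.writableBytes()); // 2
        buf.clear();
        System.out.println("after clear: r=" + buf.readerIndex() + " w=" + buf.writerIndex()); // r=0 w=0
    }
}
```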

AbstractReferenceCountedByteBuf.java (Netty)

private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;

static {
    // Try Netty's PlatformDependent updater first; fall back to the standard JDK updater if it returns null
    AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
            PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
    if (updater == null) {
        updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
    }
    refCntUpdater = updater;
}
retain0 retries in a CAS loop: it spins until compareAndSet succeeds, then breaks out.
private ByteBuf retain0(int increment) {
    for (;;) {
        int refCnt = this.refCnt;
        final int nextCnt = refCnt + increment;

        // Ensure we do not resurrect (refCnt was 0) and that the addition did not overflow.
        if (nextCnt <= increment) {
            throw new IllegalReferenceCountException(refCnt, increment);
        }
        // Succeeds only if refCnt is still unchanged; otherwise loop, re-read and retry
        if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
            break;
        }
    }
    return this;
}

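For release, Flink plugs into the same base class: release0 subtracts decrement from the current refCnt, and when the count drops to 0 it calls the abstract deallocate(), which Flink implements.
Call chain: AbstractReferenceCountedByteBuf#deallocate() (protected abstract) -> NetworkBuffer#deallocate() -> recycler.recycle(memorySegment)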

private boolean release0(int decrement) {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt < decrement) {
            throw new IllegalReferenceCountException(refCnt, -decrement);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
            if (refCnt == decrement) {
                // Count reached 0: deallocate() recycles the buffer instead of freeing it
                deallocate();
                return true;
            }
            return false;
        }
    }
}

// LocalBufferPool#returnMemorySegment: hands the segment back to the global NetworkBufferPool
private void returnMemorySegment(MemorySegment segment) {
    assert Thread.holdsLock(availableMemorySegments);
    numberOfRequestedMemorySegments--;
    networkBufferPool.recycle(segment);
}
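
Putting retain0, release0 and the recycle path together, the following sketch shows a reference-counted buffer whose deallocate step returns its segment to a pool instead of freeing it. It is an illustration under assumptions: PooledBuffer and SegmentPool are hypothetical names, a byte[] stands in for MemorySegment, and the pool is a plain synchronized ArrayDeque rather than Flink's LocalBufferPool/NetworkBufferPool.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical sketch: CAS reference counting (same logic as retain0/release0)
// plus recycling into a pool when the count reaches 0. Not Flink's or Netty's API.
public class RefCountedPoolDemo {

    // Stand-in for the buffer pool; byte[] stands in for MemorySegment
    static final class SegmentPool {
        private final ArrayDeque<byte[]> available = new ArrayDeque<>();

        SegmentPool(int segmentSize, int count) {
            for (int i = 0; i < count; i++) available.push(new byte[segmentSize]);
        }

        synchronized byte[] request() {
            byte[] s = available.poll();
            if (s == null) throw new IllegalStateException("pool exhausted");
            return s;
        }

        synchronized void recycle(byte[] segment) { available.push(segment); }

        synchronized int availableCount() { return available.size(); }
    }

    static final class PooledBuffer {
        private static final AtomicIntegerFieldUpdater<PooledBuffer> REF_CNT =
                AtomicIntegerFieldUpdater.newUpdater(PooledBuffer.class, "refCnt");

        private volatile int refCnt = 1; // a new buffer starts with one owner
        private final byte[] segment;
        private final SegmentPool pool;

        PooledBuffer(SegmentPool pool) {
            this.pool = pool;
            this.segment = pool.request();
        }

        PooledBuffer retain(int increment) {
            for (;;) {
                int cnt = refCnt;
                int next = cnt + increment;
                // next <= increment: cnt was 0 (already released) or the sum overflowed
                if (next <= increment) throw new IllegalStateException("refCnt " + cnt);
                if (REF_CNT.compareAndSet(this, cnt, next)) return this;
            }
        }

        boolean release(int decrement) {
            for (;;) {
                int cnt = refCnt;
                if (cnt < decrement) throw new IllegalStateException("refCnt " + cnt);
                if (REF_CNT.compareAndSet(this, cnt, cnt - decrement)) {
                    if (cnt == decrement) { // count reached 0: recycle, do not free
                        pool.recycle(segment);
                        return true;
                    }
                    return false;
                }
            }
        }

        int refCnt() { return refCnt; }
    }

    public static void main(String[] args) {
        SegmentPool pool = new SegmentPool(32, 2);
        PooledBuffer buf = new PooledBuffer(pool);
        buf.retain(1);                             // refCnt 2
        System.out.println(buf.release(1));        // false (refCnt 1)
        System.out.println(buf.release(1));        // true (segment back in pool)
        System.out.println(pool.availableCount()); // 2
    }
}
```

The design choice mirrors the text: releasing never frees memory, it only makes the segment available to the next request, which avoids repeated expensive allocations.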

MemoryManager

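MemoryManager is the class that manages Managed Memory. This memory is mainly used in batch mode; in streaming mode it is not allocated (this holds up to Flink 1.9; since Flink 1.10 the RocksDB state backend also uses managed memory in streaming jobs). MemoryManager manages all MemorySegments through its internal MemoryPool.
In MemoryManagerBuilder, the size configured for each memory type is kept in an EnumMap: private final Map<MemoryType, Long> memoryPools = new EnumMap<>(MemoryType.class)
MemoryManager#allocateManagedSegment looks like this: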

private MemorySegment allocateManagedSegment(MemoryType memoryType, Object owner) {
    switch (memoryType) {
        case HEAP:
            // Heap segment backed by a new byte[] of one page
            return allocateUnpooledSegment(getPageSize(), owner);
        case OFF_HEAP:
            // Off-heap segment allocated through Unsafe
            return allocateOffHeapUnsafeMemory(getPageSize(), owner);
        default:
            throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
    }
}
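
A rough sketch of per-type, page-based allocation, assuming a fixed budget per MemoryType as held in the memoryPools EnumMap. All names are hypothetical, ByteBuffer stands in for MemorySegment, and ByteBuffer.allocateDirect is swapped in for Flink's Unsafe-based off-heap allocation; the real MemoryManager also tracks owners and releases pages, which is omitted here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a managed-memory budget per memory type (not Flink's code).
public class ManagedMemorySketch {
    enum MemoryType { HEAP, OFF_HEAP }

    private final int pageSize;
    private final Map<MemoryType, Long> remaining = new EnumMap<>(MemoryType.class);

    ManagedMemorySketch(int pageSize, long heapBytes, long offHeapBytes) {
        this.pageSize = pageSize;
        remaining.put(MemoryType.HEAP, heapBytes);
        remaining.put(MemoryType.OFF_HEAP, offHeapBytes);
    }

    // Same dispatch shape as allocateManagedSegment: one page per call
    private ByteBuffer allocateManagedSegment(MemoryType type) {
        switch (type) {
            case HEAP:
                return ByteBuffer.allocate(pageSize);       // backed by a byte[]
            case OFF_HEAP:
                return ByteBuffer.allocateDirect(pageSize); // outside the Java heap
            default:
                throw new IllegalArgumentException("unrecognized memory type: " + type);
        }
    }

    // Allocate numPages pages of the given type, failing if the budget is exceeded
    synchronized List<ByteBuffer> allocatePages(MemoryType type, int numPages) {
        long needed = (long) numPages * pageSize;
        long left = remaining.get(type);
        if (needed > left) {
            throw new IllegalStateException("not enough " + type + " memory: need "
                    + needed + ", have " + left);
        }
        remaining.put(type, left - needed);
        List<ByteBuffer> pages = new ArrayList<>(numPages);
        for (int i = 0; i < numPages; i++) pages.add(allocateManagedSegment(type));
        return pages;
    }

    synchronized long remaining(MemoryType type) { return remaining.get(type); }

    public static void main(String[] args) {
        ManagedMemorySketch mm = new ManagedMemorySketch(32 * 1024, 128 * 1024, 64 * 1024);
        List<ByteBuffer> heapPages = mm.allocatePages(MemoryType.HEAP, 2);
        List<ByteBuffer> offHeapPages = mm.allocatePages(MemoryType.OFF_HEAP, 2);
        System.out.println(heapPages.get(0).isDirect() + " " + offHeapPages.get(0).isDirect()); // false true
        System.out.println(mm.remaining(MemoryType.HEAP) + " " + mm.remaining(MemoryType.OFF_HEAP)); // 65536 0
    }
}
```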

Summary:

