聊聊flink的MemoryPool

2019-02-21  本文已影响6人  go4it

本文主要研究一下 Flink 的 MemoryPool。

MemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

    /**
     * Common contract of the segment pools nested in this file.
     *
     * <p>A pool distinguishes between segments that are freshly allocated
     * ({@link #allocateNewSegment(Object)}) and segments that are taken from,
     * and later handed back to, the pool's own stock
     * ({@link #requestSegmentFromPool(Object)} /
     * {@link #returnSegmentToPool(MemorySegment)}).
     */
    abstract static class MemoryPool {

        // ---- segments backed by the pool's own stock ----

        /**
         * Takes one segment out of the pool's stock.
         *
         * @param owner object passed through to the created segment as its owner
         * @return a segment backed by pooled memory
         */
        abstract MemorySegment requestSegmentFromPool(Object owner);

        /**
         * Hands a segment back to the pool's stock.
         *
         * @param segment the segment to give back
         */
        abstract void returnSegmentToPool(MemorySegment segment);

        /**
         * Reports the size of the pool's stock.
         *
         * @return presumably the number of segments that can currently be
         *     requested from the pool; confirm against the concrete pool
         */
        abstract int getNumberOfAvailableMemorySegments();

        /** Empties the pool's stock. */
        abstract void clear();

        // ---- segments allocated outside the stock ----

        /**
         * Creates a brand-new segment instead of taking one from the stock.
         *
         * @param owner object passed through to the created segment as its owner
         * @return the newly allocated segment
         */
        abstract MemorySegment allocateNewSegment(Object owner);
    }

HybridHeapMemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

    /**
     * {@link MemoryPool} whose stock consists of plain {@code byte[]} arrays.
     *
     * <p>The constructor eagerly creates {@code numInitialSegments} arrays of
     * {@code segmentSize} bytes each and queues them. Requests wrap the array at
     * the head of the queue into a segment; returned segments give their array
     * back to the tail of the queue.
     */
    static final class HybridHeapMemoryPool extends MemoryPool {

        /** Length, in bytes, of every array created by this pool. */
        private final int segmentSize;

        /** Arrays that are currently not wrapped by any segment. */
        private final ArrayDeque<byte[]> availableMemory;

        /**
         * Creates the pool and pre-allocates its stock.
         *
         * @param numInitialSegments number of arrays to create up front
         * @param segmentSize length in bytes of each array
         */
        HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.segmentSize = segmentSize;
            this.availableMemory = new ArrayDeque<>(numInitialSegments);

            for (int remaining = numInitialSegments; remaining > 0; remaining--) {
                availableMemory.addLast(new byte[segmentSize]);
            }
        }

        /**
         * Reports how many arrays are currently queued.
         *
         * @return the size of the queue of available arrays
         */
        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        /**
         * Allocates a segment of {@code segmentSize} bytes via
         * {@code MemorySegmentFactory.allocateUnpooledSegment}; the queue of
         * available arrays is not touched.
         *
         * @param owner owner handed to the factory
         * @return the segment produced by the factory
         */
        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledSegment(segmentSize, owner);
        }

        /**
         * Removes the array at the head of the queue and wraps it into a segment.
         *
         * @param owner owner handed to the factory
         * @return a segment wrapping the removed array
         * @throws java.util.NoSuchElementException if the queue is empty
         */
        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            final byte[] pooledArray = availableMemory.removeFirst();
            return MemorySegmentFactory.wrapPooledHeapMemory(pooledArray, owner);
        }

        /**
         * Puts the array of the given segment back into the queue and frees the
         * segment.
         *
         * @param segment segment to return; its class must be exactly
         *     {@code HybridMemorySegment}
         * @throws IllegalArgumentException if the segment is of any other class
         */
        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() != HybridMemorySegment.class) {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }

            final HybridMemorySegment returned = (HybridMemorySegment) segment;
            // The array is fetched before free() is called, keeping the original order.
            availableMemory.addLast(returned.getArray());
            returned.free();
        }

        /** Drops every queued array. */
        @Override
        void clear() {
            availableMemory.clear();
        }
    }

HybridOffHeapMemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

    /**
     * {@link MemoryPool} whose stock consists of direct {@link ByteBuffer}s.
     *
     * <p>The constructor eagerly creates {@code numInitialSegments} buffers of
     * {@code segmentSize} bytes each with {@link ByteBuffer#allocateDirect(int)}
     * and queues them. Requests wrap the buffer at the head of the queue into a
     * segment; returned segments give their buffer back to the tail of the queue.
     */
    static final class HybridOffHeapMemoryPool extends MemoryPool {

        /** Capacity, in bytes, of every buffer created by this pool. */
        private final int segmentSize;

        /** Buffers that are currently not wrapped by any segment. */
        private final ArrayDeque<ByteBuffer> availableMemory;

        /**
         * Creates the pool and pre-allocates its stock.
         *
         * @param numInitialSegments number of direct buffers to create up front
         * @param segmentSize capacity in bytes of each buffer
         */
        HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.segmentSize = segmentSize;
            this.availableMemory = new ArrayDeque<>(numInitialSegments);

            for (int remaining = numInitialSegments; remaining > 0; remaining--) {
                availableMemory.addLast(ByteBuffer.allocateDirect(segmentSize));
            }
        }

        /**
         * Reports how many buffers are currently queued.
         *
         * @return the size of the queue of available buffers
         */
        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        /**
         * Allocates a segment of {@code segmentSize} bytes via
         * {@code MemorySegmentFactory.allocateUnpooledOffHeapMemory}; the queue
         * of available buffers is not touched.
         *
         * @param owner owner handed to the factory
         * @return the segment produced by the factory
         */
        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
        }

        /**
         * Removes the buffer at the head of the queue and wraps it into a segment.
         *
         * @param owner owner handed to the factory
         * @return a segment wrapping the removed buffer
         * @throws java.util.NoSuchElementException if the queue is empty
         */
        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            final ByteBuffer pooledBuffer = availableMemory.removeFirst();
            return MemorySegmentFactory.wrapPooledOffHeapMemory(pooledBuffer, owner);
        }

        /**
         * Puts the off-heap buffer of the given segment back into the queue and
         * frees the segment.
         *
         * @param segment segment to return; its class must be exactly
         *     {@code HybridMemorySegment}
         * @throws IllegalArgumentException if the segment is of any other class
         */
        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() != HybridMemorySegment.class) {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }

            final HybridMemorySegment returned = (HybridMemorySegment) segment;
            // The buffer is fetched before free() is called, keeping the original order.
            availableMemory.addLast(returned.getOffHeapBuffer());
            returned.free();
        }

        /** Drops every queued buffer. */
        @Override
        void clear() {
            availableMemory.clear();
        }
    }

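小结

MemoryPool 是定义在 MemoryManager 中的抽象静态类，它声明了 getNumberOfAvailableMemorySegments、allocateNewSegment、requestSegmentFromPool、returnSegmentToPool、clear 这几个抽象方法。

HybridHeapMemoryPool 继承了 MemoryPool，使用 ArrayDeque<byte[]> 保存可用内存；构造器会预先创建 numInitialSegments 个长度为 segmentSize 的 byte 数组。requestSegmentFromPool 从队列中取出一个 byte[]，并通过 MemorySegmentFactory.wrapPooledHeapMemory 包装为 MemorySegment；returnSegmentToPool 只接受 HybridMemorySegment，将其数组放回队列后调用 free，否则抛出 IllegalArgumentException。

HybridOffHeapMemoryPool 同样继承了 MemoryPool，使用 ArrayDeque<ByteBuffer> 保存可用内存；构造器通过 ByteBuffer.allocateDirect 预先分配 numInitialSegments 个 ByteBuffer。requestSegmentFromPool 取出 ByteBuffer 后通过 MemorySegmentFactory.wrapPooledOffHeapMemory 包装；returnSegmentToPool 通过 getOffHeapBuffer 取回 ByteBuffer 放入队列后调用 free。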

doc

上一篇 下一篇

猜你喜欢

热点阅读