flink的内存管理
在大数据面前,JVM的内存结构和GC机制往往会成为掣肘
1. 对象开销:在HotSpot中,每个对象占用的内存空间必须是8的倍数。这就导致每个对象占据的内存空间包含三部分:头信息(header)、类的field所占空间、对齐需要的空间(padding,由于前两个大小不够8导致),所以只有一个bool值的对象会占据16个字节,其中头占8字节,bool本身占用一个字节、padding占7个字节
2. GC:大数据量的垃圾回收,尤其是几十甚至上百G的内存应用,对jvm的gc机制是个挑战
3. OOM:OutOfMemoryError是分布式计算框架经常会遇到的问题,当JVM中所有对象大小超过分配给JVM的内存大小时,就会fOutOfMemoryError错误,JVM崩溃,分布式框架的健壮性和性能都会受到影响。
4.Cache Miss:现代体系的cpu会有多级缓存,而加载的时候是以Cache Line为单位加载。如果能够将对象连续存储,这样就会大大降低Cache Miss。使得cpu集中处理业务,而不是空转。
接下俩会从内存管理、序列化、数据结构和算法等角度来介绍Flink的一些做法。
一、内存管理
理论上Flink的内存管理分为三部分
1. Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改
2. Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shufflt等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。
3. User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。
对GC的影响
1. 减少full gc时间:因为所有常用数据都在Memory Manager里,这部分内存的生命周期是伴随TaskManager而不会被回收。其他的常用数据对象都是用户定义的数据对象,这部分会快速的被Minor GC回收
2. 减少OOM:所有的运行时的内存应用都从池化的内存中获取,而且运行时的算法可以在内存不足的时候将数据写到堆外内存,例如NormalizedKeySorter、MutableHashTable
3. 节约空间:由于自定序列化/反序列化的方法,所有的对象都以二进制的形式存储,降低消耗
4. 高效的二进制操作和缓存友好:二进制数据以定义好的格式存储,可以高效地比较与操作。另外,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好,可以从 L1/L2/L3 缓存获得性能的提升
Flink的序列化
目前 Java 生态圈提供了众多的序列化框架:Java serialization, Kryo, Apache Avro 等等。但是 Flink 实现了自己的序列化框架。因为在 Flink 中处理的数据流通常是同一类型,由于数据集对象的类型固定,对于数据集可以只保存一份对象Schema信息,节省大量的存储空间。同时,对于固定大小的类型,也可通过固定的偏移位置存取。当我们需要访问某个对象成员变量的时候,通过定制的序列化工具,并不需要反序列化整个Java对象,而是可以直接通过偏移量,只是反序列化特定的对象成员变量。如果对象的成员变量较多时,能够大大减少Java对象的创建开销,以及内存数据的拷贝大小。
Flink支持任意的Java或是Scala类型。Flink 在数据类型上有很大的进步,不需要实现一个特定的接口(像Hadoop中的org.apache.hadoop.io.Writable),Flink 能够自动识别数据类型。Flink 通过 Java Reflection 框架分析基于 Java 的 Flink 程序 UDF (User Define Function)的返回类型的类型信息,通过 Scala Compiler 分析基于 Scala 的 Flink 程序 UDF 的返回类型的类型信息。类型信息由TypeInformation类表示,TypeInformation 支持以下几种类型:
BasicTypeInfo: 任意Java 基本类型(装箱的)或 String 类型。
BasicArrayTypeInfo: 任意Java基本类型数组(装箱的)或 String 数组。
WritableTypeInfo: 任意 Hadoop Writable 接口的实现类。
TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现。
CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。
GenericTypeInfo: 任意无法匹配之前几种类型的类。
前六种数据类型基本上可以满足绝大部分的Flink程序,针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。对于最后一种数据类型,Flink会使用Kryo进行序列化和反序列化。每个TypeInformation中,都包含了serializer,类型会自动通过serializer进行序列化,然后用Java Unsafe接口写入MemorySegments。对于可以用作key的数据类型,Flink还同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据进行compare、hash等操作。对于 Tuple、CaseClass、POJO 等组合类型,其TypeSerializer和TypeComparator也是组合的,序列化和比较时会委托给对应的serializers和comparators。如下图展示 一个内嵌型的Tuple3对象的序列化过程。
Flink如何操作二进制数据
Flink会分配一块内催给对应的operator,内存会分为两部分,一部分存储真实的数据,一部分存储object的指针。
以Sort为例:
每个需要比较的对象都分为两部分pointer+key 和object,其中key是定长的key,如果不是定长(比如String)那么就取它的top n character。
将实际的数据和指针加定长key分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其他key和pointer。第二,这样做是缓存友好的,因为key都是连续存储在内存中的,可以大大减少 cache miss。
在交换过程中,只需要比较key就可以完成sort的过程,只有key1 == key2的情况,才需要反序列化拿出实际的对象做比较,而比较之后只需要交换对应的key而不需要交换实际的对象
MemoryManager
MemoryManager提供了两个内部类HeapMemoryPool和HybridOffHeapMemoryPool,这两个类是根据不同的内存类型,将内存池化的两个类。MemoryManager在初始化的时候引入了PageSize的概念,PageSize本质上和MemorySegment没有什么不同,其作用只是为了保持每一个Segment大小是相同的
HeapMemoryPool和Hybrid都会根据Allocate page size预分配一些内存(预分配可能是0)
HeapMemoryPool分配的对象是byte[], 而Hybrid分配的是ByteBuffer。在分配的时候都通过allocate方法进行分配。在allocate方法中,由于担心多线程的问题,会用一把锁做线程同步
这里要注意的是如果为预分配的模式,则从池子里面取,否则allocate new segments
MemoryPool的方法如下;
参考:
1. http://www.javamex.com/tutorials/memory/object_memory_usage.shtml
2. http://www.36dsj.com/archives/33650
3. https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
4. http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/