[SPARK][CORE] 面试问题之UnsafeShuffle
欢迎关注公众号“Tim在路上”
在说UnsafeShuffleWriter
前,需要先细谈下Tungsten对内存管理的优化。当然这里就不展开讲了以防内容过于冗长。
Tungsten其实是一揽子优化项目的总代称,这里我们主要了解Tungsten对于内存管理的优化。对内存管理优化的原因主要有两方面: 1. Java对象占用内存的空间大。2. Jvm垃圾回收的开销大。
下面我们举例分析下:
我们拿类型是 String 的 name 来举例,如果一个用户的名字叫做“abcd”,它本应该只占用 4 个字节,但在 JVM 的对象存储中,“abcd”会消耗总共 48 个字节,其中包括 12 个字节的对象头信息、8 字节的哈希编码、8 字节的字段值存储和另外 20 个字节的其他开销。
另外,如果存在一个User表,其中存在username String, age Int, sex Char三个字段。那么一行数据需要创建三个包装类,同时需要将其装入到Array数组中,最后封装为GenericMutableRow。那么总共需要5个类。我们知道大量的类的创建会加剧JVM的GC情况,如果可以将其封装为一个类中,那么就减少了大量的类的创建。
为此Tungsten 设计了一种紧凑的二进制格式 Unsafe Row数据结构。
Unsafe Row : 紧凑的数据结构
Unsafe Row 是一种字节数组,它可以用来存储下图所示 Schema 为(userID,name,age,gender)的用户数据条目。总的来说,所有字段都会按照 Schema 中的顺序安放在数组中。其中,定长字段的值会直接安插到字节中,而变长字段会先在 Schema 的相应位置插入偏移地址,再把字段长度和字段值存储到靠后的元素中。
51.png
字节数组的存储方式在消除存储开销的同时,仅用一个数组对象就能轻松完成一条数据的封装,显著降低 GC 压力。
public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
public static int calculateBitSetWidthInBytes(int numFields) {
return ((numFields + 63)/ 64) * 8;
}
private Object baseObject;
private long baseOffset;
/** The number of fields in this row, used for calculating the bitset width (and in assertions) */
private int numFields;
/** The size of this row's backing data, in bytes) */
private int sizeInBytes;
/** The width of the null tracking bit set, in bytes */
private int bitSetWidthInBytes;
public void setNotNullAt(int i) {
assertIndexIsValid(i);
BitSetMethods.unset(baseObject, baseOffset, i);
}
@Override
public void setLong(int ordinal, long value) {
assertIndexIsValid(ordinal);
setNotNullAt(ordinal);
Platform.putLong(baseObject, getFieldOffset(ordinal), value);
}
...
从上面可以看出每个数据元组,有三部分组成[null bit set] [values] [variable length portion]
,null的追踪和word边界的补齐是由bitSetWidthInBytes字段负责。从上面的例子可以看出在赋值Long时调用Platform.putLong直接进行赋值。但是如果插入的数据是可变的数据类型,会先插入offset偏移量,指定在定长插入完成的靠后的位置,然后再插入其长度,最后再插入其数据值。
进一步提升数据存储效率与 GC 效率,Tungsten 还推出了基于内存页的内存管理模式。
基于内存页的内存管理
为了统一管理 Off Heap 和 On Heap 内存空间,Tungsten 定义了统一的 128 位内存地址,简称 Tungsten 地址。Tungsten 地址分为两部分:前 64 位预留给 Java Object,后 64 位是偏移地址 Offset。具体的定义在MemoryLocation类中
public class MemoryLocation {
@Nullable
Object obj;
long offset;
public MemoryLocation(@Nullable Object obj, long offset) {
this.obj = obj;
this.offset = offset;
}
...
}
对于 On Heap 空间的 Tungsten 地址来说,前 64 位存储的是 JVM 堆内对象的引用或者说指针,后 64 位 Offset 存储的是数据在该对象内的偏移地址。而 Off Heap 空间则完全不同,在堆外的空间中,由于 Spark 是通过 Java Unsafe API 直接管理操作系统内存,不存在内存对象的概念,因此前 64 位存储的是 null 值,后 64 位则用于在堆外空间中直接寻址操作系统的内存空间。
public class TaskMemoryManager {
// [1] 页号13位表示
/** The number of bits used to address the page table. */
private static final int PAGE_NUMBER_BITS= 13;
// [2] 偏移量 64 - 13 = 51 位表示
/** The number of bits used to encode offsets in data pages. */
@VisibleForTesting
static final int OFFSET_BITS= 64 -PAGE_NUMBER_BITS; // 51
/** The number of entries in the page table. */
private static final int PAGE_TABLE_SIZE= 1 <<PAGE_NUMBER_BITS;
public static final long MAXIMUM_PAGE_SIZE_BYTES= ((1L << 31) - 1) * 8L;
// [3] page页用MemoryBlock表示,定位一个页如果是堆内前64 位存储的是 JVM 堆内对象的引用,堆外则64 位存储的是 null 值。
/**
* Similar to an operating system's page table, this array maps page numbers into base object
* pointers, allowing us to translate between the hashtable's internal 64-bit address
* representation and the baseObject+offset representation which we use to support both on- and
* off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
* When using an on-heap allocator, the entries in this map will point to pages' base objects.
* Entries are added to this map as new data pages are allocated.
*/
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
/**
* Bitmap for tracking free pages.
*/
private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
final MemoryMode tungstenMemoryMode;
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
this.consumers = new HashSet<>();
}
...
}
// [4] 每个page 由Object,offset 确定, length 表示页大小
public MemoryBlock(@Nullable Object obj, long offset, long length) {
super(obj, offset);
this.length = length;
}
// [5] 在创建TaskMemoryManager会指定MemoryMode,优先使用堆外内存
final val tungstenMemoryMode: MemoryMode = {
if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
require(Platform.unaligned(),
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
MemoryMode.OFF_HEAP
} else {
MemoryMode.ON_HEAP
}
}
由上代码分析可知:每个Task的内存空间被分为多个内存页Page, 每个内存页本质上都是一个内存块(MemoryBlock)。TaskMemoryManager统一了堆内堆外内存的访问方式,引入了虚拟内存逻辑地址的概念,并将逻辑地址转换为实际的物理地址。逻辑地址是一个64bits的长整型,高13bits用来表示页号pageNumber,低51bits用来表示该内存内部的偏移offset。
Untitled.png内存优化的应用
首先我们来看下HashMap的弊端,Java 标准库中采用数组加链表的方式来实现 HashMap,数组元素存储 Hash code 和链表头。链表节点存储 3 个元素,分别是 Key 引用、Value 引用和下一个元素的地址。
Utle.png
但是,这种实现方式会带来两个弊端。
首先是存储开销和 GC 负担比较大。结合上面的示意图我们不难发现,存储数据的对象值只占整个 HashMap 一半的存储空间,另外一半的存储空间用来存储引用和指针,这 50% 的存储开销还是蛮大的。而且我们发现,图中每一个 Key、Value 和链表元素都是 JVM 对象。假设,我们用 HashMap 来存储一百万条数据条目,那么 JVM 对象的数量至少是三百万。由于 JVM 的 GC 效率与对象数量成反比,因此 java.util.HashMap 的实现方式对于 GC 并不友好。
其次,在数据访问的过程中,标准库实现的 HashMap 容易降低 CPU 缓存命中率,进而降低 CPU 利用率。链表这种数据结构的特点是,对写入友好,但访问低效。用链表存储数据的方式确实很灵活,这让 JVM 可以充分利用零散的内存区域,提升内存利用率。但是,在对链表进行全量扫描的时候,这种零散的存储方式会引入大量的随机内存访问(Random Memory Access)。相比顺序访问,随机内存访问会大幅降低 CPU cache 命中率。
那么,针对以上几个弊端,Tungsten 又是怎么解决的呢?BytesToBytesMap可以看做是Spark实现的HashMap, 我们从存储开销、GC 效率和 CPU cache 命中率分别来分析下。
Uled.png首先,Tungsten 放弃了链表的实现方式,使用数组加内存页的方式来实现 HashMap。数组中存储的元素是 Hash code 和 Tungsten 内存地址,也就是 Object 引用外加 Offset 的 128 位地址。Tungsten HashMap 使用 128 位地址来寻址数据元素,相比 java.util.HashMap 大量的链表指针,在存储开销上更低。另外BytesToBytesMap在出现Hash冲突时采用的是开放定址法,通过探测下一个(idx + 1)位置进行解决。
其次,Tungsten HashMap 的存储单元是内存页,内存页本质上是 Java Object,一个内存页可以存储多个数据条目。因此,相比标准库中的 HashMap,使用内存页大幅缩减了存储所需的对象数量。比如说,我们需要存储一百万条数据记录,标准库的 HashMap 至少需要三百万的 JVM 对象才能存下,而 Tungsten HashMap 可能只需要几个或是十几个内存页就能存下。对比下来,它们所需的 JVM 对象数量可以说是天壤之别,显然,Tungsten 的实现方式对于 GC 更加友好。再者,内存页本质上是 JVM 对象,其内部使用连续空间来存储数据,内存页加偏移量可以精准地定位到每一个数据元素。因此,在需要扫描 HashMap 全量数据的时候,得益于内存页中连续存储的方式,内存的访问方式从原来的随机访问变成了顺序读取(Sequential Access)。顺序内存访问会大幅提升 CPU cache 利用率,减少 CPU 中断,显著提升 CPU 利用率。
Tungsten除此以外还定义了基本的数据类型与数据结构:ByteArray、LongArray和UTF8String类型等。
接下来,我们回归Shuffle, 其实UnsafeShuffleWriter其实现的思路和SortShuffleWriter一致,相当于Tungsten版本的SortShuffleWriter。在Tungsten Shuffle的UnsafeShuffleWriter与SortShuffleWriter不同之处在于UnsafeShuffleWriter中不涉及数据的反序列化的操作。除此以外,在UnsafeShuffleWriter中,ExternalSorter是采用ShuffleExternalSorter替换实现,ShuffleExternalSorter的在功能上和ExternalSorter是一致的。
下一讲,我们将从源码的角度解读UnsafeShuffleWriter的实现过程,理解其中原理和适用场景。
欢迎关注公众号“Tim在路上”