Hudi 源码之Bloom Index
Bloom filter
Bloom filter中文名为布隆过滤器。用于检索一个元素是否在一个集合中。这个算法具有很高的时间和空间效率。但代价是存在一定的误判概率(假阳)。Bloom filter判断某个元素不存在,则一定不存在;判断某个元素存在,会有较小的概率不存在。可以根据集合数据量精心设计bloom filter的bitset大小,平衡误判的概率和bitset的存储占用。
Bloom filter详解请见:深入浅出BloomFilter原理
Hudi在upsert操作的时候需要定位record位于哪个数据文件。使用bloom filter可以加速定位数据的过程。Hudi索引的配置参见:Indexing | Apache Hudi
Bloom filter 索引定位过程
- 找出需要查询索引的record所属的partition path,找出这些partition path对应的数据文件,以及每个数据文件存储的record key的范围。可以快速排除绝大部分的数据文件,加快索引定位速度。
- 根据上面的record key范围信息,将record和可能位于的数据文件对应起来。这个范围太广,接下来步骤需要缩小范围。
- 利用这些数据文件的bloom filter,将不存在的数据排除,可能存在的数据保存起来(bloom filter存在假阳)。
- 逐个验证数据文件中是否真的包含bloom filter判断可能存在的数据,将假阳性的剔除。
源代码分析
HoodieBloomIndex
Hudi利用索引标记数据位置的方法为HoodieIndex::tagLocation
。这个方法不同的索引类型有不同的实现。其中bloom filter对应的是HoodieBloomIndex::tagLocation
。它的源代码如下所示:
@Override
public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
// Step 0: cache the input records if needed
// 对应hoodie.bloom.index.use.caching配置,默认为true
// 将需要index查找的数据缓存起来
if (config.getBloomIndexUseCaching()) {
records.persist(new HoodieConfig(config.getProps())
.getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE));
}
// Step 1: Extract out thinner pairs of (partitionPath, recordKey)
// 提取出这些数据的partition path和record key
HoodiePairData<String, String> partitionRecordKeyPairs = records.mapToPair(
record -> new ImmutablePair<>(record.getPartitionPath(), record.getRecordKey()));
// Step 2: Lookup indexes for all the partition/recordkey pair
//查找索引,找到这些数据所在文件的信息(file id)
HoodiePairData<HoodieKey, HoodieRecordLocation> keyFilenamePairs =
lookupIndex(partitionRecordKeyPairs, context, hoodieTable);
// Cache the result, for subsequent stages.
// 如果启用了缓存,将查找结果缓存起来
if (config.getBloomIndexUseCaching()) {
keyFilenamePairs.persist(new HoodieConfig(config.getProps())
.getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE));
}
if (LOG.isDebugEnabled()) {
long totalTaggedRecords = keyFilenamePairs.count();
LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
}
// Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
// 将数据位置标记到records中并返回
HoodieData<HoodieRecord<R>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records, hoodieTable);
if (config.getBloomIndexUseCaching()) {
records.unpersist();
keyFilenamePairs.unpersist();
}
return taggedRecords;
}
LookupIndex
返回record(用record key表示)和所在位置(instant time和file ID)的对应关系。
private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Step 1: Obtain records per partition, in the incoming records
// 获取每个分区对应的record key的个数
Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
// 获取这些partition path
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
// 获取每个分区包含的file id和这个文件的最大最小record key的对应关系
List<Pair<String, BloomIndexFileInfo>> fileInfoList = getBloomIndexFileInfoForPartitions(context, hoodieTable, affectedPartitionPathList);
// 将相同分区下的所有文件组成list形式
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
// Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id,
// that contains it.
// 根据前面所述的对应关系,找到这些record可能属于的文件
// 返回file group id和record key的对应关系
HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs =
explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);
// 使用bloom filter,获取record key所在的文件
return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
partitionRecordKeyPairs, fileComparisonPairs, partitionToFileInfo, recordsPerPartition);
}
getBloomIndexFileInfoForPartitions
用来获取partition path包含的数据文件以及record key范围。
private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartitions(HoodieEngineContext context,
HoodieTable hoodieTable,
List<String> affectedPartitionPathList) {
List<Pair<String, BloomIndexFileInfo>> fileInfoList = new ArrayList<>();
// 对应配置hoodie.bloom.index.prune.by.ranges,默认启用
// 如果启用,可以从metadata table的column stat(列统计信息)中读取key范围
// 否则,需要逐个从数据文件中读取,速度较慢
if (config.getBloomIndexPruneByRanges()) {
// load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
if (config.getBloomIndexUseMetadata()
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
}
// fallback to loading column ranges from files
if (isNullOrEmpty(fileInfoList)) {
fileInfoList = loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable);
}
} else {
fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable);
}
return fileInfoList;
}
explodeRecordsWithFileComparisons
根据getBloomIndexFileInfoForPartitions
的结果,找到这些record可能属于的文件。
HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {
// 对应hoodie.bloom.index.use.treebased.filter配置项,默认启用,表示使用区间树查找数据可能位于的文件
// 否则使用链表遍历,效率较低
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairs.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair.getRight();
String partitionPath = partitionRecordKeyPair.getLeft();
// 查找record所在位置
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey)
.stream()
.map(partitionFileIdPair ->
new ImmutablePair<>(
new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
})
.flatMapToPair(Stream::iterator);
}
ListBasedHoodieBloomIndexHelper
由于前面根据record key范围定位出的数据位置存在很大的误判的可能性,这里调用ListBasedHoodieBloomIndexHelper
的findMatchingFilesForRecordKeys
方法,使用数据文件对应的bloom filter过滤出数据文件中真正存在的record。
@Override
public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
// 将对应关系按照HoodieFileGroupId排序
List<Pair<HoodieFileGroupId, String>> fileComparisonPairList =
fileComparisonPairs.collectAsList().stream()
.sorted(Comparator.comparing(Pair::getLeft)).collect(toList());
// HoodieBloomIndexCheckFunction使用数据文件的bloom filter,逐个确认数据是否真正在对应的数据文件中
List<HoodieKeyLookupResult> keyLookupResults =
CollectionUtils.toStream(
new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
.apply(fileComparisonPairList.iterator())
)
.flatMap(Collection::stream)
.filter(lr -> lr.getMatchingRecordKeys().size() > 0)
.collect(toList());
// 组装返回结果
return context.parallelize(keyLookupResults).flatMap(lookupResult ->
lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
).mapToPair(pair -> {
HoodieKeyLookupResult lookupResult = pair.getLeft();
String recordKey = pair.getRight();
return new ImmutablePair<>(
new HoodieKey(recordKey, lookupResult.getPartitionPath()),
new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
});
}
使用bloom filter检查文件是否真正包含某条record的逻辑位于HoodieBloomIndexCheckFunction
。
HoodieBloomIndexCheckFunction
HoodieBloomIndexCheckFunction
使用数据文件对应的bloom filter来判断record是否真的在数据文件中。
HoodieBloomIndexCheckFunction::apply
方法返回的是前面file group id和record key的对应关系集合的迭代器。
@Override
public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I> fileGroupIdRecordKeyPairIterator) {
return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
}
在后面flatmap处理这个迭代器转换成的stream的时候,会调用LazyKeyCheckIterator::computeNext
方法。内容如下:
@Override
protected List<HoodieKeyLookupResult> computeNext() {
List<HoodieKeyLookupResult> ret = new ArrayList<>();
try {
// process one file in each go.
// 一轮处理一个数据文件
// 查询一个数据文件的bloom filter
while (inputItr.hasNext()) {
I tuple = inputItr.next();
// 找出record key和它可能所在的数据文件位置信息
HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
String recordKey = recordKeyExtractor.apply(tuple);
String fileId = fileGroupId.getFileId();
String partitionPath = fileGroupId.getPartitionPath();
Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
// lazily init state
// 创建查找handle,查找partitionPathFilePair对应的bloom filter
if (keyLookupHandle == null) {
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
}
// if continue on current file
// HoodieKeyLookupHandle是懒创建策略,只有在需要检索下一个数据文件的时候才会创建新的handle
if (keyLookupHandle.getPartitionPathFileIDPair().equals(partitionPathFilePair)) {
// 如果bloom filter包含该record key,说明该record key对应的记录可能会在这个文件中
// 在结果中加入这个record
keyLookupHandle.addKey(recordKey);
} else {
// do the actual checking of file & break out
// 当处理到下个文件的时候,检查可能包含之前文件中的记录是否真的存在,因为bloom filter存在假阳现象
// 在返回结果之前,需要去数据文件中检查这些key是否真的在数据文件中存在,具体逻辑在getLookupResult方法中
// 将检验过后的返回结果添加到ret中
ret.add(keyLookupHandle.getLookupResult());
// 然后创建新的HoodieKeyLookupHandle用来使用下一个数据文件对应的bloom filter
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
// 这里和上面相同,使用bloom filter判断record是否可能在这个数据文件中
keyLookupHandle.addKey(recordKey);
break;
}
}
// handle case, where we ran out of input, close pending work, update return val
if (!inputItr.hasNext()) {
ret.add(keyLookupHandle.getLookupResult());
}
} catch (Throwable e) {
if (e instanceof HoodieException) {
throw (HoodieException) e;
}
throw new HoodieIndexException("Error checking bloom filter index. ", e);
}
return ret;
}
在迭代的时候针对每一个数据文件的bloom filter,会创建出专门的HoodieKeyLookupHandle
来处理。
HoodieKeyLookupHandle
HoodieKeyLookupHandle
方法负责通过数据文件对应的bloom filter,检查数据文件是否包含某个record。
HoodieKeyLookupHandle::addKey
方法判断bloom filter是否可能包含record key。如果可能包含,加入到候选列表candidateRecordKeys
中。
public void addKey(String recordKey) {
// check record key against bloom filter of current file & add to possible keys if needed
// 使用bloom filter判断是否可能包含该record key
if (bloomFilter.mightContain(recordKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Record key " + recordKey + " matches bloom filter in " + partitionPathFileIDPair);
}
// 加入到候选中
candidateRecordKeys.add(recordKey);
}
// 总共检查的key计数值 +1
totalKeysChecked++;
}
HoodieKeyLookupHandle::getLookupResult
方法将candidateRecordKeys
中的record key代入数据文件中校验是否真的存在。因为bloom filter存在一定概率假阳性,需要再次检查。
public HoodieKeyLookupResult getLookupResult() {
if (LOG.isDebugEnabled()) {
LOG.debug("#The candidate row keys for " + partitionPathFileIDPair + " => " + candidateRecordKeys);
}
// 获取最新版本的base file
HoodieBaseFile baseFile = getLatestBaseFile();
// 检查这些record key是否真的在base file中,返回base file真实包含的record key
List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new Path(baseFile.getPath()), candidateRecordKeys,
hoodieTable.getHadoopConf());
LOG.info(
String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
// 组装成查找结果并返回
// 包含文件(partition path,file id和commit time)同包含的record key的对应关系
return new HoodieKeyLookupResult(partitionPathFileIDPair.getRight(), partitionPathFileIDPair.getLeft(),
baseFile.getCommitTime(), matchingKeys);
}
最后我们跟踪一下bloom filter是从哪里读取的。查看getBloomFilter
方法,内容如下:
private BloomFilter getBloomFilter() {
BloomFilter bloomFilter = null;
HoodieTimer timer = HoodieTimer.start();
try {
// 如果启用hoodie.bloom.index.use.metadata(默认启用)并且metadata table存放的有bloom filter
// 从metadata table加载bloom filter,可以提高速度
if (config.getBloomIndexUseMetadata()
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
.contains(BLOOM_FILTERS.getPartitionPath())) {
bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
.orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
} else {
// 否则读取数据文件中的bloom filter
try (HoodieFileReader reader = createNewFileReader()) {
bloomFilter = reader.readBloomFilter();
}
}
} catch (IOException e) {
throw new HoodieIndexException(String.format("Error reading bloom filter from %s", getPartitionPathFileIDPair()), e);
}
LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
return bloomFilter;
}
Hudi建议将bloom filter统一保存在metadata table中,可以提高加载速度。