Java

Hudi 源码之Bloom Index

2023-11-14  本文已影响0人  AlienPaul

Bloom filter

Bloom filter中文名为布隆过滤器。用于检索一个元素是否在一个集合中。这个算法具有很高的时间和空间效率。但代价是存在一定的误判概率(假阳)。Bloom filter判断某个元素不存在,则一定不存在;判断某个元素存在,会有较小的概率不存在。可以根据集合数据量精心设计bloom filter的bitset大小,平衡误判的概率和bitset的存储占用。

Bloom filter详解请见:深入浅出BloomFilter原理

Hudi在upsert操作的时候需要定位record位于哪个数据文件。使用bloom filter可以加速定位数据的过程。Hudi索引的配置参见:Indexing | Apache Hudi

Bloom filter 索引定位过程

  1. 找出需要查询索引的record所属的partition path,找出这些partition path对应的数据文件,以及每个数据文件存储的record key的范围。可以快速排除绝大部分的数据文件,加快索引定位速度。
  2. 根据上面的record key范围信息,将record和可能位于的数据文件对应起来。这个范围太广,接下来步骤需要缩小范围。
  3. 利用这些数据文件的bloom filter,将不存在的数据排除,可能存在的数据保存起来(bloom filter存在假阳)。
  4. 逐个验证数据文件中是否真的包含bloom filter判断可能存在的数据,将假阳性的剔除。

源代码分析

HoodieBloomIndex

Hudi利用索引标记数据位置的方法为HoodieIndex::tagLocation。这个方法不同的索引类型有不同的实现。其中bloom filter对应的是HoodieBloomIndex::tagLocation。它的源代码如下所示:

@Override
public <R> HoodieData<HoodieRecord<R>> tagLocation(
    HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
    HoodieTable hoodieTable) {
    // Step 0: cache the input records if needed
    // 对应hoodie.bloom.index.use.caching配置,默认为true
    // 将需要index查找的数据缓存起来
    if (config.getBloomIndexUseCaching()) {
        records.persist(new HoodieConfig(config.getProps())
                        .getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE));
    }

    // Step 1: Extract out thinner pairs of (partitionPath, recordKey)
    // 提取出这些数据的partition path和record key
    HoodiePairData<String, String> partitionRecordKeyPairs = records.mapToPair(
        record -> new ImmutablePair<>(record.getPartitionPath(), record.getRecordKey()));

    // Step 2: Lookup indexes for all the partition/recordkey pair
    //查找索引,找到这些数据所在文件的信息(file id)
    HoodiePairData<HoodieKey, HoodieRecordLocation> keyFilenamePairs =
        lookupIndex(partitionRecordKeyPairs, context, hoodieTable);

    // Cache the result, for subsequent stages.
    // 如果启用了缓存,将查找结果缓存起来
    if (config.getBloomIndexUseCaching()) {
        keyFilenamePairs.persist(new HoodieConfig(config.getProps())
                                 .getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE));
    }
    if (LOG.isDebugEnabled()) {
        long totalTaggedRecords = keyFilenamePairs.count();
        LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
    }

    // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
    // 将数据位置标记到records中并返回
    HoodieData<HoodieRecord<R>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records, hoodieTable);

    if (config.getBloomIndexUseCaching()) {
        records.unpersist();
        keyFilenamePairs.unpersist();
    }

    return taggedRecords;
}

LookupIndex返回record(用record key表示)和所在位置(instant time和file ID)的对应关系。

private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
    HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
    final HoodieTable hoodieTable) {
    // Step 1: Obtain records per partition, in the incoming records
    // 获取每个分区对应的record key的个数
    Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
    // 获取这些partition path
    List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

    // Step 2: Load all involved files as <Partition, filename> pairs
    // 获取每个分区包含的file id和这个文件的最大最小record key的对应关系
    List<Pair<String, BloomIndexFileInfo>> fileInfoList = getBloomIndexFileInfoForPartitions(context, hoodieTable, affectedPartitionPathList);
    // 将相同分区下的所有文件组成list形式
    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
        fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));

    // Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id,
    // that contains it.
    // 根据前面所述的对应关系,找到这些record可能属于的文件
    // 返回file group id和record key的对应关系
    HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs =
        explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);

    // 使用bloom filter,获取record key所在的文件
    return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
                                                           partitionRecordKeyPairs, fileComparisonPairs, partitionToFileInfo, recordsPerPartition);
}

getBloomIndexFileInfoForPartitions用来获取partition path包含的数据文件以及record key范围。

private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartitions(HoodieEngineContext context,
                                                                                  HoodieTable hoodieTable,
                                                                                  List<String> affectedPartitionPathList) {
    List<Pair<String, BloomIndexFileInfo>> fileInfoList = new ArrayList<>();

    // 对应配置hoodie.bloom.index.prune.by.ranges,默认启用
    // 如果启用,可以从metadata table的column stat(列统计信息)中读取key范围
    // 否则,需要逐个从数据文件中读取,速度较慢
    if (config.getBloomIndexPruneByRanges()) {
        // load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
        if (config.getBloomIndexUseMetadata()
            && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
            fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
        }
        // fallback to loading column ranges from files
        if (isNullOrEmpty(fileInfoList)) {
            fileInfoList = loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable);
        }
    } else {
        fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable);
    }

    return fileInfoList;
}

explodeRecordsWithFileComparisons根据getBloomIndexFileInfoForPartitions的结果,找到这些record可能属于的文件。

HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
    final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
    HoodiePairData<String, String> partitionRecordKeyPairs) {
    // 对应hoodie.bloom.index.use.treebased.filter配置项,默认启用,表示使用区间树查找数据可能位于的文件
    // 否则使用链表遍历,效率较低
    IndexFileFilter indexFileFilter =
        config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
        : new ListBasedIndexFileFilter(partitionToFileIndexInfo);

    return partitionRecordKeyPairs.map(partitionRecordKeyPair -> {
        String recordKey = partitionRecordKeyPair.getRight();
        String partitionPath = partitionRecordKeyPair.getLeft();

        // 查找record所在位置
        return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey)
            .stream()
            .map(partitionFileIdPair ->
                 new ImmutablePair<>(
                     new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
    })
        .flatMapToPair(Stream::iterator);
}

ListBasedHoodieBloomIndexHelper

由于前面根据record key范围定位出的数据位置存在很大的误判的可能性,这里调用ListBasedHoodieBloomIndexHelperfindMatchingFilesForRecordKeys方法,使用数据文件对应的bloom filter过滤出数据文件中真正存在的record。

@Override
public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
    HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
    HoodiePairData<String, String> partitionRecordKeyPairs,
    HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
    Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
    // 将对应关系按照HoodieFileGroupId排序
    List<Pair<HoodieFileGroupId, String>> fileComparisonPairList =
        fileComparisonPairs.collectAsList().stream()
        .sorted(Comparator.comparing(Pair::getLeft)).collect(toList());

    // HoodieBloomIndexCheckFunction使用数据文件的bloom filter,逐个确认数据是否真正在对应的数据文件中
    List<HoodieKeyLookupResult> keyLookupResults =
        CollectionUtils.toStream(
        new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
        .apply(fileComparisonPairList.iterator())
    )
        .flatMap(Collection::stream)
        .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
        .collect(toList());

    // 组装返回结果
    return context.parallelize(keyLookupResults).flatMap(lookupResult ->
                                                         lookupResult.getMatchingRecordKeys().stream()
                                                         .map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
                                                        ).mapToPair(pair -> {
        HoodieKeyLookupResult lookupResult = pair.getLeft();
        String recordKey = pair.getRight();
        return new ImmutablePair<>(
            new HoodieKey(recordKey, lookupResult.getPartitionPath()),
            new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
    });
}

使用bloom filter检查文件是否真正包含某条record的逻辑位于HoodieBloomIndexCheckFunction

HoodieBloomIndexCheckFunction

HoodieBloomIndexCheckFunction使用数据文件对应的bloom filter来判断record是否真的在数据文件中。

HoodieBloomIndexCheckFunction::apply方法返回的是前面file group id和record key的对应关系集合的迭代器。

@Override
public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I> fileGroupIdRecordKeyPairIterator) {
    return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
}

在后面flatmap处理这个迭代器转换成的stream的时候,会调用LazyKeyCheckIterator::computeNext方法。内容如下:

@Override
protected List<HoodieKeyLookupResult> computeNext() {

    List<HoodieKeyLookupResult> ret = new ArrayList<>();
    try {
        // process one file in each go.
        // 一轮处理一个数据文件
        // 查询一个数据文件的bloom filter
        while (inputItr.hasNext()) {
            I tuple = inputItr.next();

            // 找出record key和它可能所在的数据文件位置信息
            HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
            String recordKey = recordKeyExtractor.apply(tuple);

            String fileId = fileGroupId.getFileId();
            String partitionPath = fileGroupId.getPartitionPath();

            Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

            // lazily init state
            // 创建查找handle,查找partitionPathFilePair对应的bloom filter
            if (keyLookupHandle == null) {
                keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
            }

            // if continue on current file
            // HoodieKeyLookupHandle是懒创建策略,只有在需要检索下一个数据文件的时候才会创建新的handle
            if (keyLookupHandle.getPartitionPathFileIDPair().equals(partitionPathFilePair)) {
                // 如果bloom filter包含该record key,说明该record key对应的记录可能会在这个文件中
                // 在结果中加入这个record
                keyLookupHandle.addKey(recordKey);
            } else {
                // do the actual checking of file & break out
                // 当处理到下个文件的时候,检查可能包含之前文件中的记录是否真的存在,因为bloom filter存在假阳现象
                // 在返回结果之前,需要去数据文件中检查这些key是否真的在数据文件中存在,具体逻辑在getLookupResult方法中
                // 将检验过后的返回结果添加到ret中
                ret.add(keyLookupHandle.getLookupResult());
                // 然后创建新的HoodieKeyLookupHandle用来使用下一个数据文件对应的bloom filter
                keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
                // 这里和上面相同,使用bloom filter判断record是否可能在这个数据文件中
                keyLookupHandle.addKey(recordKey);
                break;
            }
        }

        // handle case, where we ran out of input, close pending work, update return val
        if (!inputItr.hasNext()) {
            ret.add(keyLookupHandle.getLookupResult());
        }
    } catch (Throwable e) {
        if (e instanceof HoodieException) {
            throw (HoodieException) e;
        }

        throw new HoodieIndexException("Error checking bloom filter index. ", e);
    }

    return ret;
}

在迭代的时候针对每一个数据文件的bloom filter,会创建出专门的HoodieKeyLookupHandle来处理。

HoodieKeyLookupHandle

HoodieKeyLookupHandle方法负责通过数据文件对应的bloom filter,检查数据文件是否包含某个record。

HoodieKeyLookupHandle::addKey方法判断bloom filter是否可能包含record key。如果可能包含,加入到候选列表candidateRecordKeys中。

public void addKey(String recordKey) {
    // check record key against bloom filter of current file & add to possible keys if needed
    // 使用bloom filter判断是否可能包含该record key
    if (bloomFilter.mightContain(recordKey)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Record key " + recordKey + " matches bloom filter in  " + partitionPathFileIDPair);
        }
        // 加入到候选中
        candidateRecordKeys.add(recordKey);
    }
    // 总共检查的key计数值 +1
    totalKeysChecked++;
}

HoodieKeyLookupHandle::getLookupResult方法将candidateRecordKeys中的record key代入数据文件中校验是否真的存在。因为bloom filter存在一定概率假阳性,需要再次检查。

public HoodieKeyLookupResult getLookupResult() {
    if (LOG.isDebugEnabled()) {
        LOG.debug("#The candidate row keys for " + partitionPathFileIDPair + " => " + candidateRecordKeys);
    }

    // 获取最新版本的base file
    HoodieBaseFile baseFile = getLatestBaseFile();
    // 检查这些record key是否真的在base file中,返回base file真实包含的record key
    List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new Path(baseFile.getPath()), candidateRecordKeys,
                                                                    hoodieTable.getHadoopConf());
    LOG.info(
        String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
                      candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
    // 组装成查找结果并返回
    // 包含文件(partition path,file id和commit time)同包含的record key的对应关系
    return new HoodieKeyLookupResult(partitionPathFileIDPair.getRight(), partitionPathFileIDPair.getLeft(),
                                     baseFile.getCommitTime(), matchingKeys);
}

最后我们跟踪一下bloom filter是从哪里读取的。查看getBloomFilter方法,内容如下:

private BloomFilter getBloomFilter() {
    BloomFilter bloomFilter = null;
    HoodieTimer timer = HoodieTimer.start();
    try {
        // 如果启用hoodie.bloom.index.use.metadata(默认启用)并且metadata table存放的有bloom filter
        // 从metadata table加载bloom filter,可以提高速度
        if (config.getBloomIndexUseMetadata()
            && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
            .contains(BLOOM_FILTERS.getPartitionPath())) {
            bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
                .orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
        } else {
            // 否则读取数据文件中的bloom filter
            try (HoodieFileReader reader = createNewFileReader()) {
                bloomFilter = reader.readBloomFilter();
            }
        }
    } catch (IOException e) {
        throw new HoodieIndexException(String.format("Error reading bloom filter from %s", getPartitionPathFileIDPair()), e);
    }
    LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
    return bloomFilter;
}

Hudi建议将bloom filter统一保存在metadata table中,可以提高加载速度。

上一篇下一篇

猜你喜欢

热点阅读