Flink+HyperLogLog实现海量实时去重计数
今天忙到飞起(到现在还没完),写一篇超短的小技巧吧。
HyperLogLog是去重计数的利器,能够以很小的精确度误差作为trade-off大幅减少内存空间占用,在不要求100%准确的计数场景极为常用。关于它的数学原理,看官可参见之前写过的《再谈基数估计之HyperLogLog算法》,不再赘述了。
在用Flink做实时计算的过程中,也短不了做去重计数,比如统计UV。我们当然可以直接借助Redis的HyperLogLog实现,但是要在Flink job内直接整合HyperLogLog该怎么做呢?
先引入如下Maven依赖项:
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
<scope>compile</scope>
</dependency>
下面的聚合函数即可实现从WindowedStream按天、分键统计PV和UV。
WindowedStream<AnalyticsAccessLogRecord, Tuple, TimeWindow> windowedStream = watermarkedStream
.keyBy("siteId")
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)));
windowedStream.aggregate(new AggregateFunction<AnalyticsAccessLogRecord, Tuple2<Long, HLL>, Tuple2<Long, Long>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, HLL> createAccumulator() {
return new Tuple2<>(0L, new HLL(14, 6));
}
@Override
public Tuple2<Long, HLL> add(AnalyticsAccessLogRecord record, Tuple2<Long, HLL> acc) {
acc.f0++;
acc.f1.addRaw(record.getUserId());
return acc;
}
@Override
public Tuple2<Long, Long> getResult(Tuple2<Long, HLL> acc) {
return new Tuple2<>(acc.f0, acc.f1.cardinality());
}
@Override
public Tuple2<Long, HLL> merge(Tuple2<Long, HLL> acc1, Tuple2<Long, HLL> acc2) {
acc1.f0 += acc2.f0;
acc1.f1.union(acc2.f1);
return acc1;
}
});
上述开源HyperLogLog组件的主要方法简述如下:
-
HLL(int log2m, int regwidth)
创建一个HyperLogLog对象。log2m即总分桶数目以2为底的对数,regwidth则是真正用来做基数估计的比特的下标值宽度。根据Redis的思路,log2m=14,regwidth=6,即可以仅用最多12kB内存,以0.81%的误差计算接近264的基数。 -
void addRaw(long rawValue)
向HyperLogLog中插入元素。如果插入的元素非数值型的,则需要hash过后(推荐用Murmur3等比较快的哈希算法)再插入。 -
long cardinality()
返回该HyperLogLog中元素的基数。 -
void union(HLL other)
将两个HyperLogLog结构合并为一个。
该HyperLogLog组件如同Redis一样实现了稀疏存储与密集存储两种方式,以进一步减少内存占用量。其源码不难理解,看官可以自行参看。
最后,如果一定追求100%准确,该怎么办呢?普通的位图法显然不合适,应该采用压缩位图,如笔者之前提到过的RoaringBitmap。
继续忙去了。民那好梦。