Flink

Flink+HyperLogLog实现海量实时去重计数

2020-05-06  本文已影响0人  LittleMagic

今天忙到飞起(到现在还没完),写一篇超短的小技巧吧。

HyperLogLog是去重计数的利器,能够以很小的精确度误差作为trade-off大幅减少内存空间占用,在不要求100%准确的计数场景极为常用。关于它的数学原理,看官可参见之前写过的《再谈基数估计之HyperLogLog算法》,不再赘述了。

在用Flink做实时计算的过程中,也短不了做去重计数,比如统计UV。我们当然可以直接借助Redis的HyperLogLog实现,但是要在Flink job内直接整合HyperLogLog该怎么做呢?

先引入如下Maven依赖项:

<dependency>
  <groupId>net.agkn</groupId>
  <artifactId>hll</artifactId>
  <version>1.6.0</version>
  <scope>compile</scope>
</dependency>

下面的聚合函数即可实现从WindowedStream按天、分键统计PV和UV。

WindowedStream<AnalyticsAccessLogRecord, Tuple, TimeWindow> windowedStream = watermarkedStream
  .keyBy("siteId")
  .window(TumblingEventTimeWindows.of(Time.days(1)))
  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)));

windowedStream.aggregate(new AggregateFunction<AnalyticsAccessLogRecord, Tuple2<Long, HLL>, Tuple2<Long, Long>>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Tuple2<Long, HLL> createAccumulator() {
    return new Tuple2<>(0L, new HLL(14, 6));
  }

  @Override
  public Tuple2<Long, HLL> add(AnalyticsAccessLogRecord record, Tuple2<Long, HLL> acc) {
    acc.f0++;
    acc.f1.addRaw(record.getUserId());
    return acc;
  }

  @Override
  public Tuple2<Long, Long> getResult(Tuple2<Long, HLL> acc) {
    return new Tuple2<>(acc.f0, acc.f1.cardinality());
  }

  @Override
  public Tuple2<Long, HLL> merge(Tuple2<Long, HLL> acc1, Tuple2<Long, HLL> acc2) {
    acc1.f0 += acc2.f0;
    acc1.f1.union(acc2.f1);
    return acc1;
  }
});

上述开源HyperLogLog组件的主要方法简述如下:

该HyperLogLog组件如同Redis一样实现了稀疏存储与密集存储两种方式,以进一步减少内存占用量。其源码不难理解,看官可以自行参看。

最后,如果一定追求100%准确,该怎么办呢?普通的位图法显然不合适,应该采用压缩位图,如笔者之前提到过的RoaringBitmap

继续忙去了。民那好梦。

上一篇 下一篇

猜你喜欢

热点阅读