Exact Deduplication in Flink Based on RoaringBitmap
2020-06-16
小胡子哥灬
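When it comes to deduplication in real-time Flink statistics, these are the popular approaches I can think of:
- Bloom filter - approximate deduplication. The false-positive rate is configurable, but the lower you set it, the more it costs. You can use Guava's implementation, or build your own on top of Redis bit operations and reuse Guava's hash functions.
- HyperLogLog - approximate deduplication based on cardinality estimation. Its advantage is that the space needed to estimate the cardinality stays fixed and very small, no matter how many or how large the input elements are.
- BitMap - exact deduplication with a small footprint (when the values are relatively evenly distributed). The drawback is that it only works on numeric types (int or long).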
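To make the bitmap idea concrete, here is a minimal sketch of exact deduplication with a plain bitmap. It uses the JDK's java.util.BitSet as a stand-in for RoaringBitmap, which layers compression on top of the same idea; the class and method names are hypothetical and only serve the illustration.

```java
import java.util.BitSet;

// Illustrative sketch: exact distinct counting with an uncompressed bitmap.
// java.util.BitSet stands in for RoaringBitmap; all names are hypothetical.
final class BitmapDedupSketch {
    // Sets one bit per id. A duplicate id hits a bit that is already set,
    // so it does not change the count.
    static int countDistinct(int[] ids) {
        BitSet seen = new BitSet();
        for (int id : ids) {
            seen.set(id); // ids must be non-negative
        }
        return seen.cardinality();
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 2, 3, 1, 7};
        System.out.println(countDistinct(ids)); // prints 4
    }
}
```

Because every id owns exactly one bit, the count is exact, which is also why the ids have to be integers in the first place.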
This article describes a Flink deduplication scheme based on RoaringBitmap. First, add the dependencies:
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.8.13</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.6</version>
</dependency>
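Building the BitIndex
A BitMap is handy, but the field being deduplicated must be an int or a long. What if it is not? In that case we build a mapping table from the field value to a BitIndex, where the bitIndex starts at 1 and increments, for example {a = 1, b = 2, c = 3}. At use time, first look up the field's bitIndex in the mapping table; if it is missing, allocate a new one from a global counter. Here I use Redis as the mapping table. The implementation lives in a MapFunction, as follows: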
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.commons.lang3.StringUtils; // assumed: Apache Commons Lang 3
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.redisson.Redisson;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BitIndexBuilderMap extends RichMapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>> {
    private static final Logger LOG = LoggerFactory.getLogger(BitIndexBuilderMap.class);
    private static final String GLOBAL_COUNTER_KEY = "FLINK:GLOBAL:BITINDEX";
    private static final String GLOBAL_COUNTER_LOCKER_KEY = "FLINK:GLOBAL:BITINDEX:LOCK";
    private static final String USER_BITINDEX_SHARDING_KEY = "FLINK:BITINDEX:SHARDING:";
    /**
     * Spread the user ids across 100 Redis maps so that no single map grows without bound
     * and the sharding of the Redis cluster is put to good use.
     */
    private static final Integer REDIS_CLUSTER_SHARDING_MODE = 100;
    private HashFunction hash = Hashing.crc32();
    private transient RedissonClient redissonClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        // ParameterTool globalPara = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Config config = new Config();
        config.setCodec(new StringCodec());
        config.useClusterServers().addNodeAddress(getRedissonNodes("redis1:8080,redis2:8080,redis3:8080"))
                .setPassword("xxxx")
                .setMasterConnectionPoolSize(2)
                .setMasterConnectionMinimumIdleSize(1)
                .setSlaveConnectionPoolSize(2)
                .setSlaveConnectionMinimumIdleSize(1)
                .setConnectTimeout(10000)
                .setTimeout(10000)
                .setIdleConnectionTimeout(10000);
        redissonClient = Redisson.create(config);
    }

    /**
     * Turns the userId into an incrementing integer by keeping an id mapping in Redis.
     *
     * @param in (userId, page)
     * @return (userId, page, bitIndex)
     */
    @Override
    public Tuple3<String, String, Integer> map(Tuple2<String, String> in) throws Exception {
        String userId = in.f0;
        // Pick the shard. Taking the remainder before the absolute value keeps the
        // result non-negative even when the hash is Integer.MIN_VALUE.
        int shardingNum = Math.abs(hash.hashBytes(userId.getBytes()).asInt() % REDIS_CLUSTER_SHARDING_MODE);
        String mapKey = USER_BITINDEX_SHARDING_KEY + shardingNum;
        RMap<String, String> rMap = redissonClient.getMap(mapKey);
        // If there is no mapping yet, generate a bitIndex
        String bitIndexStr = rMap.get(userId);
        if (StringUtils.isEmpty(bitIndexStr)) {
            LOG.info("bitIndex for userId[{}] is empty, generating one", userId);
            RLock lock = redissonClient.getLock(GLOBAL_COUNTER_LOCKER_KEY);
            // tryLock returns false on timeout; unlocking a lock we do not hold would throw
            if (!lock.tryLock(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for lock " + GLOBAL_COUNTER_LOCKER_KEY);
            }
            try {
                // Check again now that we hold the lock
                bitIndexStr = rMap.get(userId);
                if (StringUtils.isEmpty(bitIndexStr)) {
                    RAtomicLong atomic = redissonClient.getAtomicLong(GLOBAL_COUNTER_KEY);
                    bitIndexStr = String.valueOf(atomic.incrementAndGet());
                    rMap.put(userId, bitIndexStr);
                }
            } finally {
                lock.unlock();
            }
            LOG.info("bitIndex for userId[{}] generated, bitIndex: {}", userId, bitIndexStr);
        }
        return new Tuple3<>(in.f0, in.f1, Integer.valueOf(bitIndexStr));
    }

    @Override
    public void close() throws Exception {
        if (redissonClient != null) {
            redissonClient.shutdown();
        }
    }

    // Turns "host1:port,host2:port" into the redis:// addresses Redisson expects
    private String[] getRedissonNodes(String hosts) {
        if (hosts == null || hosts.isEmpty()) {
            return new String[0];
        }
        List<String> nodes = new ArrayList<>();
        String nodePrefix = "redis://";
        for (String host : StringUtils.split(hosts, ",")) {
            nodes.add(nodePrefix + host);
        }
        return nodes.toArray(new String[0]);
    }
}
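The lookup-or-allocate logic can be tried out without a Redis cluster. The following is a minimal in-memory sketch, not the author's implementation: ConcurrentHashMap and AtomicInteger stand in for the Redis maps and the global counter, and the JDK's CRC32 stands in for Guava's Hashing.crc32(). All class and method names are hypothetical. It also shows why the remainder is best taken before the absolute value: Math.abs(Integer.MIN_VALUE) is still negative, so Math.abs(hash) % 100 can yield a negative shard.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;

// Illustrative in-memory stand-in for the Redis-backed mapping table.
final class BitIndexDictionarySketch {
    static final int SHARDS = 100;

    // Plays the role of the global Redis counter
    private final AtomicInteger globalCounter = new AtomicInteger();
    // shard number -> (userId -> bitIndex), one inner map per Redis map
    private final Map<Integer, Map<String, Integer>> shards = new ConcurrentHashMap<>();

    // Maps a 32-bit hash to a shard in [0, SHARDS). Taking the remainder first
    // keeps the result non-negative even for Integer.MIN_VALUE.
    static int shardOfHash(int hash) {
        return Math.abs(hash % SHARDS);
    }

    static int shardOf(String userId) {
        CRC32 crc = new CRC32();
        crc.update(userId.getBytes(StandardCharsets.UTF_8));
        return shardOfHash((int) crc.getValue());
    }

    // Returns the existing bitIndex for userId, or allocates the next one.
    int bitIndexOf(String userId) {
        Map<String, Integer> shard =
                shards.computeIfAbsent(shardOf(userId), k -> new ConcurrentHashMap<>());
        return shard.computeIfAbsent(userId, k -> globalCounter.incrementAndGet());
    }

    public static void main(String[] args) {
        BitIndexDictionarySketch dict = new BitIndexDictionarySketch();
        System.out.println(dict.bitIndexOf("a")); // prints 1
        System.out.println(dict.bitIndexOf("b")); // prints 2
        System.out.println(dict.bitIndexOf("a")); // prints 1
    }
}
```

In a single JVM, computeIfAbsent provides the atomicity that the distributed lock provides across Flink subtasks.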
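Deduplication Logic
Once the MapFunction has produced the BitIndex for a field, the deduplication itself is straightforward. For example, to count the number of distinct visitors to each page: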
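import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountDistinctFunction extends KeyedProcessFunction<Tuple, Tuple3<String, String, Integer>, Tuple2<String, Long>> {
    private static final Logger LOG = LoggerFactory.getLogger(CountDistinctFunction.class);
    // Emit the current count at most once per interval
    private static final long EMIT_INTERVAL_MS = 10000L;
    // f0: bitmap of the bitIndexes seen for this key, f1: time the last timer was based on
    private ValueState<Tuple2<RoaringBitmap, Long>> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Types.TUPLE(Types.GENERIC(RoaringBitmap.class), Types.LONG)));
    }

    @Override
    public void processElement(Tuple3<String, String, Integer> in, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // retrieve the current state for this key
        Tuple2<RoaringBitmap, Long> current = state.value();
        if (current == null) {
            current = new Tuple2<>();
            current.f0 = new RoaringBitmap();
        }
        current.f0.add(in.f2);
        long processingTime = ctx.timerService().currentProcessingTime();
        // register a new timer only if the previous interval has elapsed
        if (current.f1 == null || current.f1 + EMIT_INTERVAL_MS <= processingTime) {
            current.f1 = processingTime;
            ctx.timerService().registerProcessingTimeTimer(current.f1 + EMIT_INTERVAL_MS);
        }
        // write the state back
        state.update(current);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
        Tuple2<RoaringBitmap, Long> result = state.value();
        if (result == null) {
            return;
        }
        result.f0.runOptimize();
        out.collect(new Tuple2<>(key.f0, result.f0.getLongCardinality()));
    }
}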
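The per-key behaviour (record every bitIndex, but schedule an emission at most once every 10 seconds) can be checked in isolation. Below is a minimal sketch in plain Java under stated assumptions: BitSet stands in for RoaringBitmap, the caller passes the current time instead of a Flink timer service, and all names are hypothetical.

```java
import java.util.BitSet;

// Illustrative sketch of the state one key keeps in a throttled distinct counter.
// BitSet stands in for RoaringBitmap; all names here are hypothetical.
final class ThrottledDistinctSketch {
    static final long INTERVAL_MS = 10_000L;

    private final BitSet seen = new BitSet();
    private Long lastTimerBase; // null until the first element arrives

    // Records the element and returns the time at which a new timer should fire,
    // or -1 if a timer for the current interval is already pending.
    long onElement(int bitIndex, long nowMs) {
        seen.set(bitIndex);
        if (lastTimerBase == null || lastTimerBase + INTERVAL_MS <= nowMs) {
            lastTimerBase = nowMs;
            return nowMs + INTERVAL_MS;
        }
        return -1L;
    }

    // What the timer callback would emit: the exact distinct count so far.
    int onTimer() {
        return seen.cardinality();
    }
}
```

For example, elements arriving at 0 ms, 5000 ms and 9999 ms share the single timer scheduled for 10000 ms, and an element at 12000 ms schedules the next one for 22000 ms.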
The main part of the driver program:
env.addSource(source).map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String value) throws Exception {
            // each input line is "userId,page"
            String[] arr = StringUtils.split(value, ",");
            return new Tuple2<>(arr[0], arr[1]);
        }
    })
    .keyBy(0) // group by userId
    .map(new BitIndexBuilderMap()) // build the bitIndex
    .keyBy(1) // count the visitors of each page
    .process(new CountDistinctFunction())
    .print();
Test data:
shizc,www.baidu..com
shizc,www.baidu.com
shizc1,www.baidu.com
shizc2,www.baidu.com
shizc,www.baidu..com
shizc,www.baidu..com
shizc,www.baidu..com
shizc,www.hahaha.com
shizc,www.hahaha.com
shizc1,www.hahaha.com
shizc2,www.hahaha.com
Output:
(www.baidu.com,4)
(www.hahaha.com,3)
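Summary
- If your field is already numeric, you can skip building the BitIndex, but make sure the values follow a regular, increasing pattern. For long values you can also use Roaring64NavigableMap. If the ids are generated by the Snowflake algorithm, however, you had better not use them directly: they do not compress, so the bitmap takes a huge amount of space. I once used Roaring64NavigableMap directly on such ids, and a little over 10 million of them took more than 700 MB.
- With very large data volumes, generating the bitIndex on the fly in the implementation above becomes a performance bottleneck. The BitIndex should therefore be built ahead of time: run a Flink batch job over all the user ids in your database to generate the mapping beforehand. The basic code is as follows: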
// main method
final ExecutionEnvironment env = buildExecutionEnv();
// If there is no better way to keep the ids monotonically increasing, run with a parallelism of 1
env.setParallelism(1);
TextInputFormat input = new TextInputFormat(new Path(MEMBER_RIGHTS_HISTORY_PATH));
input.setCharsetName("UTF-8");
// Skip the header line, keep the first column (the user id) and drop duplicates
DataSet<String> source = env.createInput(input).filter(e -> !e.startsWith("user_id")).map(
        new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] arr = StringUtils.split(value, ",");
                return arr[0];
            }
        })
        .distinct();
// Write one Redis map per shard
source
        .map(new RedisMapBuilderFunction())
        .groupBy(0)
        .reduce(new RedisMapBuilderReduce())
        .output(new RedissonOutputFormat());
// count() triggers execution of the plan defined so far
long counter = source.count();
// Seed the global counter so that the streaming job continues after the last pre-built index
env.fromElements(counter).map(new MapFunction<Long, Tuple3<String, String, Object>>() {
    @Override
    public Tuple3<String, String, Object> map(Long value) throws Exception {
        return new Tuple3<>("FLINK:GLOBAL:BITINDEX", "ATOMICLONG", value);
    }
}).output(new RedissonOutputFormat());
// env.execute() is still needed to run this last sink
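import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.HashMap;
import java.util.Map;

// Note: the sharding logic and the keys must match those of the streaming job
public class RedisMapBuilderFunction implements MapFunction<String, Tuple3<String, String, Object>> {
    private static final String USER_BITINDEX_SHARDING_KEY = "FLINK:BITINDEX:SHARDING:";
    private static final Integer REDIS_CLUSTER_SHARDING_MODE = 100;
    private HashFunction hash = Hashing.crc32();
    // Only yields globally unique indexes because the job runs with a parallelism of 1
    private Integer counter = 0;

    @Override
    public Tuple3<String, String, Object> map(String userId) throws Exception {
        counter++;
        // Remainder first, then abs: stays non-negative even for Integer.MIN_VALUE
        int shardingNum = Math.abs(hash.hashBytes(userId.getBytes()).asInt() % REDIS_CLUSTER_SHARDING_MODE);
        String key = USER_BITINDEX_SHARDING_KEY + shardingNum;
        Map<String, String> map = new HashMap<>();
        map.put(userId, String.valueOf(counter));
        return new Tuple3<>(key, "MAP", map);
    }
}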
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Map;

// Merges the single-entry maps that share a shard key into one map per shard
public class RedisMapBuilderReduce implements ReduceFunction<Tuple3<String, String, Object>> {
    @Override
    public Tuple3<String, String, Object> reduce(Tuple3<String, String, Object> value1, Tuple3<String, String, Object> value2) throws Exception {
        Map<String, String> map1 = (Map<String, String>) value1.f2;
        Map<String, String> map2 = (Map<String, String>) value2.f2;
        map1.putAll(map2);
        return new Tuple3<>(value1.f0, value1.f1, map1);
    }
}
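The map-then-reduce steps above boil down to: give every distinct user id the next integer, group the pairs by shard, and remember how many ids were assigned so the global counter can be seeded. A minimal plain-Java sketch of that data flow follows. It is an illustration only: the shard function here is a hypothetical placeholder based on String.hashCode, whereas a real job has to reuse the exact function of the streaming job.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of pre-building the mapping; all names are hypothetical.
final class PrebuildMappingSketch {
    static final int SHARDS = 100;

    // Placeholder shard function for the sketch only.
    static int shardOf(String userId) {
        return Math.abs(userId.hashCode() % SHARDS);
    }

    // Assigns 1, 2, 3, ... to the distinct ids in first-seen order and groups
    // the (userId, bitIndex) pairs by shard.
    static Map<Integer, Map<String, Integer>> build(List<String> userIds) {
        Set<String> distinct = new LinkedHashSet<>(userIds);
        Map<Integer, Map<String, Integer>> shards = new LinkedHashMap<>();
        int counter = 0;
        for (String userId : distinct) {
            counter++;
            shards.computeIfAbsent(shardOf(userId), k -> new LinkedHashMap<>())
                  .put(userId, counter);
        }
        return shards;
    }

    // The value to seed the global counter with: the number of distinct ids,
    // which equals the last index handed out by build().
    static long counterSeed(List<String> userIds) {
        return new LinkedHashSet<>(userIds).size();
    }
}
```

Seeding the counter with the number of distinct ids means the first id the streaming job has never seen receives the next free index.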
import org.apache.commons.lang3.StringUtils; // assumed: Apache Commons Lang 3
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.redisson.Redisson;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBucket;
import org.redisson.api.RKeys;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Writes the results to Redis
public class RedissonOutputFormat extends RichOutputFormat<Tuple3<String, String, Object>> {
    private transient RedissonClient redissonClient;

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        Config config = new Config();
        config.setCodec(new StringCodec());
        config.useClusterServers().addNodeAddress(getRedissonNodes("redis1:8080,redis2:8080,redis3:8080"))
                .setPassword("xxx")
                .setMasterConnectionPoolSize(2)
                .setMasterConnectionMinimumIdleSize(1)
                .setSlaveConnectionPoolSize(2)
                .setSlaveConnectionMinimumIdleSize(1)
                .setConnectTimeout(10000)
                .setTimeout(10000)
                .setIdleConnectionTimeout(10000);
        redissonClient = Redisson.create(config);
    }

    /**
     * Each record is (key, type, value). Any existing key is deleted before the new value is written.
     *
     * @param record the record to write
     * @throws IOException
     */
    @Override
    public void writeRecord(Tuple3<String, String, Object> record) throws IOException {
        String key = record.f0;
        RKeys rKeys = redissonClient.getKeys();
        rKeys.delete(key);
        String keyType = record.f1;
        if ("STRING".equalsIgnoreCase(keyType)) {
            String value = (String) record.f2;
            RBucket<String> rBucket = redissonClient.getBucket(key);
            rBucket.set(value);
        } else if ("MAP".equalsIgnoreCase(keyType)) {
            Map<String, String> map = (Map<String, String>) record.f2;
            RMap<String, String> rMap = redissonClient.getMap(key);
            rMap.putAll(map);
        } else if ("ATOMICLONG".equalsIgnoreCase(keyType)) {
            long value = (Long) record.f2;
            RAtomicLong atomic = redissonClient.getAtomicLong(key);
            atomic.set(value);
        }
    }

    @Override
    public void close() throws IOException {
        if (redissonClient != null) {
            redissonClient.shutdown();
        }
    }

    // Turns "host1:port,host2:port" into the redis:// addresses Redisson expects
    private String[] getRedissonNodes(String hosts) {
        if (hosts == null || hosts.isEmpty()) {
            return new String[0];
        }
        List<String> nodes = new ArrayList<>();
        String nodePrefix = "redis://";
        for (String host : StringUtils.split(hosts, ",")) {
            nodes.add(nodePrefix + host);
        }
        return nodes.toArray(new String[0]);
    }
}
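Coming back to the first point of the summary, the following sketch gives a rough intuition for why Snowflake ids compress so badly. As I understand it, Roaring bitmaps store values in chunks of 2^16 consecutive values selected by the higher bits, and each chunk carries its own overhead, so values that are far apart end up alone in their chunk. The sketch simply counts how many such chunks a set of ids touches. It assumes the common Snowflake layout (timestamp shifted left by 22 bits, then a 10-bit worker id and a 12-bit sequence) and one id per millisecond; all names are hypothetical, and it does not measure RoaringBitmap's actual memory use.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: how many 2^16-wide chunks do the ids fall into?
final class ContainerSpreadSketch {
    // Number of distinct chunks touched, where a chunk is identified by id >>> 16.
    static int chunksTouched(long[] ids) {
        Set<Long> chunks = new HashSet<>();
        for (long id : ids) {
            chunks.add(id >>> 16);
        }
        return chunks.size();
    }

    // Dense ids 1..n, like the bitIndex values handed out by a counter.
    static long[] sequentialIds(int n) {
        long[] ids = new long[n];
        for (int i = 0; i < n; i++) {
            ids[i] = i + 1;
        }
        return ids;
    }

    // Snowflake-like ids (assumed layout): one id per millisecond, sequence 0.
    static long[] snowflakeLikeIds(int n, long startMillis, long workerId) {
        long[] ids = new long[n];
        for (int i = 0; i < n; i++) {
            ids[i] = ((startMillis + i) << 22) | (workerId << 12);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(chunksTouched(sequentialIds(10_000)));                  // prints 1
        System.out.println(chunksTouched(snowflakeLikeIds(10_000, 1_000_000L, 1))); // prints 10000
    }
}
```

Ten thousand sequential ids share a single chunk, while ten thousand Snowflake-like ids each land in a chunk of their own, which is why mapping ids to a dense, incrementing bitIndex pays off.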