Flink在用户行为分析中的应用(一)
项目背景
传统的企业营销大体是营销人员通过查询画像标签库去圈选人群,这种方案往往无法抓住那些"转瞬即逝的机会"
如:
- 一个价格敏感型客户,正在反复查看购物车中的某类商品,这时候实时推送优惠卷,能激发客户当时的购买渴望,刺激客户当时的购买行为,是最有效的
- 在搜广推场景下,动态观察用户进入活动各个入口的流量情况,可制定更精准的营销策略.是最有效的
- 在金融风控中,针对某个入口是否为用户真实行为的实时监控判断,对异常交易行为、违法违规行为进行筛查,是最有效的
Flink抓住了那转瞬即逝的机遇,本文仅向读者展示如何使用Flink实现一个企业级的实时营销系统
企业要做实时营销推送,通过自定义营销规则,来提高公司的销售额,这类营销规则大体是,发现一个满足一个特定条件的用户,在做出某类指定行为时,实时给用户推送短信,app消息等...
代码结构
- beans
- MarketingRule:对业务方给定规则的抽象
- EventCondition:对规则事件的抽象
- EventCombinationCondition:是行为组合的抽象,可能包含多个规则事件
- TimerCondition:是对定时规则的抽象,可能包含多个行为组合
- .....
- structure
架构图
整体来说,系统有
架构图.png
- 规则引擎模块:负责规则的匹配计算
- 规则管理模块:负责规则的动态注入及管理(修改,停用,启用,下线)
- 性能监控模块:负责对整个系统运行的关键性能进行监控(ck,hbase请求数,延迟,规则的匹配次数,匹配成立次数,匹配不成立次数)
一个最初版的Demo
方案改进的流程
查询分界点的设计方案
一般情况下ck的查询一秒请求在1000次,所以不能每次数据来了都请求ck,设计查询分界点,避免了高频次请求ck
查询分界点的设计方案.png先确定查询的分界点,分三种情况进行判断,如果说业务的规则条件要求的事件区间的TimeRangeEnd比分界点还要小就全部查询ck,如果说TimeRangeStart大于分界点,那么就要计算状态中满足时间区间的业务数据(与具体的业务eventProperties是无关的,是对查询流程的改进)
简单行为序列跨界查询的思路
具体的查询流程和上述分界点的结构类似,需要在上述Demo的基础上新增一个State的查询服务,跨界查询中,需要保留在ck中查询到的步骤数,根据ck中最大匹配步骤数,来修减(截取)条件中的事件序列,注意在state中时间范围要在业务规则内,最后返回state中匹配的步骤数和ck中的步骤数相加进行判定
伪代码如下
//获取状态state中的数据迭代器
//todo 这里只能get,不能上面传入迭代器,否则迭代一次,就不能迭代第二次了
Iterable<EventBean> eventBean = eventBeanListState.get();
//获取事件组合条件中的感兴趣的事件
List<EventCondition> eventConditionList = combinationCondition.getEventConditionList();
StringBuilder sb = new StringBuilder();
for (EventBean bean : eventBean) {
if (bean.getTimeStamp() > queryRangeStart && bean.getTimeStamp() < queryRangeEnd && deviceId.equals(bean.getDeviceId())){
//判断当前迭代到的bean,是否是条件中感兴趣的事件
for (int i = 1; i < eventConditionList.size(); i++) {
//条件中感兴趣的事件与bean进行对比
if (EventUtil.eventMatchCondition(bean,eventConditionList.get(i-1)) ) {
sb.append(i);
//只要匹配到就结束
break;
}
}
}
}
return sb.toString();
//获取匹配到的事件
String conditionStr = getEventCombinationConditionStr(deviceId, combinationCondition, queryRangeStart, queryRangeEnd);
//封装正则匹配的逻辑
int cnt = EventUtil.sequenceStrMatchRegexCount(conditionStr, combinationCondition.getMatchPattern());
//匹配到的步骤数
return cnt;
复杂行为序列分析的设计方案
- 统一查询的表达方式
- 业务方会给定开发人员埋点key,key一般是用Super Position Model来描述的,复杂的行为序列一般形如:连续触发两次A事件,后触发BC事件,使用SPM来描述往往非常复杂,业务方使用起来也不是很方便,此处提供一种基于正则表达式的描述,具体的转化如下:
- 使用列表数据结构,装载业务方给定的key,把源源不断的给定的用户事件映射为列表的索引,最后根据业务方给定的行为规则到组合条件的事件列表中找到对应的索引号,来作为最终结果(拼接),伪代码如下:
//遍历ck返回结果
StringBuilder sb = new StringBuilder();
while (resultSet.next()) {
String eventId = resultSet.getString(1);
//统一正则表达式
//indexOf 从o开始算
sb.append((eventIds.indexOf(eventId) + 1));
}
return sb.toString();
//然后取出组合条件中的正则表达式,匹配规则
触发&定时型规则的设计方案
- 理解Flink中的Watermark概念
在分布式环境下如何推进事件时间
- Flink中定时器的应用
定时规则形如:触发A事件后4分钟内触发[BCD]事件,该场景比较比较复杂,目前的设计可以满足对定时时间内事件行为的连续序列判断,由于一个key可能存在多个定时规则,因此需要判断是哪个规则触发了定时器,大体的方案是使用一个liststate来保存[规则,定时触发点],伪代码如下:
public void onTimer(long timestamp, KeyedProcessFunction<String, EventBean, RuleMatchResult>.OnTimerContext ctx, Collector<RuleMatchResult> out) throws Exception {
Iterable<Tuple2<MarketingRule, Long>> ruleTimerIterable = ruleTimerState.get();
Iterator<Tuple2<MarketingRule, Long>> ruleTimerIterator = ruleTimerIterable.iterator();
while (ruleTimerIterator.hasNext()) {
Tuple2<MarketingRule, Long> tp = ruleTimerIterator.next();
//判断这个(规则:定时点),是否对应本次的触发点
if (tp.f1 == timestamp){
//如果对应,检查该规则的定时条件(定时条件中包含的就是行为条件列表)
TimerCondition timerCondition = tp.f0.getTimerConditionList().get(0);
//调用service去检查在条件指定的时间范围内,事件的组合发生次数是否满足
boolean b = controller.isMatchTimeCondition(ctx.getCurrentKey(), timerCondition, timestamp - timerCondition.getTimeLate(), timestamp);
//清理已经检查完毕的规则定时点state信息
ruleTimerIterator.remove();
if (b){
out.collect(new RuleMatchResult(ctx.getCurrentKey(),tp.f0.getRuleId(),timestamp,System.currentTimeMillis()));
}
}
//todo ADD 增加删除过期定时信息的逻辑
if (tp.f1 < timestamp){
ruleTimerIterator.remove();
}
}
}
缓存机制设计方案
- 缓存有效性分析
首先是为什么要有缓存?原因有大致两点,不同规则可能拥有相同的条件,同一个规则也可能被同一个人多次触发,增加缓存复用结果减少对OLAP引擎的压力,大致的缓存机制如下:
例如缓存的时间区间为[t4 - t8],而条件区间可能为[t1 - t9],[t5 - t6],[t5 - t9]等,我们可以考虑对条件区间进行拆分,重用[t4 - t8]的结果,这种方案是基于底层数据量大复用结果的考虑,但是对于clickhouse来说,受影响的是查询的次数,经过考虑对等同于[t4 - t8]和包含端点条件区间结果查询结果复用
- 缓存机制方案细节
- 使用Redis中的Hash结构存储Key为分组key+缓存id,value值为Map类型,key为时间区间和当前查询时间的拼接,value值为ck查询结果的拼接
- valueMap中,可能同时存在多个上述的区间范围可能性,如何查询到缓存结果的最优解?
- 关于Redis中的数据过期设置,可根据查询的时间范围来设置过期值,大体来说在每次读取缓存后,更新读到的这条缓存数据的插入时间,删除过期的缓存数据,这种检查并不是针对整个Redis来说,而是对于用户粒度的条件缓存来说的
动态Key的设计思路
- 动态Key数据复制的大致思路
业务方投放的规则可能是按照设备Id来投放分组的,也可能是Ip地址等其他分组条件,如果每种条件都写一个任务往往比较麻烦,本文提供一种思路:把业务方规定的投放key与事件包装为一个对象,通过反射机制获取分组的具体值,业务方可能给出N多个不重复的key,这样就把原来的数据放大了N倍,特殊的一个条件可能会按照多个key来分组,于是我们考虑把事件包装为[分组key的具体值,分组Key字段,事件],伪代码如下
//set中是去重后的业务方规定的规则
for (String keyByFields : set) {
StringBuilder sb = new StringBuilder();
String[] fieldNames = keyByFields.split(",");
for (String fieldName : fieldNames) {
//java反射获取类型对应的具体值
Class<?> name = Class.forName("marketing.beans.EventBean");
Field field = name.getDeclaredField(fieldName);
//设置权限
field.setAccessible(true);
String fieldValue = (String)field.get(eventBean);
sb.append(fieldValue).append(",");
}
String keyByValue = sb.toString().substring(0, sb.length() - 1);
eventBean.setKeyByValue(keyByValue);
//每种分组规则都要进行分发,相当于数据扩大了N倍
collector.collect(new DynamicKeyedBean(keyByValue,keyByFields,eventBean));
}
规则引擎
- 必要性
业务方给定的规则不是一成不变的,如何在不停止job的前提下修改规则是一个重要的需求,Drools可以将复杂多变的规则从硬编码中解放出来,以规则脚本的形式存放在文件中,使得规则的变更不需要修正代码重启job可以立即上线
- Drools热更新流程
- 通过管理Web平台操作规则(新建,删除,启用,停用),导致mysql中规则元数据表变化
- 用canal监听到规则表的操作binlog并发送到kafka中
- flink从kafka消费到规则操作binlog,并将binlog流进行广播后connect事件数据流
- 在后续的处理过程中,通过processbroadcast方法,读取到规则操作binlog进行解析
- 根据解析的结果,对存储规则信息用到的broadcastState进行管理(插入,覆盖更新,删除)操作
- FreeMarker模板规则引擎
可以利用FreeMarker模板引擎生成规则参数中的查询SQL,生成规则controller规则文件DRL