Flink在用户行为分析中的应用(二)

2022-09-04  本文已影响0人  冰菓_

一. 项目背景

业务系统一般有广告引擎,推荐引擎,搜索引擎,秒杀系统;当然实时智能运营系统也是业务系统的一部分,能为公司带来直接价值的.智能运营系统是一个用软件系统自动、实时监控用户的行为,并实时做出判断,并进而驱动营销推送的系统

二. 需求分析

营销规则举例
  1. 活跃度为A,男性,高星级用户,连续登陆7天的用户圈选
  2. 搜索过"商务休闲"关键字的用户,如果点击了"旅拍"坑位,则推送一张给定规则的优惠券
  3. 某用户,连续做过3次A行为后,没有做过[BC]行为序列的人群圈选
  4. 检测到某用户,在做过[AB]行为后,3分钟内,做过[C|D]行为,则触发监控告警
  5. 某高星级用户,规定时间内搜索的关键字,符合给定规则的特征,则推送app相关消息
营销规则特征
  1. 营销规则有多样的属性
  2. 营销规则通常是有有效期设置
  3. 营销规则要根据业务的变化灵活的进行热管理,水平扩展
  4. 营销规则要圈选的人群可能不是一个静态的人群包
营销规则的要素抽象
  1. 营销规则大体是对用户的画像和行为进行判断的
  2. 营销规则是有时间窗口限制的(规则上线前后)
  3. 营销规则的要素通常有
  1. 条件属性:就是各类逻辑的组合,与,或,非,大于,小于,等于,交集,并集
  2. 对比属性:事件和事件的对比关系,如AB事件的时间间隔应该大于N
  3. 时间属性:就是限制圈选的时间范围
  4. 画像属性:画像标签库的属性值,如年龄,性别,年代,星级
  5. 度量属性:如,登陆天数,消费金额,某行为事件发生M次,SUM(属性值),MAX(属性值)
  6. 序列属性:如用户连续触发[ABC]事件.后3s内触发D事件等

三. 设计概要

画像条件的设计概要
  1. 一般的画像标签是枚举值类型的,将数据存放在hbase中即可解决,但是对于一些例如兴趣爱好,出行意图往往不是枚举类型的,而是类似于文档,例如一个人爱好标签是"咖啡",而标签中可能存储了多种类型的"咖啡:抹茶咖啡:猫屎咖啡..",这涉及到了模糊查询,hbase很难支持,对于es这类文档数据库,就非常简单了,所以哦我们考虑使用es,来存储画像标签
  2. 显然大量的用户基数的查询,es也力不从心,考虑到画像标签的实时价值并不高,我们可以读取T+1的画像标签即可,我们考虑标签值进行bitmap编码的思路,编码后的数据广播与事件流进行逻辑判断
行为条件的设计概要
  1. 行为明细数据存储的两难选择:doris有着强大的查询能力,但是面对高频词请求,复杂的查询需求,就无法满足我们极为严苛的响应延迟要求,如果将数据都保存在state中,由于大状态非常不容易维护,也是不可取的
  2. 我们无法笼统的解决这个问题,需要具体分析具体对待;如果只涉及上线前的历史行为分析判断,那么我们可以完全参照画像条件的思路,将统计结果形成固化的结果数据;如果只涉及上线后的数据,则通过state来计算,那么state如何进行行为分析呢?其实我们,在计算一个规则条件时,并不需要完整的明细数据,可以通过滚动聚合的方式来计算;如果横跨上线前后行为分析场景,就需要进行拆分和整合,我们考虑把要整合的数据存储在redis中,这是由于redis读写速度与flink内部的state不相上下,能满足我们的高并发低延迟读写要求并且redis中有丰富的数据结构够我们使用
  3. 行为序列的分析需要设计不同的状态结构,业务规则的需求是无限的,我们无法用同一套代码来应付灵活多变的规则条件计算逻辑,因此要考虑从业务编码中解放出来,我们考虑使用外部系统为规则去选用一个 “状态机”,然后注入到我们的规则引擎,我们考虑使用动态脚本语言来实现,如Groovy
  4. 综上所述,开发人员往规则管理平台添加groory模板数据,开发flink流程消费用户行为数据,并通过cdc获取mysql中的规则数据,连接进行各类规则的状态机运算

四. 技术验证

Groovy动态调用
bitmap发布流程
  1. bitmap序列化 --> mysql元数据表 --> 反序列化的可行性
public class RulePublisher {
    public static void main(String[] args) throws SQLException, IOException, ClassNotFoundException {
         String ruleId = "rule_0001";
         //根据规则,去es中查询人群
         int[] ruleProfileUsers = {1,3,5};
         //把查询出来的人群包,变成bitmap
        RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf(ruleProfileUsers);
         //把生成好的bitmap,序列到一个字节数组中
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        roaringBitmap.serialize(dataOutputStream);
        byte[] bitmapBytes = byteArrayOutputStream.toByteArray();
        //将这个bitmap连同本规则,一同发布到规则平台的元数据系统中
        //注意在mysql中的byte类型
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/stream_db?serverTimezone=Asia/Shanghai", "root", "123456");
        PreparedStatement prepareStatement = connection.prepareStatement("insert into rule_tf values (?,?)");
        prepareStatement.setString(1,ruleId);
        prepareStatement.setBytes(2,bitmapBytes);
        prepareStatement.execute();
        prepareStatement.close();
    }
}
public class BitmapFromMysqlBytes {
    public static void main(String[] args) throws SQLException, IOException {
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/stream_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
        PreparedStatement prepareStatement = connection.prepareStatement("select rule,profile_bitmap from rule_tf where rule =?");
        prepareStatement.setString(1,"rule_0001");
        ResultSet resultSet = prepareStatement.executeQuery();


        resultSet.next();
        byte[] profileBitmap = resultSet.getBytes("profile_bitmap");
        //反序列化bitmap
        RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
        roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
        //测试反序列化的数据是否为之前的数据
        System.out.println(roaringBitmap.contains(1));
        System.out.println(roaringBitmap.contains(3));
        System.out.println(roaringBitmap.contains(201));
        System.out.println(roaringBitmap.contains(202));

        prepareStatement.close();
        connection.close();
    }
}
flink-cdc规则获取流程
  1. 一个flinkcdc的demo
public class FlinkCdcDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        //踩坑:请将flinktable中的 <scope>provided</scope>注释掉
        DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("my_data")
                .tableList("my_data.log_trail")
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
        //4.使用 CDC Source 从 MySQL 读取数据
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
        //5.打印数据
        mysqlDS.print();
        //6.执行任务
        env.execute();
    }
}
  1. 使用flinksql-cdc获取mysql中的bitmap
public class FlinkCdcBitmap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        //2.x版本需要设置Checkpoint,否则只能读取全量数据,无法实时读取增量数据
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d/sun/");
        StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
        //创建cdc连接器
        tableEnv.executeSql("CREATE TABLE rule_tf (" +
                " rule STRING PRIMARY KEY NOT ENFORCED, " +
                " profile_bitmap BINARY " +
                ") WITH (" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'localhost', " +
                " 'port' = '3306', " +
                " 'username' = 'root' ," +
                " 'password' = '123456' ," +
                " 'database-name' = 'my_data' ," +
                " 'table-name' = 'rule_tf'" +
                ")");
        // 踩坑,不支持使用CDC2.2版本,请使用CDC2.0,另外flink1.2不支持sql-cdc
        Table table = tableEnv.sqlQuery("select rule,profile_bitmap from rule_tf");
        tableEnv.toChangelogStream(table).print();
        SingleOutputStreamOperator<String> process = tableEnv.toChangelogStream(table).process(new ProcessFunction<Row, String>() {
            @Override
            public void processElement(Row value, ProcessFunction<Row, String>.Context ctx, Collector<String> out) throws Exception {
                RowKind valueKind = value.getKind();
                if (valueKind == RowKind.INSERT) {
                    String rule_id = (String) value.getField("rule");
                    byte[] profileBitmap = (byte[]) value.getField("profile_bitmap");
                    //反序列化bitmap
                    RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
                    assert profileBitmap != null;
                    roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
                    System.out.println(Arrays.toString(roaringBitmap.stream().toArray()));
                    out.collect(Arrays.toString(roaringBitmap.stream().toArray()));
                }
            }
        });
        process.print();
        env.execute();
    }
}
  1. cdc规则数据整合事件流

public class FlinkInjectRuleBitmapProcessEvents {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d/sun/");
        StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
        //模拟一个事件流
        SingleOutputStreamOperator<Tuple2<String, String>> event = env.socketTextStream("192.168.10.100", 5666).map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                String[] split = value.split(",");
                return Tuple2.of(split[0], split[1]);
            }
        });


        //创建cdc连接器
        tableEnv.executeSql("CREATE TABLE rule_tf (" +
                " rule STRING PRIMARY KEY NOT ENFORCED, " +
                " profile_bitmap BINARY " +
                ") WITH (" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'localhost', " +
                " 'port' = '3306', " +
                " 'username' = 'root' ," +
                " 'password' = '123456' ," +
                " 'database-name' = 'my_data' ," +
                " 'table-name' = 'rule_tf'" +
                ")");

        Table table = tableEnv.sqlQuery("select rule,profile_bitmap from rule_tf");
        DataStream<Row> changelogStream = tableEnv.toChangelogStream(table);
        SingleOutputStreamOperator<Tuple2<String, RoaringBitmap>> streamRule = changelogStream.map(new MapFunction<Row, Tuple2<String, RoaringBitmap>>() {
            @Override
            public Tuple2<String, RoaringBitmap> map(Row value) throws Exception {
                String rule_id = (String) value.getField("rule");
                byte[] profileBitmap = (byte[]) value.getField("profile_bitmap");
                RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
                roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
                //标记获取规则的状态
                if (RowKind.DELETE == value.getKind()) {
                    return Tuple2.of(rule_id+":"+"DELETE", roaringBitmap);
                }
                else {
                    //更新则覆盖
                    return Tuple2.of(rule_id+":"+"UPSERT", roaringBitmap);
                }
            }
        });
        //进行广播
        MapStateDescriptor<String, RoaringBitmap> ruleStateDescriptor = new MapStateDescriptor<>("Rule", String.class, RoaringBitmap.class);
        BroadcastStream<Tuple2<String, RoaringBitmap>> broadcast = streamRule.broadcast(ruleStateDescriptor);
        //将广播流与行为事件进行连接
        event.keyBy(tp -> tp.f0)
                .connect(broadcast)
                .process(new KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>() {
                    @Override
                    public void processElement(Tuple2<String, String> event, KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
                        ReadOnlyBroadcastState<String, RoaringBitmap> readOnlyContextBroadcastState = readOnlyContext.getBroadcastState(ruleStateDescriptor);
                        //对系统中的每个规则,进行一次人群判断
                        for (Map.Entry<String, RoaringBitmap> entry : readOnlyContextBroadcastState.immutableEntries()) {
                            String rule_id = entry.getKey();
                            RoaringBitmap profileBitmap = entry.getValue();
                            collector.collect(String.format("当前行为事件用户 %d 规则 %s的目标人群是否包含此人 %s",Integer.parseInt(event.f0),rule_id,profileBitmap.contains(Integer.parseInt(event.f0))));
                        }
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, RoaringBitmap> ruleInfo, KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>.Context context, Collector<String> collector) throws Exception {
                        BroadcastState<String, RoaringBitmap> broadcastState = context.getBroadcastState(ruleStateDescriptor);
                        //形如 rule:DELETE的拼接
                        String[] split = ruleInfo.f0.split(":");
                        if ("DELETE".equals(split[1])) {
                            System.out.println("删除一条规则" +split[0] +"  " + ruleInfo.f0 );
                            broadcastState.remove(split[0]);
                        }
                        else {
                            System.out.println("新增一条规则" +split[0] +"  " + ruleInfo.f0);
                            broadcastState.put(split[0], ruleInfo.f1);
                        }
                    }
                }).print();


        env.execute();
    }
}

五. 数据开发

通过Spark将用户画像数据导入到ES中
搜索人群画像形成bitmap
规则模板的数据结构
动态条件表达式的设计

规则(前端注入)可能存在多个,我们不能写死,同时规则之间的关系也是复杂的,我们考虑使用动态脚本语言模板来实现

状态机

状态机就是一段逻辑的封装,规则不能写死,同理逻辑也不能写死,我们同样考虑使用动态脚本语言模板来实现,在本篇中状态机即是动态条件表达式与Redis运算逻辑的结合体

触发型&非触发型规则运算
序列型规则运算

使用redis中的顶层key结构,value是某个人匹配到最大步骤数,新增一个结构来保留匹配到序列的次数

public void caclActionSeq(UserEvent userEvent){
  int guid = userEvent.getGuid()
  String redisSeqStepKey = ruleId + ":" + actionSeqConditionId + ":step";
  String redisSeqCntKey = ruleId + ":" + actionSeqConditionId + ":cnt";
//从redis中获取该用户,本规则的行为序列,待完成序列的,已到达的,步骤数
  String preStepStr = jedis.hget (redisSeqKey, guid + "")
  int preStep = preStepStr == null? 0: Integer.parseInt (preStepStr)
//判断本次输入的事件,是否是行为序列参数期待的下一个条件
if (preStep<eventParams.size()){
JSONObject eventParam = eventParams.getJSONObject (preStep)
//如果输入事件,正是行为序列参数期待的下一个事件
if(UserEventComparator.userEventIsEqualParam(userEvent,eventParam)){

if(preStep== eventParams.size ()-1){
//将redis中的步骤号重置为0   
jedis.hset(redisSeStepKey,guid+"," ,  "0")
// 将redis中该用户条件的完成次数+1
jedis.hincrBy(redisSeqCntKey,guid+** .  1)
}
else{
//否则步骤数+1
long by = jedis.hincrBy (redisSeqStepKey, guid + "" +value: 1)
   }
  }
 }
}
定时型规则运算
新规则模型开发

六. 遗留问题

  1. redis中存用某个用户的逻辑数据,如果不做清除,下一次触发又是该用户的情形,会造成类似薅羊毛的事故,由于我们已经对Redis设置为按时间来清除过期数据,为了避免这种情况的发生,我们可以考虑增加一个最大投放次数的状态
  2. [触发型&非触发型规则运算],此处完全可以交给Groovy
  3. 如何处理新增的规则模型
  4. 某个规则圈选的人群,达到千万级别,经过测试,一个达到1亿级别的数据的bitmap,体积大约12M 放入mysql完全没问题
  5. 关于redis中的数据容量的问题?我们的一个规则的一个条件状态记录hash中,也就十万人级别(10万+元素),很轻松的,而且,如果redis负载能力不够的时候,可以部署redis集群的
  6. 如何兼容定时型规则?可以考虑新增一个是否是定时型规则的状态,但是存在的问题是使得项目变得复杂不好维护
  7. 性能测试的方法和描述,一般而言,一个subtask在1s内,能处理4000次行为次数条件的判断运算
上一篇下一篇

猜你喜欢

热点阅读