使用FlinkCEP统计超时未支付的订单/用户触达

2020-03-14  本文已影响0人  和平菌

需求:统计用户下单了但一段时间后没有支付的订单,用来触达用户。

一、模拟一个数据源,用来模拟用户行为

public class UserEvent implements Serializable {

    private String pin; //用户的id
    private String skuId; 商品的skuId
    private String action; //用户事件 0表示下单,1表示支付
}

public class RandomSource extends RichParallelSourceFunction<UserEvent> {

    private boolean isRun;

    private static List<UserEvent> events1 = new ArrayList<>();
    private transient int index = 0;

    @Override
    public void run(SourceContext<UserEvent> ctx) throws InterruptedException {
        while (isRun){
            if(index < events1.size()){
                UserEvent event = events1.get(index % events1.size());
                ctx.collect(event);
                index++;
//                System.out.println("send message:" + JSON.toJSONString(event));
            }
            Thread.sleep(1000);
        }
    }


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        isRun = true;

        addEvent(events1,"zhangsan", "1", "0");
        addEvent(events1,"lisi", "3", "0");
        addEvent(events1,"zhangsan", "1", "1");

    }

    private void addEvent(Collection<UserEvent> collections, String pin,String skuId, String action){
        UserEvent event = new UserEvent();
        event.setPin(pin);
        event.setSkuId(skuId);
        event.setAction(action);
        collections.add(event);
    }


    @Override
    public void cancel() {
        isRun = false;
    }
}

二、逻辑代码

public class Rule6App {

    private static final OutputTag<UserEvent> timeOutTag = new OutputTag<>("timeOut", TypeInformation.of(UserEvent.class));

    public static void main(String[] args) throws Exception{
    
        //步骤1、定义数据源,给数据添加水印
        final AssignerWithPeriodicWatermarks extractor = new IngestionTimeExtractor<UserEvent>();

        StreamExecutionEnvironment env = StreamCommon.getStreamEnv(true, true);
        DataStream<UserEvent> dataStream = env.addSource(new RandomSource())
                .setParallelism(1) //并行度设置成1,便于观察
                .assignTimestampsAndWatermarks(extractor); //加水印
        //步骤2、定义Pattern,可以看成是一个事件链
        //我这里定义的意思是,先收到下单的消息,然后再收到支付的消息,事件链可以很长,也可以有
       //复杂的组合事件
        Pattern<UserEvent,UserEvent> pattern = Pattern.<UserEvent>begin("order")
                .where(new SimpleCondition<UserEvent>() {
                    @Override
                    public boolean filter(UserEvent event) throws Exception {
                        return "0".equals(event.getAction());
                    }
                })
                .next("pay")
                .where(new SimpleCondition<UserEvent>() {
                    @Override
                    public boolean filter(UserEvent event) throws Exception {
                        return "1".equals(event.getAction());
                    }
                });
        
       //步骤3、进行事件匹配,在匹配前,需要对流进行KeyBy分组,确保每个单元里处理的是
       //同一个用户的订单
        PatternStream<UserEvent> patternStream = CEP.pattern(
                dataStream.keyBy(UserEvent::getPin),
                pattern.within(Time.seconds(10)));
    
      //步骤4、对匹配的结果进行分流
      //这里注意到select有3个参数,第一个是超时消息的容器,这里通过旁路进行输出
      //第二个参数里定义了超时的消息如何进行处理
      //第三个参数里定义里正常匹配到规则的消息如何进行处理
        SingleOutputStreamOperator<UserEvent> result = patternStream.select(timeOutTag, new PatternTimeoutFunction<UserEvent, UserEvent>() {
            @Override
            public UserEvent timeout(Map<String, List<UserEvent>> map, long l) throws Exception {
                System.out.println("这是超时了的:" + JSON.toJSONString(map));
                return map.get("order").get(0);
            }
        }, new PatternSelectFunction<UserEvent, UserEvent>() {
            @Override
            public UserEvent select(Map<String, List<UserEvent>> map) throws Exception {
                System.out.println("这是完成的订单:" + JSON.toJSONString(map));
                return map.get("pay").get(0);//这里Map有2个KEY,就是前面定义事件链的tag
            }
        });
      
        //步骤5、从旁路拿到结果流,完成超时触达
        DataStream<UserEvent> timeoutResult = result.getSideOutput(timeOutTag);
        timeoutResult.print();
        env.execute();
    }

}

三、需要注意的点
1、必须要是EventTime
尝试了ProcessTime取不到超时的结果,只能拿到匹配到规则的结果
2、within的含义
within在每一条消息到达时,为该消息开启一个定时器,当整个时间链都匹配到结果,则终止定时器,否则被视为超时。

例如第一个条件要累计多次的时候,当满足累积多次后,会重新开始 计时来计算超时。
3、.oneOrMore().where(...)
满足条件后开始计时,再within限定的时间段内如果有满足条件的数据进来则修改计时器重新计时(类似SessionWindow)

上一篇下一篇

猜你喜欢

热点阅读