Flink_process算子

2022-08-24  本文已影响0人  Eqo

process算子有4个方法类,包括

ProcessFunction类

.process (new ProcessFunction(){ } )
有两个方法

event

state


KeyedProcessFunction方法类

对分组流中的数据处理

timmers

对分组流内的数据设置定时任务
核心代码

keybe().process(new processFunction(){
processElement(){
//给分组流内的数据,设置时间,触发定时器,执行onTime
}
onTime(){
//定时任务 数据处理逻辑
}

})


                        // todo 判断订单状态,如果是未付款,创建定时任务,假设15s后触发执行 -> 再次依据订单id查询订单状态,如果依然时未付款,直接取消订单(修改订单状态为取消)
                        if("未付款".equals(orderData.getOrderStatus())){
                            System.out.println("订单[" + orderData.getOrderId() + "]状态为【未付款】, 设置定时任务,15秒后触发执行......");
                            // 创建定时任务,到达时间,触发执行onTimer方法
                            long time = fastDateFormat.parse(orderData.getOrderTime()).getTime() ;
                            ctx.timerService().registerProcessingTimeTimer(time + 15 * 1000L);
                        }
                    }

                    // 当定时器触发执行时,调用OnTimer方法,实现订单超时自动取消功能
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderData> out) throws Exception {
                        // a. 获取订单id
                        String orderId = ctx.getCurrentKey();
                        System.out.println("指定定时任务,检查订单[" + orderId + "]状态....................");
                        // b. 依据订单id查询Mysql数据库订单状态
                        String orderStatus = queryStatus(orderId);
                        System.out.println("查询订单[" + orderId + "]状态为:" + orderStatus + "....................");
                        // c. 判断状态:未付款,更新订单状态为:取消
                        if("未付款".equals(orderStatus)){
                            updateStatus(orderId);
                            System.out.println("订单[" + orderId + "]已超时, 更新状态为:取消....................................");
                        }
                    }


BroadcastState:将小数据流DataStream广播到各个Task中,数据存储在MapState中,以key/value对存储的。


image.png

过程:
1.广播小表数据,定义广播状态描述符
2.大表数据对广播数据进行 conect连接

        // 3. 数据转换-transformation
        /*
            将小数据流,广播以后,存储到MapState中,方便大表数据流处理数据依据key获取value值
                Map[userId, userInfo]
            大表数据处理,依据userId,获取小表中对应用户信息UserInfo
                map.get(userId) -> userInfo
         */
        // todo: 3-1. 广播小表数据
        MapStateDescriptor<String, UserInfo> descriptor = new MapStateDescriptor<>("userInfoState", String.class, UserInfo.class);
        BroadcastStream<UserInfo> broadcastStream = userStream.broadcast(descriptor);

        // todo: 3-2. 将大表数据与广播数据进行connect连接
        SingleOutputStreamOperator<String> processStream = logStream
            .connect(broadcastStream)
            .process(new BroadcastProcessFunction<TrackLog, UserInfo, String>() {
                // 处理大表数据流中每条数据, todo:大表数据流中每条数据到BroadcastState中获取数据
                @Override
                public void processElement(TrackLog value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    // 获取广播状态数据
                    ReadOnlyBroadcastState<String, UserInfo> broadcastState = ctx.getBroadcastState(descriptor);
                    // 获取日志数据中userId
                    String userId = value.getUserId();
                    // 依据userId到状态中获取对应的用户信息数据
                    UserInfo userInfo = broadcastState.get(userId);
                    // 关联数据
                    String output = userInfo +  " -> " + value;
                    out.collect(output);
                }

                // 处理广播的小表数据流中数据, todo: 广播流中数据放入BroadcastState中
                @Override
                public void processBroadcastElement(UserInfo value, Context ctx, Collector<String> out) throws Exception {
                    // 获取广播状态数据,本地上map集合
                    BroadcastState<String, UserInfo> broadcastState = ctx.getBroadcastState(descriptor);
                    // 获取用户id
                    String userId = value.getUserId();
                    // 将广播流中数据存储到状态中
                    broadcastState.put(userId, value);
                }
            });
上一篇 下一篇

猜你喜欢

热点阅读