Flink_process算子
2022-08-24 本文已影响0人
Eqo
process算子有4个方法类,包括
- ProcessFunction、
- KeyedProcessFunction、
- BroadcastProcessFunction、
- KeyedBroadcastProcessFunction。其中KeyedProcessFunction及KeyedBroadcastProcessFunction只能处理keyedStream,ProcessFunction及BroadcastProcessFunction只能处理DataSteam
ProcessFunction类
.process (new ProcessFunction(){ } )
有两个方法
-
processElement方法:对流中每条数据进行处理 -
onTimer方法:对分组流内的数据使用 当设置定时器后,调用执行方法
可以实现三种功能
event
对数据流内的每条数据处理
state
处理分组流中 每个key对应的状态 键控状态
timeers
分组流中每个key的数据设置定时器
event
state
KeyedProcessFunction方法类
对分组流中的数据处理
timmers
对分组流内的数据设置定时任务
核心代码
keybe().process(new processFunction(){
processElement(){
//给分组流内的数据,设置时间,触发定时器,执行onTime
}
onTime(){
//定时任务 数据处理逻辑
}
})
// todo 判断订单状态,如果是未付款,创建定时任务,假设15s后触发执行 -> 再次依据订单id查询订单状态,如果依然时未付款,直接取消订单(修改订单状态为取消)
if("未付款".equals(orderData.getOrderStatus())){
System.out.println("订单[" + orderData.getOrderId() + "]状态为【未付款】, 设置定时任务,15秒后触发执行......");
// 创建定时任务,到达时间,触发执行onTimer方法
long time = fastDateFormat.parse(orderData.getOrderTime()).getTime() ;
ctx.timerService().registerProcessingTimeTimer(time + 15 * 1000L);
}
}
// 当定时器触发执行时,调用OnTimer方法,实现订单超时自动取消功能
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderData> out) throws Exception {
// a. 获取订单id
String orderId = ctx.getCurrentKey();
System.out.println("指定定时任务,检查订单[" + orderId + "]状态....................");
// b. 依据订单id查询Mysql数据库订单状态
String orderStatus = queryStatus(orderId);
System.out.println("查询订单[" + orderId + "]状态为:" + orderStatus + "....................");
// c. 判断状态:未付款,更新订单状态为:取消
if("未付款".equals(orderStatus)){
updateStatus(orderId);
System.out.println("订单[" + orderId + "]已超时, 更新状态为:取消....................................");
}
}
BroadcastState:将小数据流DataStream广播到各个Task中,数据存储在MapState中,以key/value对存储的。
image.png
过程:
1.广播小表数据,定义广播状态描述符
2.大表数据对广播数据进行 conect连接
// 3. 数据转换-transformation
/*
将小数据流,广播以后,存储到MapState中,方便大表数据流处理数据依据key获取value值
Map[userId, userInfo]
大表数据处理,依据userId,获取小表中对应用户信息UserInfo
map.get(userId) -> userInfo
*/
// todo: 3-1. 广播小表数据
MapStateDescriptor<String, UserInfo> descriptor = new MapStateDescriptor<>("userInfoState", String.class, UserInfo.class);
BroadcastStream<UserInfo> broadcastStream = userStream.broadcast(descriptor);
// todo: 3-2. 将大表数据与广播数据进行connect连接
SingleOutputStreamOperator<String> processStream = logStream
.connect(broadcastStream)
.process(new BroadcastProcessFunction<TrackLog, UserInfo, String>() {
// 处理大表数据流中每条数据, todo:大表数据流中每条数据到BroadcastState中获取数据
@Override
public void processElement(TrackLog value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// 获取广播状态数据
ReadOnlyBroadcastState<String, UserInfo> broadcastState = ctx.getBroadcastState(descriptor);
// 获取日志数据中userId
String userId = value.getUserId();
// 依据userId到状态中获取对应的用户信息数据
UserInfo userInfo = broadcastState.get(userId);
// 关联数据
String output = userInfo + " -> " + value;
out.collect(output);
}
// 处理广播的小表数据流中数据, todo: 广播流中数据放入BroadcastState中
@Override
public void processBroadcastElement(UserInfo value, Context ctx, Collector<String> out) throws Exception {
// 获取广播状态数据,本地上map集合
BroadcastState<String, UserInfo> broadcastState = ctx.getBroadcastState(descriptor);
// 获取用户id
String userId = value.getUserId();
// 将广播流中数据存储到状态中
broadcastState.put(userId, value);
}
});