flink架构师3-高级开发实战
2020-11-07 本文已影响0人
fat32jin
1.5 KeyedState 案例演示 0:3~ 0:29
需求:将两个流中,订单号一样的数据合并在一起输出
orderinfo1数据 topic
商品平台
123,拖把,30.0
234,牙膏,20.0
345,被子,114.4
333,杯子,112.2
444,Mac电脑,30000.0
orderinfo2数据 topic
123,2019-11-11 10:11:12,江苏
234,2019-11-11 11:11:13,云南
345,2019-11-11 12:11:14,安徽
333,2019-11-11 13:11:15,北京
444,2019-11-11 14:11:16,深圳
KeyedStream<OrderInfo1, Long> orderInfo1Stream = info1.map(line ->
string2OrderInfo1(line))
.keyBy(orderInfo1 -> orderInfo1.getOrderId());
KeyedStream<OrderInfo2, Long> orderInfo2Stream = info2.map(line ->
string2OrderInfo2(line))
.keyBy(orderInfo2 -> orderInfo2.getOrderId());
orderInfo1Stream.connect(orderInfo2Stream)
.flatMap(new EnrichmentFunction())
.print();
env.execute("OrderStream");
@Override
public void open(Configuration parameters) {
orderInfo1State = getRuntimeContext()
.getState(new ValueStateDescriptor<OrderInfo1>("info1",
OrderInfo1.class));
orderInfo2State = getRuntimeContext()
.getState(new ValueStateDescriptor<OrderInfo2>
("info2",OrderInfo2.class));
}
@Override
public void flatMap1(OrderInfo1 orderInfo1, Collector<Tuple2<OrderInfo1,
OrderInfo2>> out) throws Exception {
OrderInfo2 value2 = orderInfo2State.value();
if(value2 != null){
orderInfo2State.clear();
out.collect(Tuple2.of(orderInfo1,value2));
}else{
orderInfo1State.update(orderInfo1);
}
}
@Override
public void flatMap2(OrderInfo2 orderInfo2, Collector<Tuple2<OrderInfo1,
OrderInfo2>> out)throws Exception {
OrderInfo1 value1 = orderInfo1State.value();
if(value1 != null){
orderInfo1State.clear();
out.collect(Tuple2.of(value1,orderInfo2));
}else{
orderInfo2State.update(orderInfo2);
}
}
1.4 Operator State案例演示 0:30~ 2:25
1.5 三种state状态 0:43 ~0:58
2 .0 Checkpoint原理 0:58 ~ 1:55
2。1 自定义checkpoint 算法 0:59-1:10
2.2 自带checkpoint 算法解析 1:10 ~ 1:55
2.3 Checkpoint配置1:55~2:02
2.4 重启和恢复savepoint 2:02~ 2:25
3 .0 window原理 。2:25~ 2:46
window案例 2:25~ 2:46
时间概念 2:45 ~ 3:10