Flink Operators in Practice: Advanced Topics
2019-11-12
Woople
This article introduces the basic usage of Window Join, Window CoGroup, and Window Interval Join.
DataStream Transformations Window
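Window operators are central to Flink, and using them well requires understanding how windows work. This article focuses on hands-on API usage; for the underlying mechanics, read the Windows section of the official documentation first. The examples below use Tumbling Windows to show some common usage patterns. They are written against ProcessingTime, but the same patterns apply to EventTime once the window assigner is switched to its event-time counterpart (for example TumblingEventTimeWindows) and timestamps and watermarks are assigned to the streams.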
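As a rough illustration of what "tumbling" means here, the following plain-Java sketch computes which fixed-size, non-overlapping window a timestamp falls into, assuming windows are aligned to the epoch with no offset. The class and method names (`TumblingWindowSketch`, `windowStart`, `windowEnd`) are hypothetical and are not part of the Flink API.

```java
/** Plain-Java sketch of tumbling-window assignment (hypothetical helper, not Flink code). */
final class TumblingWindowSketch {

    /** Inclusive start of the window containing timestampMs, assuming epoch-aligned windows. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorDiv keeps the result correct for negative timestamps as well.
        return Math.floorDiv(timestampMs, sizeMs) * sizeMs;
    }

    /** Exclusive end of the window containing timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 5_000L; // 5-second windows, as in the examples of this article
        for (long ts : new long[] {4_999L, 5_000L, 9_999L}) {
            System.out.println(ts + " -> [" + windowStart(ts, sizeMs) + ", " + windowEnd(ts, sizeMs) + ")");
        }
    }
}
```

With a 5-second window, timestamps 5000 and 9999 land in the same window [5000, 10000), while 4999 lands in the previous one. Two elements that arrive only a millisecond apart can therefore still end up in different windows.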
Basic usage with ProcessingTime
Window Join
Definition
Transformation | Description |
---|---|
DataStream,DataStream → DataStream | Join two data streams on a given key and a common window. |
Explanation
Joins the elements of two streams that share the same key and fall into the same window.
Example
Code
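// RandomUtils is assumed to come from Apache Commons Lang 3.
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        // Two independent sources that emit random (key, value) pairs.
        DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource());
        DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource());

        // Join the two streams on the key within 5-second tumbling windows.
        DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowJoin(orangeStream, greenStream, 5);
        joinedStream.print("join");

        env.execute("Windowed Join Demo");
    }

    public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
            DataStream<Tuple2<String, Integer>> grades,
            DataStream<Tuple2<String, Integer>> salaries,
            long windowSize) {
        return grades.join(salaries)
                .where(new NameKeySelector())    // key of the first stream
                .equalTo(new NameKeySelector())  // key of the second stream
                .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSize)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> join(
                            Tuple2<String, Integer> first,
                            Tuple2<String, Integer> second) {
                        // Called once per matching pair: (key, value from first, value from second).
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                });
    }

    /** Extracts the String in field f0 as the join key. */
    private static class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }

    /** Emits 10 to 19 random elements, pausing 1 to 4 seconds before each one. */
    private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            int bound = 50;
            String[] keys = new String[]{"foo", "bar", "baz"};
            final long numElements = RandomUtils.nextLong(10, 20);
            int i = 0;
            while (running && i < numElements) {
                Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
                // Typed tuple instead of the raw Tuple2 to avoid unchecked warnings.
                Tuple2<String, Integer> data =
                        new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound));
                ctx.collect(data);
                // Log each emitted element with the id of the source thread.
                System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
                i++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}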
Output
59-sand data:(bar,49)
58-sand data:(bar,44)
58-sand data:(foo,2)
59-sand data:(baz,34)
58-sand data:(baz,2)
59-sand data:(baz,29)
join> (baz,34,2)
join> (baz,29,2)
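Explanation
Both streams carry elements of type Tuple2<String, Integer>, generated at random by DataSource. The window size is 5 seconds, and the two streams are joined on the key in field f0. Window Join behaves like an inner join: a result is emitted only for pairs of elements that share a key and fall into the same window. In the output above, (baz,34) and (baz,29) from one source are each paired with (baz,2) from the other, which yields the two join results.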
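The pairing behaviour can be reproduced without a Flink cluster. The sketch below is a plain-Java simulation of the semantics only, not Flink code: the `WindowJoinSketch` class, its `Event` type, and the timestamps are all hypothetical, chosen so that the values mirror the sample output above. It assumes non-negative timestamps and epoch-aligned 5-second windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java simulation of a tumbling-window inner join (illustrative only, not Flink code). */
final class WindowJoinSketch {

    /** A keyed value with a hypothetical arrival timestamp in milliseconds. */
    static final class Event {
        final String key;
        final int value;
        final long timestampMs;

        Event(String key, int value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Emits "(key,leftValue,rightValue)" for every pair that shares both key and window. */
    static List<String> join(List<Event> left, List<Event> right, long windowSizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                // Integer division yields the window index for non-negative timestamps.
                boolean sameWindow = l.timestampMs / windowSizeMs == r.timestampMs / windowSizeMs;
                if (sameWindow && l.key.equals(r.key)) {
                    out.add("(" + l.key + "," + l.value + "," + r.value + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orange = Arrays.asList(
                new Event("bar", 49, 1_000L),   // window 0
                new Event("baz", 34, 6_000L),   // window 1
                new Event("baz", 29, 8_000L));  // window 1
        List<Event> green = Arrays.asList(
                new Event("bar", 44, 5_500L),   // window 1: same key as (bar,49), different window
                new Event("baz", 2, 7_000L));   // window 1
        join(orange, green, 5_000L).forEach(System.out::println);
    }
}
```

Under these assumed timestamps the program prints (baz,34,2) and (baz,29,2). The two bar elements share a key but sit in different windows, so they produce no result, which is one way unmatched elements like those in the sample output can arise.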