数客联盟

Flink Operators in Practice: Advanced Part

2019-11-12  Woople


This article introduces the basic usage of Window Join, Window CoGroup, and Window Interval Join.

DataStream Transformations Window

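Window operators are central to Flink, and using them correctly requires understanding how windows work. This article focuses on practical API usage. For the underlying mechanisms, read the Windows page of the official documentation first. The examples below use Tumbling Windows. They are based on ProcessingTime, but the same patterns apply to EventTime once the time characteristic is set to EventTime, timestamps and watermarks are assigned, and the event-time assigner (for example TumblingEventTimeWindows) is used in place of the processing-time one.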
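To make the tumbling-window idea concrete, the sketch below computes which window a millisecond timestamp falls into. It follows the same arithmetic as Flink's `TimeWindow.getWindowStartWithOffset` (start = timestamp - (timestamp - offset + size) % size), but it is plain Java with no Flink dependency, and the class name `TumblingWindowMath` is hypothetical.

```java
// Hypothetical helper: assigns a millisecond timestamp to a tumbling window.
public class TumblingWindowMath {

    // Start (inclusive) of the tumbling window that contains `timestamp`.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, as in the join example
        long[] timestamps = {0L, 4_999L, 5_000L, 12_345L};
        for (long ts : timestamps) {
            long start = windowStart(ts, 0L, size);
            // Windows are half-open intervals: [start, start + size)
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Running it prints `0 -> [0, 5000)`, `4999 -> [0, 5000)`, `5000 -> [5000, 10000)` and `12345 -> [10000, 15000)`: every element belongs to exactly one window, and windows never overlap.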

Basic Usage with ProcessingTime

Window Join

Definition
Transformation: DataStream,DataStream → DataStream
Description: Join two data streams on a given key and a common window.
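Explanation

Elements from the two streams are joined when they have the same key and fall into the same window. Within each window, every matching pair is passed to the JoinFunction, so this is an inner join: an element with no partner in the other stream produces no output.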
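The pairing rule can be illustrated without Flink. The sketch below is a simplified in-memory model, not Flink's implementation: it buckets elements by key and tumbling window, then pairs every left element with every right element in the same bucket. The `Event` type, its arrival times, and the values are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowJoinModel {

    // Hypothetical record: a key, a value, and an arrival time in milliseconds.
    static final class Event {
        final String key;
        final int value;
        final long time;

        Event(String key, int value, long time) {
            this.key = key;
            this.value = value;
            this.time = time;
        }
    }

    // Start of the tumbling window (offset 0) that contains `time`.
    static long windowStart(long time, long size) {
        return time - (time + size) % size;
    }

    // Pairs every left event with every right event that has the same key
    // and falls into the same tumbling window; unmatched events produce nothing.
    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.time, size) == windowStart(r.time, size);
                if (sameKey && sameWindow) {
                    out.add("(" + l.key + "," + l.value + "," + r.value + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second tumbling windows
        List<Event> left = new ArrayList<>();
        left.add(new Event("baz", 1, 1_000L));
        left.add(new Event("baz", 2, 3_000L));
        left.add(new Event("bar", 3, 4_000L));
        List<Event> right = new ArrayList<>();
        right.add(new Event("baz", 10, 2_000L));
        right.add(new Event("bar", 20, 6_000L)); // same key, but the next window
        right.add(new Event("foo", 30, 2_500L)); // key never appears on the left
        System.out.println(join(left, right, size));
    }
}
```

This prints `[(baz,1,10), (baz,2,10)]`: both left `baz` elements pair with the single right `baz`, while `bar` and `foo` produce nothing. A real Flink window join emits these pairs only when the window fires, and the order of results is not guaranteed.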

Example
Code
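import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // One parallel instance per operator keeps the printed output easy to follow
        env.setParallelism(1);
        // Two independent sources that emit random (key, value) pairs
        DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource());
        DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource());
        // Join the two streams on the key within 5-second tumbling windows
        DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowJoin(orangeStream, greenStream, 5);
        joinedStream.print("join");
        env.execute("Windowed Join Demo");
    }

    public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
            DataStream<Tuple2<String, Integer>> grades,
            DataStream<Tuple2<String, Integer>> salaries,
            long windowSize) {

        return grades.join(salaries)
                // Key of the first stream
                .where(new NameKeySelector())
                // Key of the second stream; it must equal the first key for a pair to be joined
                .equalTo(new NameKeySelector())
                // Both elements must also fall into the same processing-time tumbling window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSize)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {

                    // Called once for every matching (first, second) pair in a window
                    @Override
                    public Tuple3<String, Integer, Integer> join(
                            Tuple2<String, Integer> first,
                            Tuple2<String, Integer> second) {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                });
    }

    // Uses the first tuple field (the name) as the join key
    private static class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }

    // Emits 10 to 19 random (key, value) pairs, pausing 1 to 4 seconds between elements
    private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            int bound = 50;
            String[] keys = new String[]{"foo", "bar", "baz"};

            final long numElements = RandomUtils.nextLong(10, 20);
            int i = 0;
            while (running && i < numElements) {
                Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
                Tuple2<String, Integer> data = new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound));
                // Emit under the checkpoint lock, as recommended for SourceFunction implementations
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(data);
                }
                System.out.println(Thread.currentThread().getId() + "-send data:" + data);
                i++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
Output
59-send data:(bar,49)
58-send data:(bar,44)
58-send data:(foo,2)
59-send data:(baz,34)
58-send data:(baz,2)
59-send data:(baz,29)
join> (baz,34,2)
join> (baz,29,2)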
Explanation

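Both streams carry Tuple2<String, Integer> elements with randomly generated data. The window size is 5 seconds, and the two streams are joined on their key (field f0). Only elements that share a key and arrive within the same 5-second window are joined: in the output above, both baz elements from thread 59 are paired with the single baz element from thread 58, foo has no partner, and the two bar elements presumably landed in different windows, so they produced no result.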
