flink入门大数据Flink

flink学习之八-keyby&reduce

2019-03-17  本文已影响250人  AlanKim

上文学习了简单的map、flatmap、filter,在这里开始继续看keyBy及reduce

keyBy

先看定义,通过keyBy,DataStream→KeyedStream。

逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法。

此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple(数组)
    

注意 如果出现以下情况,则类型不能成为关键

  1. 它是POJO类型但不覆盖hashCode()方法并依赖于Object.hashCode()实现。
  2. 它是任何类型的数组。

看段代码:

public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

运行后,结果如下:

3> key:1,value:5
3> key:1,value:7
3> key:1,value:2
4> key:2,value:3
4> key:2,value:4

可以看到,前面的 3> 和 4> 输出 本身是个分组,而且顺序是从先输出key=1的tuple数组,再输出key=2的数组。

也就是说,keyby类似于sql中的group by,将数据进行了分组。后面基于keyedSteam的操作,都是组内操作。

断点看了下keyedStream的结构:

keyedStream.png

可以看到,包含了keyType、keySelector,以及转换后的PartitionTransformation,也就是已经做了分区了。后续的所有操作都是按照分区内数据来处理的。

reduce

reduce表示将数据合并成一个新的数据,返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。而且reduce方法不能直接应用于SingleOutputStreamOperator对象,也好理解,因为这个对象是个无限的流,对无限的数据做合并,没有任何意义哈!

所以reduce需要针对分组或者一个window(窗口)来执行,也就是分别对应于keyBy、window/timeWindow 处理后的数据,根据ReduceFunction将元素与上一个reduce后的结果合并,产出合并之后的结果。

在上面代码的基础上修改:

public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .reduce((ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0, t2.f1 + t1.f1)) // value做累加
                .print();

        env.execute("execute");
    }
}
3> (1,5)
3> (1,12)
3> (1,14)
4> (2,3)
4> (2,7)

可以看到,分组后,每次有一个数组进来,都会产生新的数据,依然是按照分组来输出的。

如果改下reduce中的实现:

ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0 + t2.f0, t2.f1 + t1.f1)

那么输出就是:

2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214).
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) [FINISHED]
4> (2,3)
4> (4,7)

...

2019-01-22 12:04:56.118 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.122 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (1/4) (0fdc49eb18050efa3acec361978f3e93) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.125 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (4/4) (1607b502ab2791f2f567c61da214bd82) switched from RUNNING to FINISHED.
3> (1,5)
3> (2,12)
3> (3,14)

可以看到输出结果,一方面是是key-reduce的状态,从RUNNING迁移到FINISHED;另一方面是按组输出了最终的reduce值。

聚合

KeyedStream→DataStream

在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy类似)。

---TODO 这里存疑,因为返回的数据始终是数据源,难道是我写错了什么?SingleOutputStreamOperator<Tuple2>改成SingleOutputStreamOperator<Long> 也是一样的结果,等待后续继续验证。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

继续在上面代码的基础上做实验:

sum
public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        KeyedStream keyedStream =  env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                ;

        SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.sum(0);
        sumStream.addSink(new PrintSinkFunction<>());

        env.execute("execute");
    }

对第一个元素(位置0)做sum,结果如下:

3> (1,5)
3> (2,5)
3> (3,5)
...
4> (2,3)
2019-01-22 21:27:07.401 [flink-akka.actor.default-dispatcher-3] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (f3368fedb9805b1e59f4443252a2fb2b) switched from RUNNING to FINISHED.
4> (4,3)

可以看到,对第一个数据(也就是key)做了累加,然后value以第一个进来的数据为准。

如过改成keyedStream.sum(1); 也就是针对第二个元素求和,得到的结果如下:

4> (2,3)
4> (2,7)
...
3> (1,5)
3> (1,12)
2019-01-23 10:50:47.498 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (df09751c6722a5942b058a1300ae9fb3) switched from RUNNING to FINISHED.
3> (1,14)
min
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.min(1);

得到的输出结果是:

3> (1,5)  -- 第一组 第一个数据到的结果
3> (1,5)  -- 第一组 第二个数据到的结果
4> (2,3)  -- 第二组 第一个数据到的结果
4> (2,3)  -- 第二组 第二个数据到的结果
3> (1,2)  -- 第一组 第三个数据到的结果

这里顺序有点乱,不过没问题,数据按照顺序一个一个的过来,然后计算当前数据过来时有最小value的数据。

minBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.minBy(1);
3> (1,5)
3> (1,5)
4> (2,3)
3> (1,2)
4> (2,3)

类似的,只是组间打印的顺序有区别而已。

max
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.max(1);
3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)

按照顺序,取最大的数据

maxBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.maxBy(1);

3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)

有一点要牢记,数据是一直流过来的,这些聚合方法都是在每次收到新的数据之后,重新计算/比较得出来的结果,而不是只有一个最终结果。

上一篇下一篇

猜你喜欢

热点阅读