Flink(1.13) Transform

2021-08-21  万事万物

Transformation operators turn one or more DataStreams into a new DataStream, and a program can combine multiple transformations into a complex dataflow topology.
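
As a minimal sketch of such chaining (assuming a DataStream<String> named lines, which is illustrative), a few transformations composed into one pipeline:

    // split lines into words, then drop the empty ones
    DataStream<String> words = lines
            .flatMap((String line, Collector<String> out) ->
                    Arrays.stream(line.split(" ")).forEach(out::collect))
            .returns(Types.STRING)
            .filter(word -> !word.isEmpty());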

map

map applies a function to each element and emits exactly one output element per input.

    @Test
    public void map() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // listen on port 9999
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        // map each line to (line, length); returns(...) is needed because of type erasure
        SingleOutputStreamOperator<Tuple2<String, Integer>> returns = source.map((MapFunction<String, Tuple2<String, Integer>>)
                value -> Tuple2.of(value, value.length()))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        returns.print("map>>>");

        env.execute();

    }
[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
hello
map>>>:1> (hello,5)

flatMap

flatMap can emit zero, one, or many output elements per input element.

    @Test
    public void flatMap() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> dataSource = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

        // split each line on spaces and emit every word separately
        FlatMapOperator<String, String> returns = dataSource
                .flatMap((FlatMapFunction<String, String>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(out::collect))
                .returns(Types.STRING);

        returns.print();

    }

Contents of wordcount.txt:

java python hello
pon xml log batch
python log java word
count xml python hello
exe txt log xml pon java

Output:

java
python
hello
python
log
java
word
exe
txt
log
xml
pon
java
count
xml
python
hello
pon
xml
log
batch

filter

filter keeps only the elements for which the predicate returns true.

    @Test
    public void filter() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> dataSource = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

        // flatten: split lines into words
        FlatMapOperator<String, String> flatMap = dataSource
                .flatMap((FlatMapFunction<String, String>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(out::collect))
                .returns(Types.STRING);

        // filter: drop every "log"
        FilterOperator<String> filter = flatMap.filter(s -> !"log".equals(s));

        filter.print();

    }
pon
xml
batch
java
python
hello
count
xml
python
hello
python
java
word
exe
txt
xml
pon
java

keyBy

keyBy partitions the stream so that all records with the same key are processed by the same subtask. The following types cannot be used as keys (a POJO key that does work is sketched after this list):

  1. A POJO that does not override hashCode and relies on Object.hashCode. Keying on it runs, but the grouping is meaningless: every element ends up in its own unique group.
  2. Arrays of any type.
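
A minimal sketch of a POJO that can serve as a key, assuming a hand-written class (the name WordKey is illustrative): overriding hashCode (and equals) makes the grouping deterministic.

    // hashCode/equals are overridden so equal keys always hash to the same group
    public static class WordKey {
        public String word;

        public WordKey() { }

        public WordKey(String word) { this.word = word; }

        @Override
        public boolean equals(Object o) {
            return o instanceof WordKey && ((WordKey) o).word.equals(word);
        }

        @Override
        public int hashCode() {
            return word.hashCode();  // deterministic, unlike Object.hashCode
        }
    }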
    @Test
    public void group() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // listen on port 9999
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        // flatten: emit one (word, 1) tuple per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(s -> out.collect(Tuple2.of(s, 1))))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // group by the word (f0)
        KeyedStream<Tuple2<String, Integer>, Object> keyBy = flatMap.keyBy((KeySelector<Tuple2<String, Integer>, Object>) value -> value.f0);

        // rolling count per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);

        sum.print();

        env.execute();

    }

[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
java python java java python hello
java python java hello
3> (java,1)
5> (python,1)
3> (java,2)
5> (python,2)
3> (java,3)
5> (hello,1)
5> (python,3)
3> (java,4)
5> (hello,2)
3> (java,5)
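
The 3> and 5> prefixes are the indices of the parallel subtasks that printed the record; because of keyBy, all records with the same key (for example java) are always handled by the same subtask.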

shuffle

shuffle redistributes elements to the downstream subtasks uniformly at random.

    @Test
    public void shuffle() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // listen on port 9999
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        source.print("print>>>");

        source.shuffle().print("shuffle>>>");

        env.execute();
    }
[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
java
python
scala
spark
shuffle>>>:11> java
print>>>:5> java // moved from channel 5 to 11
print>>>:6> python 
shuffle>>>:6> python   // from 6 to 6
print>>>:7> scala
shuffle>>>:5> scala  // from 7 to 5
print>>>:8> spark
shuffle>>>:4> spark  // from 8 to 4
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }
}

numberOfChannels is the number of downstream channels, i.e. the parallelism of the receiving operator; if no parallelism is set, it defaults to the number of CPU cores.
With a parallelism of 10, random.nextInt(10) picks a channel index between 0 and 9.
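
A tiny standalone sketch of that selection logic (the values are illustrative):

    Random random = new Random();
    int numberOfChannels = 10;                       // e.g. parallelism 10
    int channel = random.nextInt(numberOfChannels);  // uniform in [0, 9]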

connect

  1. The two streams may hold different data types.
  2. The connection is only mechanical; internally they remain two separate streams.
  3. Only two streams can be connected; a third cannot join.

connect joins two streams that stay "together in appearance but separate in spirit": the input types may differ, but the output type of the connected operators must be the same.

    @Test
    public void connect() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // string stream
        DataStreamSource<String> strSource = env.fromElements("a", "b", "c", "d");

        // number stream
        DataStreamSource<Integer> numSource = env.fromElements(1,2,3,4,5,6,7,8,9);

        // connect the two streams
        ConnectedStreams<String, Integer> connect = strSource.connect(numSource);

        /**
         * Each stream keeps its own mapping logic (map1 for the first stream,
         * map2 for the second), but both must return the same type.
         */
        SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<String, Integer, String>() {
            @Override
            public String map1(String value) {
                return value;
            }

            @Override
            public String map2(Integer value) {
                return value.toString();
            }
        });

        result.print();

        env.execute();

    }
11> a
10> 4
14> d
13> c
12> b
8> 2
15> 9
9> 3
12> 6
13> 7
14> 8
11> 5
7> 1

connect also has its own family of operator functions, all prefixed with Co (short for connect).

Each stream keeps its own processing logic (map1 and map2), which is also why exactly two streams can be connected and a third cannot join.

@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * This method is called for each element in the first of the connected streams.
     *
     * @param value The stream element
     * @return The resulting element
     * @throws Exception The function may throw exceptions which cause the streaming program to fail
     *     and go into recovery.
     */
    OUT map1(IN1 value) throws Exception;

    /**
     * This method is called for each element in the second of the connected streams.
     *
     * @param value The stream element
     * @return The resulting element
     * @throws Exception The function may throw exceptions which cause the streaming program to fail
     *     and go into recovery.
     */
    OUT map2(IN2 value) throws Exception;
}

Although the input types (IN1, IN2) differ, the return type (OUT) must be the same:

 CoMapFunction<IN1, IN2, OUT> 
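
As a hedged sketch of another Co-prefixed function, the same connected streams could also be processed with a CoFlatMapFunction through ConnectedStreams#flatMap (the connect variable reuses the example above):

    // flatMap1/flatMap2 may emit zero or more elements per input,
    // but both collect the same output type (String here)
    SingleOutputStreamOperator<String> flat = connect
            .flatMap(new CoFlatMapFunction<String, Integer, String>() {
                @Override
                public void flatMap1(String value, Collector<String> out) {
                    out.collect(value);
                }

                @Override
                public void flatMap2(Integer value, Collector<String> out) {
                    out.collect(value.toString());
                }
            });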

union

  1. The streams being unioned must have the same element type.
  2. More than two streams can be chained together.
    @Test
    public void union() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // number streams
        DataStreamSource<Integer> numSource1 = env.fromElements(1,2,3,4,5,6,7,8,9);
        DataStreamSource<Integer> numSource2 = env.fromElements(111,22,33,4,4,5,22);
        DataStreamSource<Integer> numSource3 = env.fromElements(2,313,43,14,1);
        DataStreamSource<Integer> numSource4 = env.fromElements(4,14,314,31,4);

        // union; note that numSource1 is unioned with itself, so its elements appear twice in the output
        DataStream<Integer> union = numSource1.union(numSource1).union(numSource2).union(numSource3).union(numSource4);

        union.print();

        env.execute();
    }
2> 5
13> 111
2> 3
15> 33
5> 4
12> 14
15> 4
1> 4
9> 14
3> 3
12> 8
8> 43
11> 4
7> 313
9> 6
6> 5
13> 314
10> 1
16> 4
14> 22
10> 7
4> 4
14> 31
6> 2
13> 8
7> 5
11> 7
3> 22
8> 6
15> 1
15> 9
1> 2
14> 1
16> 2
14> 9
  1. union requires all streams to have the same type; connect allows different types.
  2. connect operates on exactly two streams; union can combine many.

Simple rolling aggregation operators

sum, min, max, minBy, and maxBy roll an aggregate forward per keyed group, emitting the updated aggregate for every incoming element.

    @Test
    public void sum3() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        // parse "id,name,age,sex" lines into UserBean POJOs
        SingleOutputStreamOperator<UserBean> map = source.map((MapFunction<String, UserBean>) value
                -> new UserBean(value.split(",")))
                .returns(Types.POJO(UserBean.class));

        // group by id, then keep a running sum of age per group
        KeyedStream<UserBean, Integer> keyBy = map.keyBy(e -> e.getId());

        SingleOutputStreamOperator<UserBean> sum = keyBy.sum("age");

        sum.print();

        env.execute();
    }
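
The UserBean POJO is not shown in the original; here is a minimal sketch that matches the constructor call and the toString format in the output below, assuming Lombok's @Data and @NoArgsConstructor:

    // hypothetical POJO; Flink POJOs need a public class, a no-arg
    // constructor, and accessible fields or getters/setters
    @Data
    @NoArgsConstructor
    public static class UserBean {
        private Integer id;
        private String name;
        private Integer age;
        private String sex;

        // fields arrive as "id,name,age,sex"
        public UserBean(String[] fields) {
            this.id = Integer.valueOf(fields[0]);
            this.name = fields[1];
            this.age = Integer.valueOf(fields[2]);
            this.sex = fields[3];
        }
    }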

Input 1:

8,zhangsan111,19,F

Output 1:

2> UserBean(id=8, name=zhangsan111, age=19, sex=F)

Input 2:

8,zhangsan111,1,F

Output 2 (the ages accumulate to 20):

2> UserBean(id=8, name=zhangsan111, age=20, sex=F)

Input 3 (sex changed to M):

8,zhangsan111,19,M

Output 3 (only age changes; every other field keeps its first value):

2> UserBean(id=8, name=zhangsan111, age=39, sex=F)

Input 4 (a different id):

7,zhangfeilong,18,M

Output 4:

15> UserBean(id=7, name=zhangfeilong, age=18, sex=M)

Input 5 (id 7 again, sex changed):

7,zhangfeilong,10,F

Output 5 (id 7; the sex change is ignored):

15> UserBean(id=7, name=zhangfeilong, age=28, sex=M)

Input 6 (id 8 again):

8,wangfff,10,F

Output 6 (continues the running total for id 8):

2> UserBean(id=8, name=zhangsan111, age=49, sex=F)

Overall output:

2> UserBean(id=8, name=zhangsan111, age=19, sex=F)
2> UserBean(id=8, name=zhangsan111, age=20, sex=F)
2> UserBean(id=8, name=zhangsan111, age=39, sex=F)
15> UserBean(id=7, name=zhangfeilong, age=18, sex=M)
15> UserBean(id=7, name=zhangfeilong, age=28, sex=M)
  1. Groups are isolated from one another; one group's state never affects another's.
  2. Apart from the aggregated field (keyBy.sum("age")), every field keeps the value from the group's first record; later changes to those fields are ignored.
  3. min and max behave the same way: only the aggregated field's state changes.

min and max track the minimum or maximum of the state; they are used exactly like sum and likewise update only the aggregated field. To replace the whole record instead, use the By variants (minBy/maxBy); note that sum has no sumBy counterpart.
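
A short contrast, reusing the keyed stream from the example above (a sketch only, no output shown):

    // max: only the age field is updated; name and sex keep the group's first values
    SingleOutputStreamOperator<UserBean> max = keyBy.max("age");

    // maxBy: the entire record holding the maximum age is kept
    SingleOutputStreamOperator<UserBean> maxBy = keyBy.maxBy("age");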

minBy demo

    @Test
    public void minBy() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        SingleOutputStreamOperator<UserBean> map = source.map((MapFunction<String, UserBean>) value
                -> new UserBean(value.split(",")))
                .returns(Types.POJO(UserBean.class));

        KeyedStream<UserBean, Integer> keyBy = map.keyBy(e -> e.getId());

        // minBy keeps the whole record with the smallest age per id
        SingleOutputStreamOperator<UserBean> minBy = keyBy.minBy("age");

        minBy.print();

        env.execute();
    }

Input 1:

10,lifeng,10,F

Output 1:

10,lifeng,10,F

Input 2:

10,lifeng,10,M

Output 2 (not smaller than the current state, so the state does not change):

10,lifeng,10,F

Input 3:

10,李菲菲,8,F

Output 3 (smaller than the current state, so the whole record is replaced):

9> UserBean(id=10, name=李菲菲, age=8, sex=F)

Overall output:

9> UserBean(id=10, name=lifeng, age=10, sex=F)
9> UserBean(id=10, name=lifeng, age=10, sex=F)
9> UserBean(id=10, name=李菲菲, age=8, sex=F)

Aggregation without keyBy

DataStream offers no global aggregation directly; the two workarounds below are to route every element to a single constant key, or to keep a running total inside a parallelism-1 ProcessFunction.

    @Test
    public void sum1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 34, 431, 3, 234, 311);

        // wrap every element with the same constant key "a"
        SingleOutputStreamOperator<Tuple2<String, Integer>> a = source.map(
                (MapFunction<Integer, Tuple2<String, Integer>>) value
                        -> Tuple2.of("a", value))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // a single group now holds the whole stream
        KeyedStream<Tuple2<String, Integer>, String> keyBy = a.keyBy(
                (KeySelector<Tuple2<String, Integer>, String>) value -> value.f0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);

        sum.print();
        env.execute();
    }
    @Test
    public void sum2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8,9);

        SingleOutputStreamOperator<Integer> process = source.process(new ProcessFunction<Integer, Integer>() {
            // plain field, not fault-tolerant Flink state; only safe at parallelism 1
            int count = 0;

            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                count += value;
                out.collect(count);
            }
        }).setParallelism(1);

        process.print().setParallelism(1);

        env.execute();
    }

reduce

reduce combines each incoming element with the previously reduced value for the same key and emits the updated result. In the output below the intermediate totals are not 1, 3, 6, ... because the parallel map stage delivers elements to the reducer in a nondeterministic order; only the final total (45) is fixed.

    @Test
    public void reduce() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8,9);

        // wrap every element with the same constant key "a"
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map((MapFunction<Integer, Tuple2<String, Integer>>)
                value -> Tuple2.of("a", value)).returns(Types.TUPLE(Types.STRING,Types.INT));

        // running total: combine the previous result with each new element
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = map.keyBy(s -> s.f0)
                .reduce((ReduceFunction<Tuple2<String, Integer>>) (v1, v2) -> Tuple2.of("a", v1.f1 + v2.f1));

        reduce.print().setParallelism(1);

        env.execute();

    }
(a,5)
(a,12)
(a,21)
(a,24)
(a,32)
(a,38)
(a,40)
(a,44)
(a,45)