Flink(1.13) 执行模式(Execution Mode)

2021-08-22  本文已影响0人  万事万物

前言

Flink从1.12.0上对流式API新增一项特性:可以根据你的使用情况和Job的特点, 可以选择不同的运行时执行模式(runtime execution modes).
流式API的传统执行模式我们称之为STREAMING 执行模式, 这种模式一般用于无界流, 需要持续的在线处理
1.12.0新增了一个BATCH执行模式, 这种执行模式在执行方式上类似于MapReduce框架. 这种执行模式一般用于有界数据.
默认是使用的STREAMING 执行模式

选择执行模式

BATCH执行模式仅仅用于有界数据, 而STREAMING 执行模式可以用在有界数据和无界数据.
一个公用的规则就是: 当你处理的数据是有界的就应该使用BATCH执行模式, 因为它更加高效. 当你的数据是无界的, 则必须使用STREAMING 执行模式, 因为只有这种模式才能处理持续的数据流.

配置BATH执行模式

执行模式有3个选择可配:

  1. STREAMING(默认):有界数据和无界数据
  2. BATCH:有界数据
  3. AUTOMATIC:

传统的方式:

  1. 批处理:
// 获取环境
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

// 读取资源
DataSource<String> dataSource = executionEnvironment.readTextFile("文件地址");
  1. 有界流
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();

// 读取资源
DataStreamSource<String> streamSource = env.readTextFile("文件地址");

env.execute();
  1. 无界流
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 监听端口
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999)

env.execute();

批处理与流处理的区别:

  1. 批处理处理数据,是一批一批对数据处理,spark就是一个微批数据处理引擎,可以理解成先对数据积压,然后达到一定量再一块处理。
  2. 流处理,有数据就处理,不需要积压数据
  3. 批处理无需保留数据状态,处理完就输出。
  4. 流处理需要保留数据状态,因为也有可能还有该数据。
  5. 批处理完成,程序就停止。
  6. 流处理,需要一直等待,即使后面不会有数据产生,程序依然保存运行状态。

有界与无界的理解:

有界流与无界流的区别在于读取的数据是否有尽头,若读取的数据类似于文件(知道开始的位置,结束的位置),无界流就是知道开始但不知道什么时候结束,如网络,Kafka,需要不同的监听着,等待处理数据。

案例(wordcount)

流式处理

程序比较简单,就没加注释

    @Test
    public void wordCount1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = flatMap.keyBy(e -> e.f0).sum(1);

        result.print();

        env.execute();
    }

结果

5> (python,1)
12> (word,1)
3> (java,1)
13> (xml,1)
1> (pon,1)
11> (log,1)
7> (txt,1)
1> (pon,2)
11> (exe,1)
3> (java,2)
11> (log,2)
5> (python,2)
5> (hello,1)
5> (python,3)
5> (hello,2)
3> (java,3)
13> (xml,2)
14> (count,1)
11> (log,3)
13> (xml,3)
14> (batch,1)

批处理

    @Test
    public void wordCount2() throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 和流式处理,是两套完全不同的api
        DataSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

        FlatMapOperator<String, Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));

        AggregateOperator<Tuple2<String, Integer>> result = flatMap.groupBy(0).sum(1);

        result.print();
    }

结果

(pon,2)
(hello,2)
(log,3)
(xml,3)
(exe,1)
(java,3)
(python,3)
(txt,1)
(batch,1)
(count,1)
(word,1)

设置执行模式

传统上的批处理和流处理,需要两套不同的API来处理,不太符合Flink中流批一体的理念,此时执行模式的出现完美的解决了问题。只需要指定一个执行模式,就可以完成流与批之间的相互转换,其他代码都不用修改。

执行模式所支持的模式:

@PublicEvolving
public enum RuntimeExecutionMode {

    /**
     * The Pipeline will be executed with Streaming Semantics. All tasks will be deployed before
     * execution starts, checkpoints will be enabled, and both processing and event time will be
     * fully supported.
     */
    STREAMING,

    /**
     * The Pipeline will be executed with Batch Semantics. Tasks will be scheduled gradually based
     * on the scheduling region they belong, shuffles between regions will be blocking, watermarks
     * are assumed to be "perfect" i.e. no late data, and processing time is assumed to not advance
     * during execution.
     */
    BATCH,

    /**
     * Flink will set the execution mode to {@link RuntimeExecutionMode#BATCH} if all sources are
     * bounded, or {@link RuntimeExecutionMode#STREAMING} if there is at least one source which is
     * unbounded.
     */
    AUTOMATIC
}

转换成批处理

    @Test
    public void wordCount1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 转成批处理,其他都不用改
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStreamSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = flatMap.keyBy(e -> e.f0).sum(1);

        result.print();

        env.execute();
    }

结果

1> (pon,2)
5> (hello,2)
5> (python,3)
3> (java,3)
7> (txt,1)
14> (batch,1)
14> (count,1)
13> (xml,3)
11> (exe,1)
11> (log,3)
12> (word,1)

注意:

  1. 在13版本之前不要使用执行模式,若数据只有一个(如: (txt,1)),那么该数据不会被输出,13版本修复了该问题。
  2. 批处理不会存状态(处理完就直接输出了,所以没有必要保留状态)
上一篇 下一篇

猜你喜欢

热点阅读