flink 学习笔记 — 编程模型

2019-11-22  本文已影响0人  飞不高的老鸟

flink 抽象分层结构

    flink 作为流式处理框架,不仅具有高效的流数据和批数据处理性能,具有针对开发人员使用的高效的底层API,同时有方便分析人员使用的 table/sql API。下图中可见,越上层的 API 使用越简单,但是本身的表达能力也越弱,越下层的 API 开发和学习成本越高,表达能力也越强。因此,flink API 可以适合不同层次的使用人员快速上手。

flink分层架构图.png

DataStream API 介绍

数据流执行概况

    flink 中的流式数据是无边界(unbounded)的,在执行过程中,flink 程序本身主要是由数据流和算子共同组成。在整个过程中包括 source、transform、sink 三个部分。

    由上图可以发现,一个完整的 flink 程序,必须包括 source、transform、sink 三部分,并且在最后开启 execute 才能保证程序的正常执行。通常,transform 的一个算子和数据是一一对应关系,但是有时会把一些算子链在一起(chain在一起),从而提高任务执行的性能。

数据流并行计算

    我们知道,flink 程序是一个分布式的并行计算框架,因此,在 flink 程序执行的过程中,是并行计算的。这里引用 flink 官网上描述并行计算的流程图:

flink-stream并行计算流程图.jpg

    下面我们采用一个简单的单词统计的例子进行说明:

package test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class NcWordCount {
    public static void main(String []args) throws Exception {

        // 初始化执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置计算并行度
        env.setParallelism(1);

        // 连接 source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9000);

        // 转换操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> r = lines.flatMap(new TokenZer())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .sum(1);

        // sink
        // r.writeAsText("out");
        r.print();
        env.execute("test");
    }

    static class TokenZer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");

            for (String word:words) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
nc -l 9000
flink-stream计算并行度.jpg

DataSet API 介绍

     在了解了 DataStream API 的编程模式后,对 DataSet 能更快的上手。事实上,与 spark 相反,flink 流式计算中,以流式数据为主,批量数据处理是建立在流式处理的基础上。在编程过程中,DataSet API 的使用跟 DataStream API 的使用有着及其相似的流程。

package test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String []args) throws Exception {
        // 初始化环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 数据源
        DataSet<String> dataSet = env.fromElements("hello word spark java scala");

        // 做一次聚合操作
        AggregateOperator<Tuple2<String, Integer>> sum = dataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String values, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] value = values.split(" ");
                for (String val : value) {
                    out.collect(new Tuple2<>(val, 1));
                }
            }
        }).groupBy(0)
                .sum(1);

//        count.print().setParallelism(3);
        sum.print();

//        env.execute("flink-test");

    }
}

总结

     本文主要对 flink 的编程模型(DataStream/DataSet API)进行了简单的介绍,当然,主要还是处于 API 的使用阶段,在后续的 flink 使用中会通过源码分析进行更详细的介绍。同时,在 Stream API 中,我们看到了 window 窗口的影子,窗口的使用是 flink 引擎中重要的一环,接下来也会有更详尽的描述。

上一篇 下一篇

猜你喜欢

热点阅读