Flink官方入门项目简介

2019-02-15  本文已影响66人  叩丁狼教育

本文作者:林伟兵,叩丁狼高级讲师。原创文章,转载请注明出处。

1. Flink简介

Flink 提供了三个核心的用户API:

  1. Batch
  2. Streaming
  3. Talbe & SQL

​ 本文不介绍Flink是什么,Flink的核心组件和特性,本文从用户的角度解读Batch和Streaming代码的实现方式,本文使用Flink1.6.1版本作为讲解。首先需要下载官方的案例,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html.

2. 官方案例搭建

  1. 通过maven工具下载源码:

    $ mvn archetype:generate                               \
         -DarchetypeGroupId=org.apache.flink              \
         -DarchetypeArtifactId=flink-quickstart-java      \
         -DarchetypeVersion=1.6.1
    
  2. 也可以使用官方提供的地址下载,但是不管如何 都必须保证有maven环境:

    $ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.6.1
    
  3. 下载后的项目架构是这样的:

    $ tree quickstart/
    quickstart/
    ├── pom.xml
    └── src
        └── main
            ├── java
            │   └── org
            │       └── myorg
            │           └── quickstart
            │               ├── BatchJob.java
            │               └── StreamingJob.java
            └── resources
                └── log4j.properties
    
  4. 将源码导入IDE中。打开IDE,点击“open”,将项目导进去,此时需要加载对应的maven依赖。等待项目构建完毕后,删除BatchJob.java和StreamingJob.java代码,因为这个两个代码只是提供了一个模板,并没有实质性的代码。

  5. 打开github网站,找到flink项目的example代码,将batch和streaming代码粘贴到项目中。

01.png
02.png
03.png

粘贴后的代码如下,运行程序后打印出单词统计的结果:

04.png

3. Batch代码解读:

如下贴出源代码,重要的代码部分参考注释:

public class WordCount {

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        //创建执行环境,这里的环境可以是本地环境,也可以是集群环境.
        // 用户可以调用createLocalEnvironment()运行本地环境 也可以调用createRemoteEnvironment()运行集群环境
        //getExecutionEnvironment()可以动态获取环境
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // ParameterTool就是对参数的一个封装类,这里将传递进来的参数作为全局参数
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            //如果运行程序携带input参数,则从input参数后面获取文件地址,从而读取文件的内容
            text = env.readTextFile(params.get("input"));
        } else {
            //如果不携带参数,则从WordCountData类读取每一行数据并进行分词统计
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                // 这里将一行数据转换成 (letter,1) (lette,1)(letter,1) (...)
                text.flatMap(new Tokenizer())
                // 将tuple(letter,1) 的第0个元素 进行groupby,再对相同的letter的count进行累加。
                .groupBy(0)
                .sum(1);
        //上面程序得到的结果是一个队列,该队列由一个个的Tuple2<String, Integer>组成。

        // emit result
        if (params.has("output")) {
            //如果有output参数,则将output提供的路径作为文件夹,将结果写入到该文件夹下
            counts.writeAsCsv(params.get("output"), "\n", " ");
            //开始执行程序
            env.execute("WordCount Example");
        } else {
            //执行程序,将结果打印出来
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // Tuple2 就是将2个元素作为一个元组, 如果将3个元素作为一个元祖,则需要使用Tuple3
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

4. Streaming代码解读:

Streaming代码与batch代码并没有多大的不同。不同点参考注释:

public class WordCount {

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // 不同于批量处理,实时流处理的环境代码前面添加了Stream标识。
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // 这里的统计采用的是keyBy而非groupBy,因为实时流数据正常的逻辑是实时产生,它是无边数据
            .keyBy(0).sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
        //开始执行程序
        env.execute("Streaming WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

总结:

  1. 不管是离线处理还是实时处理,都离不开三板斧,1-获取当前运行环境,2-通过算子进行数据分析,3-将结果输出到某一端。
  2. 算子包括了map() , flatMap(), filter(), keyBy( ), sum( ) ...
  3. 当前案例的批量处理的结果是将数据一次性输出,实时流数据是对每一行数据进行逐一的分析和输出。
  4. 除了Batch / Streaming 代码,官方还提供了 SQL 代码的案例。有兴趣的同学可以下载来试试。

想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

上一篇下一篇

猜你喜欢

热点阅读