Flink入门-构建第一个Flink应用

2020-07-09  本文已影响0人  zfylin

开发环境准备

创建Maven项目

使用maven模版创建项目

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=com.zflylin.demo \
    -DartifactId=flink-demo \
    -Dversion=0.0.1 \
    -Dpackage=com.zflylin.demo \
    -DinteractiveMode=false

关于mvn archetype:generate的相关参数,含义如下:

项目相关参数:

参数 含义
groupId 当前应用程序隶属的Group的ID
artifactId 当前应用程序的ID
package 代码生成时使用的根包的名字,如果没有给出,默认使用archetypeGroupId

原型有关参数:

参数 含义
archetypeGroupId 原型(archetype)的Group ID
archetypeArtifactId 原型(archetype)ID
archetypeVersion 原型(archetype)版本
archetypeRepository 包含原型(archetype)的资源库
archetypeCatalog archetype分类,这里按位置分类有: ‘local’ 本地,通常是本地仓库的archetype-catalog.xml文件 ‘remote’ 远程,是maven的中央仓库 file://…’ 直接指定本地文件位置archetype-catalog.xml http://…’ or ‘https://…’ 网络上的文件位置 archetype-catalog.xml ‘internal’ 默认值是remote,local
archetypeVersion 原型(archetype)版本

代码目录如下:

➜  work tree flink-demo      
flink-demo
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── zflylin
        │           └── demo
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

编写 Flink 程序

官方Demo

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // 创建流运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 定义数据源,这里是一个本地的9999端口的sock数据源
        // 对数据源做分组、开窗、聚合操作
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // 打印数据
        dataStream.print();

        // 运行
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

在9999端口上启动netcat,准备输入

➜  work nc -lk 9999

启动Flink流计算程序, 然后输入单词。

这些输入将作为示例程序的输入。如果要使得某个单词的计数结果大于1,请在5秒钟内重复输入相同的单词(如果5秒钟输入相同单词对你来说太快,请把示例程序中的窗口大小从5秒调大。

➜  work nc -lk 9999
aa bb cc aa 
dd cc

输出结果

....
22:08:55,928 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.
5> (aa,2)
8> (cc,2)
5> (bb,1)
8> (dd,1)
上一篇 下一篇

猜你喜欢

热点阅读