Hadoop系Hadoopflink

Flink应用开发

2019-03-07  本文已影响4人  零度沸腾_yjz

项目构建

项目模板

Flink应用项目可以使用Maven或SBT来构建项目,Flink针对这些构建工具提供了相应项目模板。
Maven模板命令如下,我们只需要根据提示输入应用项目的groupId、artifactId、version和package路径即可。

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2

目录结构和我们使用IDEA创建的目录结构基本一样,只是它会帮我们引入Flink依赖和日志依赖。

<flink.version>1.7.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
...
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

flink-java和flink-streaming-java_2.11是我们使用Java开发Flink应用程序的必要依赖。

默认也帮我们引入maven-shade-plugin插件,所以在打包的时候记得将mainClass改成自己的主类。

SBT模板可以使用以下命令获取:

sbt new tillrohrmann/flink-project.g8

SBT版本需要大于等于0.13.13版本。

应用程序依赖

Flink应用程序开发依赖项可以分为两类:

//Flink核心依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.7.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.7.2</version>
  <scope>provided</scope>
</dependency>
//应用程序依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

Flink 开发

Flink应用程序一般先是通过数据源创建分布式数据集合,比如读取文件、Kafka或本地缓存。然后对分布式集合进行各种转换操作,比如过滤、聚合、映射、修改状态、定义窗口等。最后通过接收器(sink)返回结果,结果可以写入文件(分布式)、DB或标准化输出。

根据数据源的类型,也就是有界数据源(bounded)或无界数据源(unbounded),我们可以编写批处理程序(batch)和流处理程序(streaming)。Flink对于批处理程序和流处理程序提供了不同的API,其中DataSet API用于批处理,DataStream API用于流处理。尽管提供的API不同,但是Flink底层数据处理方式是一致的。

在Flink程序中DataSet和DataSteam用来表示程序中的数据集合,这些数据集是不可变的(immutable)。DataSet代表有限的数据集,而DataSteam代表无限的数据集合。

编写Flink应用程序

编写Flink应用程序基本可以分为以下5个步骤:

  1. 获取应用程序的执行环境(execution environment)。
  2. 加载/创建初始数据集合。
  3. 对数据集执行转换操作(transformation)。
  4. 指定计算结果输出。
  5. 触发程序执行。

DataSet API在org.apache.flink.api.java 包中;DataStream API在org.apache.flink.streaming.api包中。

批处理和流处理的程序步骤是一致的,下面给出流处理作业的编写步骤。

获取应用程序执行环境

获取执行环境是Flink程序的基础。对于流处理程序的执行环境为StreamExecutionEnvironment,我们可以通过StreamExecutionEnvironment静态方法获取。对于批处理作业执行环境为ExecutionEnvironment,同样需要使用ExecutionEnvironment获取。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host,port,jarFile);

一般我们只需要通过getExecutionEnvironment()方法获取执行环境即可,因为他会根据当前环境自动创建合适的执行环境,比如我们在本地IDE执行程序,它将创建一个本地执行环境(local environment),该环境将在本地机器执行;如果我们将应用程序打成jar包交给Flink集群执行,getExecutionEnvironment()将返回一个集群执行环境。createLocalEnvironment()会创建一个本地执行环境,createRemoteEnvironment()会创建一个远程执行环境。
同理,批处理执行环境ExecutionEnvironment以同样方式创建执行环境。

加载/创建初始数据集合

加载/创建初始数据集合,一般主要是读取分布式文件、读取Kafka队列等。

//通过执行环境提供的方法读取外部数据集
DataStreamSource<String> textFile =  env.readTextFile("/Users/yangjianzhang/development/flink-1.7.2/LICENSE");

读取数据集之后会返回一个DataStream(DataStreamSource是DataStream子类,用于获取数据源数据集的),我们之后的转换、输出存储操作都可以通过DataStream提供的API进行操作了。

执行转换操作

我们能够通过DataSteam提供的API进行各种转换(transformation)操作。转换函数我们可以通过三种方式实现:实现接口、匿名类和Lambda表达式。

//实现接口
class MyMap implements MapFunction<String,String> {
            @Override
            public String map(String s) throws Exception {
                return s.toUpperCase();
            }
}
data.map(new MyMap());
//通过匿名类方式
textFile.map(new MapFunction<String, String[]>() {
            @Override
            public String[] map(String s) throws Exception {
                return s.split(" ");
            }
        });
//使用Lambda表达式
DataStream<String> filterLine  = textFile.filter(line -> line.contains("flink"));

转换操作除了提供了基础接口(比如MapFunction),还提供了丰富函数(Rich Function)。Rich Function除了提供用户定义函数,还提供了其它四个方法:open、close、getRuntimeContext和setRuntimeContext。在一些场景,这些方法都是很有用的。

指定计算结果输出

我们可以将计算结果打印出来,也可以直接将结果写入到文件中。

filterLine.print();     
filterLine.writeAsText("/Users/yangjianzhang/development/data/flink/contains.txt");

触发程序执行

Flink应用程序是懒执行的(lazy execution)。也就是说当程序main方法被执行时,数据的加载和转换并不会立即触发,而是会将每一步操作添加到执行计划中,当执行环境通过execute()方法显示触发时,才会进行具体的执行操作。
execute()方法会返回一个JobExecutionResult,它包含执行时间以及累加器(accumulator)结果。

//触发程序执行
JobExecutionResult result = env.execute();

通过懒执行评估(lazy evaluation)机制,我们可以构建复杂的数据处理程序,Flink会将整个执行计划作为一个执行单元来一起执行。

Demo

下面是根据上面编写程序步骤给出的完整Demo:

public class DemoStreamingJob {
    public static void main(String[] args) throws Exception {
        //Streaming process

        //step1 获取/创建执行环境
        //自动选择正确的执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //step2 加载/创建初始数据
        DataStreamSource<String> textFile =  env.readTextFile("/Users/yangjianzhang/development/flink-1.7.2/LICENSE");
        //step3  对数据源数据进行转换操作
        DataStream<String> filterLine  = textFile.filter(line -> line.contains("flink"));

        //step4 指定计算结果输出位置
 filterLine.writeAsText("/Users/yangjianzhang/development/data/flink/contains.txt");

        //step5 最后触发程序执行
        JobExecutionResult result = env.execute();
    }
}

指定key操作

Flink数据模型并不是基于键值对的,所以我们不需要将数据放到键值对中再传递给Flink。但是有时许多转换操作需要基于key值进行操作,比如join、groupBy、Reduce、Aggregator等等。
Flink针对上面这些问题,提出了“虚拟”key的概念。也就是说在传递过来的数据上,通过指定具体数据项为这个消息的key。
比如我们传递进来的数据是一个tuple元组:

("yjz",27,10234),
("yjz",27,21456)
("ls",28,12345)

我们可以指定元组中的第一个元素为key。

DataStream<Tuple3<String,Integer,Long>> streamInput = ...;
DataStream<...> windowed =  streamInput.keyBy(0).window(...);

如果是DataSet可以通过groupBy来指定key:

DataSet<Tuple3<String,Integer,Long>> inputDataSetTuple = ...;
DataSet<...> reduced =  inputDataSetTuple.groupBy(0).reduce(...);

上面只是针对tuple类型数据通过位置来简单的指定key,下面我们看一下Flink都支持了哪些指定key的方式。

定义Tuple类型中的key

上面我们已经讲了可以通过指定position来指定虚拟key,我们还可以使用的的更复杂一些。比如有些场景需要使用组合多个field的方式来指定key:

KeyedStream<Tuple3<String,Integer,Long>> =  streamInput.keyBy(0,1);

上面使用元组中的第一个字段和第二个字段组合成一个key来使用。

使用字段表达式(Key Expression)指定key

使用字段表达式能够更加灵活的指定key,它可以用来指定POJO对象、Tuple元组中的key。
对于POJO对象可以通过a.b的形式来指定key。比如有以下POJO对象:

class Wc{
    public User user;
    public int count; 
}
class User{
    public String name;
    public Tuple2<String,Integer> tuple;
    public int age;
}

可以使用User作为key:words.keyBy("user")。
也可以使用User中的name作为key:words.keyBy("user.name")。

对于元组类型,我们即可以直接使用下标(从0开始),也可以使用"fx"来代表,比如第一个元素则用"f0"表示(感觉和直接使用下标没有区别)。当然可以和POJO对象组合使用:words.keyBy("user.tuple.f0")。

使用key选择器

使用key选择器能够以单个元素输入,并可以返回任意类型的元素key。
下面是返回字符串类型的key:

KeyStream<String> keyed = textFile.keyBy(new KeySelector<User,String>() {
            @Override
            public String getKey(User user) throws Exception {
                return user.name;
            }
        });

Flink支持的数据类型

为了确保系统能够以确切有效的方式针对不同类型执行不同的策略,Flink提供了以下六种不同类别的数据类型。

  1. Java Tuple和Scala class类型。
  2. POJO对象类型。
  3. 原始数据类型(Primitive Type)。
  4. 常规类型(General Class Types)
  5. Values类型(自定义序列化类型)
  6. Hadoop Writable

Java Tuple和Scala class

元组类型是包含固定数量,具有各种类型字段的复合类型。Java API提供了Tuple1~Tuple25的元组类型,后面的数字代表元组中的元素个数,所以我们可以看出最多支持25个元素的元组。但是我们可以通过嵌套来存储更多的元素数据,每个字段都可以是Flink的任意类型数据(包括元组)。元组通过tuple.f0或tuple.getField(int position)来获取字段数据,索引从0开始。

DataStreamSource<Tuple2<String,Integer>> wordCounts = env.fromElements(new Tuple2<String,Integer>("hello",2),new Tuple2<String,Integer>("word",5));
DataStream<Integer> counts = wordCounts.map(word -> word.f1);
        wordCounts.keyBy(0);

Scala中的Case class可以代替Java中的Tuple类型。

POJO对象类型

如果Java或Scala类满足以下情况,则可以作为POJO对象类型被Flink处理。

  1. 类必须是public访问类型。
  2. 必须有一个无参的公共构造函数。
  3. 所有字段字段都有public类型,或者有对应的setXxx()和getXxx()方法。
  4. 类中的字段类型必须是Flink所支持的数据类型。

自己一直理解Scala中的Case Class就是Java中的POJO,但是官方文档将Java Tuple和Scala Case Class放在一起,估计是以使用方式来进行划分的。
Flink中的POJO是使用Avro序列化框架进行序列化的。

public class UserCount{
            public String name;
            public int count;

            public UserCount() {}
            public UserCount(String name,int count) {
                this.name = name;
                this.count = count;
            }
 }
DataStreamSource<UserCount> userCount = env.fromElements(new UserCount("zhangsan",1),new UserCount("lisi",4));
userCount.keyBy("name");

原始数据类型(Primitive Types)

Flink支持Java或Scala中的所有原始数据类型,比如String、int、Double等。

常规类型(General Class Types)

除了POJO类型外,Flink能够支持Java和Scala大部分类。但是对于一些包含不能被序列化的字段类、I/O流类、或其它本地资源类是不支持的。
Flink会对常规类型以黑盒方式进行操作(无法访问其内容),使用Kryo进行常规类的序列化与反序列化。

上面POJO使用Avro序列化框架,这里的常规类型使用Kryo序列化框架,原因需要考证一下。

Values类型(自定义序列化类型)

值类型是使用手动序列化与反序列化来代替通用序列化框架。通过实现org.apache.flinktypes.Value接口来自己实现序列化与反序列。当使用通用序列化框架效率比较低的时候,使用Value类型是非常合理的。比如对于一个数组,我们知道它大部分都是0,那么我们可以对非零元素进行特殊编码即可,而不是使用通用框架对所有元素进行编码。
Flink对于基本类型提供预定义的Value类型:ByteValue、ShortValue、IntValue、LongValue、FloatValue、DoubleValue、StringValue、CharValue、BooleanValue。

Hadoop Writable

我们还可以使用实现了org.apache.hadoop.Writable接口的数据类型。其中使用write()方法和readFields()方法进行序列化和反序列化。

除了上面这六种类型数据,我们还可以使用一些特殊类型,比如Scala的Either、Option、Try等,Java中自定义的Either等。

关注我

欢迎关注我的公众号,会定期推送优质技术文章,让我们一起进步、一起成长!
公众号搜索:data_tc
或直接扫码:🔽


欢迎关注我
上一篇 下一篇

猜你喜欢

热点阅读