Flink DataStream API 介绍与使用
引文
正文
DataStream编程模型
在Flink
整个系统架构中,对流计算的支持是其最重要的功能之一,Flink
基于Google
提出的DataFlow
模型,实现了支持原生数据流处理的计算引擎。
Flink
中定义了DataStream API
让用户灵活且高效地编写Flink
流式应用。
DataStream API
主要可分为三个部分,DataSource
模块、Transformation
模块以及DataSink
模块。
-
其中
Sources
模块主要定义了数据接入功能,主要是将各种外部数据接入至Flink
系统中,并将接入数据转换成对应的DataStream
数据集。 -
在
Transformation
模块定义了对DataStream
数据集的各种转换操作,例如进行map
、filter
、windows
等操作。 -
最后,将结果数据通过
DataSink
模块写出到外部存储介质中,例如将数据输出到文件或Kafka
消息中间件等。
1. DataSources数据输入
DataSources
模块定义了DataStream API
中的数据输入操作,Flink
将数据源主要分为内置数据源和第三方数据源这两种类型。
其中内置数据源包含文件、Socket网络端口以及集合类型数据,其不需要引入其他依赖库,且在Flink
系统内部已经实现,用户可以直接调用相关方法使用。
第三方数据源定义了Flink
和外部系统数据交互的逻辑,包括数据的读写接口。在Flink
中定义了非常丰富的第三方数据源连接器(Connector
),例如Apache kafka Connector
、Elatic Search Connector
等。同时用户也可以自定义实现Flink
中数据接入函数SourceFunction
,并封装成第三方数据源的Connector
,完成Flink
与其他外部系统的数据交互。
这里我们着重讲下外部数据源:
a. 数据源连接器
前面提到的内置数据源类型都是一些基本的数据接入方式,例如从文件、Socket
端口中接入数据,其实质是实现了不同的SourceFunction
,Flink
将其封装成高级API
,减少了用户的使用成本。
对于流式计算类型的应用,数据大部分都是从外部第三方系统中获取,为此Flink
通过实现SourceFunction
定义了非常丰富的第三方数据连接器,基本覆盖了大部分的高性能存储介质以及中间件等。
- 其中部分连接器是仅支持读取数据,例如
Twitter Streaming API
、Netty
等; - 另外一部分仅支持数据输出(
Sink
),不支持数据输入(Source
),例如Apache Cassandra
、Elasticsearch
、Hadoop FileSystem
等。 - 还有一部分是既支持数据输入,也支持数据输出,例如
Apache Kafka
、Amazon Kinesis
、RabbitMQ
等连接器。
以Kafka
为例,用户在Maven
编译环境中导入下面代码清单所示的环境配置,主要因为Flink
为了尽可能降低用户在使用Flink
进行应用开发时的依赖复杂度,所有第三方连接器依赖配置放置在Flink
基本依赖库以外,用户在使用过程中,根据需要将需要用到的Connector
依赖库引入到应用工程中即可。
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<flink.version>1.7.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<apache.hadoop.version>2.8.5</apache.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Schema
参数的主要作用是根据事先定义好的Schema
信息将数据序列化成该Schema
定义的数据类型,默认是SimpleStringSchema
,代表从Kafka
中接入的数据将转换成String
字符串类型处理。
用户通过自定义Schema
将接入数据转换成指定数据结构,主要是实现Deserialization-Schema
接口来完成.
b.自定义数据源连接器
Flink
中已经实现了大多数主流的数据源连接器,但需要注意,Flink
的整体架构非常开放,用户也可以自己定义连接器,以满足不同的数据源的接入需求。可以通过实现SourceFunction
定义单个线程的接入的数据接入器,也可以通过实现ParallelSource-Function
接口或继承RichParallelSourceFunction
类定义并发数据源接入器。
DataSoures
定义完成后,可以通过使用SteamExecutionEnvironment
的addSources
方法添加数据源,这样就可以将外部系统中的数据转换成DataStream[T]
数据集合,其中T
类型是Source-Function
返回值类型,然后就可以完成各种流式数据的转换操作。
2. DataSteam转换操作
即通过从一个或多个DataStream
生成新的DataStream
的过程被称为Transformation
操作。
在转换过程中,每种操作类型被定义为不同的Operator
,Flink
程序能够将多个Transformation
组成一个DataFlow
的拓扑。
所有DataStream
的转换操作可分为Single-DataStream
、Multi-DataStream
、物理分区三类类型。其中Single-DataStream
操作定义了对单个DataStream
数据集元素的处理逻辑,Multi-DataStream
操作定义了对多个DataStream
数据集元素的处理逻辑。物理分区定义了对数据集中的并行度和数据分区调整转换的处理逻辑。
2.1 Single-DataStream操作
(1)Map [DataStream->DataStream]
调用用户定义的MapFunction
对DataStream[T]
数据进行处理,形成新的Data-Stream[T]
,其中数据格式可能会发生变化,常用作对数据集内数据的清洗和转换。例如将输入数据集中的每个数值全部加1
处理,并且将数据输出到下游数据集。
上图中计算逻辑实现代码如下,通过从集合中创建DataStream
,并调用DataStream
的map
方法传入计算表达式,完成对每个字段加1
操作,最后得到新的数据集mapStream
。
DataStream<Map<String, Object>> esDataStr = kafkaStream.map(
new MapFunction<String, Map<String, Object>>() {
private static final long serialVersionUID = 4987316772103776340L;
@Override
public Map<String, Object> map(String jsonStr) throws Exception {
try {
return CompEngineProcess.processMessage(jsonStr);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new HashMap<>();
}
}
}
);
(2)FlatMap [DataStream->DataStream]
该算子主要应用处理输入一个元素产生一个或者多个元素的计算场景,比较常见的是在经典例子WordCount
中,将每一行的文本数据切割,生成单词序列。如在下图中对于输入DataStream[String]
通过FlatMap
函数进行处理,字符串数字按逗号切割,然后形成新的整数数据集。
针对上述计算逻辑实现代码如下所示,通过调用resultStream
接口中flatMap
方法将定义好的FlatMapFunction
传入,生成新的数据集。
DataStream<Tuple2<RoutingInfo, Map<String, Object>>> routeStream = filterStream.flatMap(new FlatMapFunction<Map<String, Object>, Tuple2<RoutingInfo, Map<String, Object>>>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Map<String, Object> inDataMap,
Collector<Tuple2<RoutingInfo, Map<String, Object>>> outCollector) {
List<RoutingInfo> routingList = RulesManager.getInstance().getRoutingList(inDataMap.get(CodeString.TRANS_HEADER_FIELD_TRCD).toString());
if (routingList != null) {
routingList.forEach(data -> outCollector.collect(new Tuple2<>(data, inDataMap)));
}
}
});
(3)Filter [DataStream->DataStream]
该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条件的数据过滤掉。如下图所示将输入数据集中偶数过滤出来,奇数从数据集中去除。
针对上图中的计算逻辑代码实现如下
DataStream<Tuple2<RoutingInfo, Map<String, Object>>> filterStream = routeStream.filter(k -> isNull(k.f0));