Flink DataStream API编程指南
Flink中的DataStream程序是实现数据流转换的常规程序(例如:filtering, updating state, defining windows, aggregating)。数据流最初是从各种来源创建的 (例如 message queues, socket streams, files)。 结果通过sink返回, 通过sink可以将数据写入文件或者是标准输出(例如:命令行终端), Flink程序可以运行在各种上下文环境中,standalone 或者嵌入到其他程序中。可以在本地的JVM中运行,也可以在集群中运行。
有关Flink API的基本概念,请参阅 basic concepts。
为了创建你自己的Flink DataStream程序,建议从剖析Flink程序开始,逐步添加您自己的stream transformations。其余部分请参考其他操作和高级特性章节。
实例程序(Example Program)
下面是一个完整的流式窗口word count应用程序,每个5秒会统计来自web socket中的数据。
package com.demon.app
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Auther: fc.w
* @Date: 2019/1/31
*/
object ExampleProgramApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}
要运行示例程序,首先需要从终端启动输入流:
nc -lk 9999
仅需要输入一些单词然后点击回车键,就会输出到应用程序中。如果想看大于1的单词统计,在5秒内反复输入相同的单词(如果你输入不了那么快,可以增加时间窗口的长度)
数据源(Data Sources)
Sources是程序读取数据的入口,可以通过StreamExecutionEnvironment.addSource(sourceFunction)
添加一个source源到程序中。Flink中包含了已实现的源函数,但也可以通过实现SourceFunction
接口自定义非并行源函数,或者通过实现ParallelSourceFunction
接口或继承RichParallelSourceFunction
来扩展自定义并行源函数。
Flink中有几个预定义的流源,可以StreamExecutionEnvironment访问:
基于文件:
- readTextFile(path): 逐行读取文本文件,遵守TextInputFormat规范的文件,并将其作为字符串返回。
- readFile(fileInputFormat, path): 按照指定文件的输入格式读取(一次)文件。
-
readFile(fileInputFormat, path, watchType, interval, pathFilter): 这个方法是前两个的内部调用,根据给定的fileInputFormat读取路径中的文件。根据所提供的watchType(每隔多少ms)定期监控新数据的路径(
fileprocessingmode.process_continue
), 或者一次性处理当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE
),使用pathFilter,用户可以进一步排除正在处理的文件。
实现:
在底层,Flink将文件读取过程分为两个子任务,即目录监控和数据读取。这些子任务都是单独实现的。监控是由一个单一的实现(并行度= 1)而非并行任务,数据读取是由多个并行运行的任务执行的。后者的并行度等于作业的并行度。 单个监控任务的作用是根据watchType定期或仅扫描一次来监控目录,扫描要处理的文件,将他们分割,并将这些split后的数据分配给下游读取器。每个split仅有一个读取器读取,而 a reader可以逐个地阅读多个拆分。
注意事项:
-
如果watchType设置为
FileProcessingMode.PROCESS_CONTINUOUSLY
,当文件被修改时,它的内容将被完全重新处理。这将破坏“exactly-once”语义,因为在文件末尾追加数据将导致对其所有内容进行重新处理。 -
如果watchType设置为
FileProcessingMode.PROCESS_ONCE
,Source 文件扫描路径一次并退出,无需等待读取器完成对文件内容的读取。当然读取器将继续读取,直到读取所有文件内容。关闭Source将导致此后不再有checkpoints。这可能会导致节点失败后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
基于scoket:
- socketTextStream -从socket读取。元素可以用分隔符分隔。
基于Collection:
- fromCollection(Seq):从Java.util.Collection创建数据流。集合中所有元素必须是相同的类型。
- fromCollection(Iterator): 从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- fromElements(elements: _*): 从给定的对象序列中创建数据流。所有对象必须是相同的类型。
- fromParallelCollection(SplittableIterator) :并行地从迭代器创建数据流, 该类指定迭代器返回的元素的数据类型。
- generateSequence(from, to) :在给定的区间内并行生成数字序列。
-
addSource(): 附加一个新的源函数,例如,要从Apache Kafka读取数据,可以使用
addSource(new FlinkKafkaConsumer08<>(…))
。有关详细信息,请参见连接器。
DataStream Transformations
关于可用的流转换,请参考operators
Data Sinks
Data sinks消费DataStreams然后转发给files, sockets, external systems和print。Flink提供了各种内置的输出格式,这些格式封装在DataStreams操作之后:
- writeAsText() / TextOutputFormat: 以字符串的形式逐行写入元素。字符串是通过调用每个元素的toString()方法获得的。
- writeAsCsv(...) / CsvOutputFormat :将元组写入以逗号分隔的value文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
- print() / printToErr() :在标准输出/标准错误流上print每个元素的toString()值。还可以选择在输出之前增加prefix(msg)来帮助区分不同的打印调用。如果并行度大于1,输出还将加上生成输出的任务的标识符。
- writeUsingOutputFormat() / FileOutputFormat: 方法和基类自定义文件输出,支持自定义对象到字节的转换。
- writeToSocket: 根据SerializationSchema将元素写入Socket。
- addSink: 调用自定义sink函数,Flink附带了到其他系统(如Apache Kafka)的连接器,这些连接器实现了sink函数。
注意,DataStream上的write*()方法主要用于调试。他们没有参与Flink的检查点,这意味着这些函数通常具有at-least-once的语义。目标系统的数据刷新依赖于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。
为了可靠、精确地将流一次传递到文件系统中,使用flink-connector-filesystem。另外,通过.addsink(…)方法的自定义实现可以参与Flink的检查点,实现精确的一次语义。
Iterations
迭代流程序实现一个步骤函数并将其嵌入IterativeStream中。因为一个DataStream程序可能永远不会结束,因此没有最大迭代次数。相反,您需要指定流的哪些部分反馈给迭代,哪部分使用split 转换或filter被转发到下游。在这里,我们有一个Iteration示例,其中主要部分(重复计算的部分)是一个简单的map()转换,通过使用filter将元素转发到下游。
val iteratedStream = someDataStream.iterate(
iteration => {
val iterationBody = iteration.map(/* this is executed many times */)
(iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})
例如下面的程序,它不断地从一系列整数中减去1,直到0:
package com.demon.app
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Auther: fc.w
* @Date: 2019/1/31
*/
object ExampleProgramApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val someIntegers = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(iteration => {
val minusOne = iteration.map(v => v - 1)
val stillGreaterThanZero = minusOne.filter(_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
})
iteratedStream.print()
env.execute("Window Stream WordCount")
}
}
执行参数
StreamExecutionEnvironment包含ExecutionConfig,为Job设置指定配置值。
有关更多的参数说明,请参考 execution configuration这些参数主要适用于DataStream API:
-
setAutoWatermarkInterval(long milliseconds): 设置Watermark自动触发间隔,也可以通过
getAutoWatermarkInterval()
获取一个long值。
容错
State & Checkpointing描述如何启用和配置Flink的检查点机制。
Controlling Latency(控制延迟)
默认情况下,元素不会在网络上逐个传输(这会导致不必要的网络流量),而是被缓冲。缓冲区的大小(实际上是在机器之间传输的)可以在Flink配置文件中设置。虽然这种方法可以很好地优化吞吐量,但当传入流的速度不够快时,它可能会导致延迟问题。为了控制吞吐量和延迟,可以在执行环境(或单个操作符)上使用env.setBufferTimeout(timeoutMillis)
设置缓冲区被填满的最大等待时间,在此之后,即使缓冲区没有满,也会自动发送缓冲区。此超时的默认值是100 ms。
用法:
val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)
env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
package com.demon.app
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Auther: fc.w
* @Date: 2019/1/31
*/
object ExampleProgramApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
/* 分别可以在执行环境和单个操作上使用 */
// 执行环境
env.setBufferTimeout(100)
val someIntegers = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(iteration => {
val minusOne = iteration.map(v => v - 1)
val stillGreaterThanZero = minusOne.filter(_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
})
// 单个操作
iteratedStream.setBufferTimeout(100)
env.execute("Window Stream WordCount")
}
}
为了最大限度地提高吞吐量,设置setBufferTimeout(-1)
,它将移除超时,缓冲区只有在满时才刷新。
为了最小化延迟,将超时设置为接近0的值(例如5或10 ms)。应该避免缓冲区超时为0,因为这会导致严重的性能下降。
故障排除(Debugging)
在分布式集群中运行流程序之前,最好确保所实现的算法能够正常工作。因此,数据分析程序通常是检查结果、调试和改进的增量过程。
Flink提供了简化数据分析程序开发过程的特性。支持IDE中的本地调试、测试数据的注入和结果数据的收集。
本节给出了一些简化Flink程序开发示例。
本地执行环境(Local Execution Environment)
LocalStreamEnvironment在它创建的JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,可以在代码中设置断点并轻松调试程序。
创建并使用LocalEnvironment如下所示:
val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()
收集数据来源(Collection Data Sources)
Flink提供了由Java集合支持的特殊数据源,以简化测试。一旦对程序进行了测试,sources和sinks就可以轻松地替换或读写外部系统的sources和sinks。
采集数据源的使用方法如下:
val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
注意:当前,集合数据源要求数据类型和迭代器实现Serializable。此外,收集数据源不能并行执行(并行度= 1)。
迭代器数据接收(Iterator Data Sink)
Flink还提供了一个接收器,用于DataStream结果的收集和测试。它可以使用如下:
import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala
注意:Flink 1.5.0中删除了Flink-streaming-contrib模块。它的类已经被转移到flink-streaming-java和flink-streaming-scala中。
接下来:
操作符(Operators): 可用流操作符的规范。
事件时间(Event Time): Flink的时间概念介绍。
状态和容错(State & Fault Tolerance): 说明如何开发有状态应用程序。
连接器(Connectors): 可用输入和输出连接器的描述。