Flink专题

Flink DataStream API编程指南

2019-01-31  本文已影响116人  尼小摩

Flink中的DataStream程序是实现数据流转换的常规程序(例如:filtering, updating state, defining windows, aggregating)。数据流最初是从各种来源创建的 (例如 message queues, socket streams, files)。 结果通过sink返回, 通过sink可以将数据写入文件或者是标准输出(例如:命令行终端), Flink程序可以运行在各种上下文环境中,standalone 或者嵌入到其他程序中。可以在本地的JVM中运行,也可以在集群中运行。

有关Flink API的基本概念,请参阅 basic concepts

为了创建你自己的Flink DataStream程序,建议从剖析Flink程序开始,逐步添加您自己的stream transformations。其余部分请参考其他操作和高级特性章节。

实例程序(Example Program)

下面是一个完整的流式窗口word count应用程序,每个5秒会统计来自web socket中的数据。

package com.demon.app

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @Auther: fc.w
  * @Date: 2019/1/31
  */
object ExampleProgramApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }

}

要运行示例程序,首先需要从终端启动输入流:

nc -lk 9999

仅需要输入一些单词然后点击回车键,就会输出到应用程序中。如果想看大于1的单词统计,在5秒内反复输入相同的单词(如果你输入不了那么快,可以增加时间窗口的长度)

数据源(Data Sources)

Sources是程序读取数据的入口,可以通过StreamExecutionEnvironment.addSource(sourceFunction) 添加一个source源到程序中。Flink中包含了已实现的源函数,但也可以通过实现SourceFunction 接口自定义非并行源函数,或者通过实现ParallelSourceFunction接口或继承RichParallelSourceFunction来扩展自定义并行源函数。

Flink中有几个预定义的流源,可以StreamExecutionEnvironment访问:

基于文件:

实现:

在底层,Flink将文件读取过程分为两个子任务,即目录监控和数据读取。这些子任务都是单独实现的。监控是由一个单一的实现(并行度= 1)而非并行任务,数据读取是由多个并行运行的任务执行的。后者的并行度等于作业的并行度。 单个监控任务的作用是根据watchType定期或仅扫描一次来监控目录,扫描要处理的文件,将他们分割,并将这些split后的数据分配给下游读取器。每个split仅有一个读取器读取,而 a reader可以逐个地阅读多个拆分。

注意事项:

  1. 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,当文件被修改时,它的内容将被完全重新处理。这将破坏“exactly-once”语义,因为在文件末尾追加数据将导致对其所有内容进行重新处理。

  2. 如果watchType设置为FileProcessingMode.PROCESS_ONCE,Source 文件扫描路径一次并退出,无需等待读取器完成对文件内容的读取。当然读取器将继续读取,直到读取所有文件内容。关闭Source将导致此后不再有checkpoints。这可能会导致节点失败后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

基于scoket:

基于Collection:

DataStream Transformations

关于可用的流转换,请参考operators

Data Sinks

Data sinks消费DataStreams然后转发给files, sockets, external systems和print。Flink提供了各种内置的输出格式,这些格式封装在DataStreams操作之后:

注意,DataStream上的write*()方法主要用于调试。他们没有参与Flink的检查点,这意味着这些函数通常具有at-least-once的语义。目标系统的数据刷新依赖于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

为了可靠、精确地将流一次传递到文件系统中,使用flink-connector-filesystem。另外,通过.addsink(…)方法的自定义实现可以参与Flink的检查点,实现精确的一次语义。

Iterations

迭代流程序实现一个步骤函数并将其嵌入IterativeStream中。因为一个DataStream程序可能永远不会结束,因此没有最大迭代次数。相反,您需要指定流的哪些部分反馈给迭代,哪部分使用split 转换或filter被转发到下游。在这里,我们有一个Iteration示例,其中主要部分(重复计算的部分)是一个简单的map()转换,通过使用filter将元素转发到下游。

val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})

例如下面的程序,它不断地从一系列整数中减去1,直到0:

package com.demon.app

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @Auther: fc.w
  * @Date: 2019/1/31
  */
object ExampleProgramApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val someIntegers = env.generateSequence(0, 1000)
    val iteratedStream = someIntegers.iterate(iteration => {
      val minusOne = iteration.map(v => v - 1)
      val stillGreaterThanZero = minusOne.filter(_ > 0)
      val lessThanZero = minusOne.filter(_ <= 0)
      (stillGreaterThanZero, lessThanZero)
    })
    iteratedStream.print()

    env.execute("Window Stream WordCount")
  }

}

执行参数

StreamExecutionEnvironment包含ExecutionConfig,为Job设置指定配置值。
有关更多的参数说明,请参考 execution configuration这些参数主要适用于DataStream API:

容错

State & Checkpointing描述如何启用和配置Flink的检查点机制。

Controlling Latency(控制延迟)

默认情况下,元素不会在网络上逐个传输(这会导致不必要的网络流量),而是被缓冲。缓冲区的大小(实际上是在机器之间传输的)可以在Flink配置文件中设置。虽然这种方法可以很好地优化吞吐量,但当传入流的速度不够快时,它可能会导致延迟问题。为了控制吞吐量和延迟,可以在执行环境(或单个操作符)上使用env.setBufferTimeout(timeoutMillis)设置缓冲区被填满的最大等待时间,在此之后,即使缓冲区没有满,也会自动发送缓冲区。此超时的默认值是100 ms。
用法:

val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
package com.demon.app

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @Auther: fc.w
  * @Date: 2019/1/31
  */
object ExampleProgramApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    /* 分别可以在执行环境和单个操作上使用 */ 
    // 执行环境
    env.setBufferTimeout(100)

    val someIntegers = env.generateSequence(0, 1000)
    val iteratedStream = someIntegers.iterate(iteration => {
      val minusOne = iteration.map(v => v - 1)
      val stillGreaterThanZero = minusOne.filter(_ > 0)
      val lessThanZero = minusOne.filter(_ <= 0)
      (stillGreaterThanZero, lessThanZero)
    })
    // 单个操作
    iteratedStream.setBufferTimeout(100)

    env.execute("Window Stream WordCount")
  }

}

为了最大限度地提高吞吐量,设置setBufferTimeout(-1),它将移除超时,缓冲区只有在满时才刷新。
为了最小化延迟,将超时设置为接近0的值(例如5或10 ms)。应该避免缓冲区超时为0,因为这会导致严重的性能下降。

故障排除(Debugging)

在分布式集群中运行流程序之前,最好确保所实现的算法能够正常工作。因此,数据分析程序通常是检查结果、调试和改进的增量过程。

Flink提供了简化数据分析程序开发过程的特性。支持IDE中的本地调试、测试数据的注入和结果数据的收集。
本节给出了一些简化Flink程序开发示例。

本地执行环境(Local Execution Environment)

LocalStreamEnvironment在它创建的JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,可以在代码中设置断点并轻松调试程序。
创建并使用LocalEnvironment如下所示:

val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */)
// build your program

env.execute()

收集数据来源(Collection Data Sources)

Flink提供了由Java集合支持的特殊数据源,以简化测试。一旦对程序进行了测试,sources和sinks就可以轻松地替换或读写外部系统的sources和sinks。

采集数据源的使用方法如下:

val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意:当前,集合数据源要求数据类型和迭代器实现Serializable。此外,收集数据源不能并行执行(并行度= 1)。

迭代器数据接收(Iterator Data Sink)

Flink还提供了一个接收器,用于DataStream结果的收集和测试。它可以使用如下:

import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter

val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala

注意:Flink 1.5.0中删除了Flink-streaming-contrib模块。它的类已经被转移到flink-streaming-java和flink-streaming-scala中。

接下来:

操作符(Operators): 可用流操作符的规范。
事件时间(Event Time): Flink的时间概念介绍。
状态和容错(State & Fault Tolerance): 说明如何开发有状态应用程序。
连接器(Connectors): 可用输入和输出连接器的描述。

上一篇 下一篇

猜你喜欢

热点阅读