Flink-Streaming-overview

2019-04-29  本文已影响0人  耳边的火

Flink中的流应用就是在数据流上应用各种转化(如:filter,update state,difine window,aggregation)。数据流有各种数据源创建而来(如:消息队列,socket流,文件等)。结果输出到sink,如写入文件或者标准输出。Flink程序可以在多种上下文中运行,standalone,内置在其他应用中等。应用可以在本地JVM中执行,也可以在集群的许多机器中执行。

示例程序


下面的程序是一个完成的应用,它演示了如何在web soscke上使用window统计5秒内的字数。你可以复制代码然后在你本地运行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

运行应用前,先使用 netcat 在命令行中开启输入流:

nc -lk 9999

输入一些单词就会返回新的结果。这些单词或作为字数统计应用的输入。如果你想看到统计值大于1,可以在5秒内一遍又一遍的输入相同的单词(如果你做不到,可以增加window的大小)

数据源 Data Source


Source指的是你的程序从哪里读取它的输入。你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)在你的程序中添加数据源。Flink自带了一些实现好的数据源函数,淡然你可以实现 SourceFunction 来实现自定义的非并行的source或者实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来实现并行的source。
StreamExecutionEnvironment有一些实现定义好的数据源方法:

基于文件的数据源:

基于socket:

基于集合:

自定义:

DataStream Transformations


查阅 operator 文档

Data Sink


Data Sink读取数据流,并将它们写入到file,socket,其他系统或者打印它们。Flink自带了一些output format,它们被封装到一些操作符中:

注意的是 write*() 方法主要用于调试的目的。它们没有参与flink的checkpoint过程,这就意味着使用这些函数是“at-least-once”至少一次语义。数据如何写入目标系统是由OutputFormat决定的,也就是说发送到OutputFormat的数据并不一定会立即写入目标系统(如批量写入情况)。因此,在遇到故障时,这些数据有可能会丢失。
为了稳定地,精确一致的将流数据写入问加你系统,建议使用 flink-connector-filesystem。当然,如果自定义了sink function,通过 addSink 添加该自定义的sink,也可以参与flink的checkpoint过程,保持 exactly-once 语义。

Iterator


迭代流程序实现了step function,并且内置在 IterativeStream中。由于DataStream程序可能不会停止,因此iteration中不会有最大数量限制。你需要定义流中的哪些数据需要继续迭代,哪些数据可以发送到下游的操作符,你可以使用split或者filter实现。下面我们使用 filter 来演示。首先,我们定义一个 IterativeStream :

IterativeStream<Integer> iteration = input.iterate();

然后,我们定义在循环中,需要对数据流做哪些操作(下面我们就简单的使用map作为演示)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

为了定义迭代器何时关闭,可以调用 IterativeStream 的 closeWith(feedbackStream) 方法。传入 closeWith() 的数据流会再次进入迭代器,放到迭代器的head。一个常用的模式是,使用filter将流的一部分重新放入迭代器,而另一部分下发到下游操作符这些filter可以定义“终止”的逻辑,也就是一个数据可以不再进入迭代器,而是被转发到下游操作符。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

例如,下面的程序就是对数据进行减1操作,直到为0:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

Execution Parameter 执行参数


StreamExecutionEnvironment 包括 ExecutionConfig ,它允许设置运行时所需的job配置。
请参阅 execution configuration 获取更多参数的解释。下面的参数仅属于 DataStream API:

故障容忍

查阅 State & Checkpointing

控制延迟


默认情况下,数据在网络间传输时,并不是一个一个的传输(造成不必要的网络拥堵),而是缓存后一起传输。buffer的大小可以在Flink 的配置文件中配置。尽管这种方式可以优化吞吐率,但是当输入流的速度不够快时,会造成延迟问题。为了平衡吞吐率和延迟,你可以使用 env.setBufferTimeout(timeoutMillis) 来设置最大等待时间。超过这个时间后,即便buffer没有填满,也要发出去。默认值为100ms。

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

为了最大化吞吐率,设置 setBufferTimeout(-1) 会移除超时设置,仅当buffer填满后才发送。为了最小化延迟,设置超时的值接近0(如 5 或 10 毫秒)。应该避免设置值为0,因为这会导致服务性能下降。

调试 Debugging

在提交任务到分布式集群运行前,最好确认程序可以按预期运行。因此,实现一个数据分析应用,通常是一个增量的过程:检查结果,调试,优化。
Flink提供了本地IDE调试的功能,简化了数据分析应用的开发。包括加载测试数据,收集结果数据。这一部分会显示如何简化flink程序的开发,便于测试调试程序。

本地运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
加载测试数据
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

注意:需要提供数据烈性,iterator要实现 Serializable。不能并发执行

迭代Sink
import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
上一篇 下一篇

猜你喜欢

热点阅读