What is Apache Flink - Applicati

2018-12-01  本文已影响7人  耳边的火

原文翻译如下。


Apache Flink是一个能在有界与无界数据流上进行有状态计算的框架。Flink在不同层次的抽象上提供了相应的API,并且针对常用的使用场景提供了工具库。

流应用的基础元素

基于流处理框架来构建与运行什么样的应用,取决于该应用所选择的框架处理 streamstate以及time的能力。下面我们将会描述流处理应用中这三个基础元素,并解释Flink是如何处理这三者的。

Streams

很显然,stream是流处理中的基础概念。然而,流的不同的特性决定了一个流能够并且应该如何被处理。Flink是一个多变的处理框架,能够处理任意类型的流。

State

每一个稍微复杂些的流应用都是有状态的,只有那些仅仅对单个事件进行转化的应用不需要状态。任何一个执行基本业务逻辑的应用都需要保存状态或中间计算结果,以便于在稍后的某个时刻,如下一个事件到达时或经过一段特定时间段后,再获取该状态或结果。


image

应用状态在Flink中是最重要的数据。Flink提供了处理state的上下文context,你可以在context提供的特性中印证这一点。

Time

Time是流应用中另一个重要概念。大多数的事件流都有其固有的时间属性,这是由于每一个事件都是在某个特定时刻产生的。不仅如此,许多常见的流计算都是基于时间的,例如windows aggregations(对在某时间窗口的所有数据执行聚合操作), sessionization(基于会话session的统计), pattern detection(模式检测), and time-based joins(基于时间的流联接)。一个流应用的重要方面便是该应用如何处理时间,处理何种类型的时间,如event-time(事件时间)或者processing-time(处理时间)。
Flink提供了一系列丰富的与时间相关的特性:

分层的API

Flink提供了三层API,每一层API在使用的简易性与表达的丰富性之间做了权衡,以适应不同的使用情况。
[图片上传失败...(image-b7f7e3-1543665726598)]
我们简要的展示一下每一层API,并且讨论它们的使用,以及给出一个代码示例。

ProcessFunction

ProcessFunction是Flink提供的最具有表达力的函数接口。Flink提供该接口来处理一个或两个输入流中的事件或者处理在window中分组后的事件。ProcessFunction提供了对时间和状态的精细控制。一个ProcessFunction可以根据需要修改状态以及注册一个timer,该timer会在设定好的未来某个时刻触发一个回调函数。因此,ProcessFunction可以按照大多数有状态的事件驱动(event-driven)应用的要求,实现针对每个事件的复杂业务逻辑。
下面的示例展示了 KeyedProcessFunction ,该函数作用在 KeyedStream 上,用来匹配 START 与 END 事件。当一个START事件到达时,该函数会将START事件的时间戳存储在state中,并且注册一个4小时的timer。如果在4小时内,注册的timer未被触发时,END事件就已经到达,该函数会计算END事件与START事件之间的时间差,清除state并且返回结果。否则,则会触发timer的回调函数,清除state内容。

/**
 * Matches keyed START and END events and computes the difference between 
 * both elements' timestamps. The first String field is the key attribute, 
 * the second String attribute marks START and END events.
 */
public static class StartEndDuration
    extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

  private ValueState<Long> startTime;

  @Override
  public void open(Configuration conf) {
    // obtain state handle
    startTime = getRuntimeContext()
      .getState(new ValueStateDescriptor<Long>("startTime", Long.class));
  }

  /** Called for each processed event. */
  @Override
  public void processElement(
      Tuple2<String, String> in,
      Context ctx,
      Collector<Tuple2<String, Long>> out) throws Exception {

    switch (in.f1) {
      case "START":
        // set the start time if we receive a start event.
        startTime.update(ctx.timestamp());
        // register a timer in four hours from the start event.
        ctx.timerService()
          .registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
        break;
      case "END":
        // emit the duration between start and end event
        Long sTime = startTime.value();
        if (sTime != null) {
          out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
          // clear the state
          startTime.clear();
        }
      default:
        // do nothing
    }
  }

  /** Called when a timer fires. */
  @Override
  public void onTimer(
      long timestamp,
      OnTimerContext ctx,
      Collector<Tuple2<String, Long>> out) {

    // Timeout interval exceeded. Cleaning up the state.
    startTime.clear();
  }
}

该示例展示了 KeyedProcessFunction 的表现力,但同时也体现出该API有些繁琐。

DataStream API

DataStream API针对大多数常见的流处理操作提供了方法,如:根据时间窗口处理数据(windowing),依次记录转化(record-at-a-time transformations),通过查询其他数据源来丰富事件(enriching events by querying an external data store)。DataStream API对Java与Scala都可用,且提供了如 map(), reduce(), 以及 aggregate()等方法。具体的业务逻辑可以通过实现其方法接口或者使用Java/Scala的lambda表达式。
下面的示例展示了,在鼠标点击事件流中,如何统计每个会话session中的点击数量。

// a stream of website clicks
DataStream<Click> clicks = ...

DataStream<Tuple2<String, Long>> result = clicks
  // project clicks to userId and add a 1 for counting
  .map(
    // define function by implementing the MapFunction interface.
    new MapFunction<Click, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> map(Click click) {
        return Tuple2.of(click.userId, 1L);
      }
    })
  // key by userId (field 0)
  .keyBy(0)
  // define session window with 30 minute gap
  .window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
  // count clicks per session. Define function as lambda function.
  .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

SQL & Table API

Flink提供了两种关系型API:Table API 以及 SQL。不论是处理离线数据还是流数据,每种API都可以使用相同的方法来处理,例如在无界实时数据流或者记录数据流上使用相同的API执行查询操作,都会得到相同的结果。Table API 与 SQL 使用了Apache Calcite进行解析,校验以及查询优化。它们可以与DataStream以及DataSet API无缝的结合使用,且提供了对用户定义的标量(user-defined scalar)、聚合和表值函数(table-valued function)的支持。
Flink提供的关系型API的设计初衷便是使得数据分析,数据清洗以及构建ETL应用更简单。
下面的示例展示了如何使用SQL 的 API 在鼠标点击事件流中,查询每个会话session中的点击总数。这与上面DataStream API的使用案例是相同的。

SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId

工具库

Flink针对常见的数据处理使用场景提供了几个工具库。这些工具库一般都内置在API中而不会单独存在。因此,它们能够享受到Flink API提供的便利以及可以与其他工具库集成。

上一篇 下一篇

猜你喜欢

热点阅读