stream system-数据处理相关

2019-03-30  本文已影响0人  MontyOak

前文提到,相比于成熟的批处理系统,流失处理系统需要做到准确性和事件相关的时间推断。本章使用Apache Beam为例来说明数据处理相关的问题。
关于事件时间(event time)和处理时间(process time):如果关注的是数据准确和事件发生的上下文,则需要关注的是事件时间而不是处理时间。
关于窗口(windowing):常见的包括固定窗口,滑动窗口以及会话窗口。
接下来介绍另外三个概念:
触发器(trigger):触发器是一种机制,用于声明相对于某些外部信号,窗口的输出何时应该物化。可以理解成通过控制结果物化时机进行流量控制,以及在结果生成时做快照。为乱序数据和数据传输延迟的顺序调整提供了保证机制。
水线(watermarks):水线是关于事件时间的输入完整性的概念。归于时间X的水线标志着:所有事件时间小于X的事件都已经输入。水线标识的是数据的实际进展。
累积(accumulation):累加模式指定为同一窗口观察到的多个结果之间的关系。

批处理系统基石:what和where

what:转换

转换可以回答关于批处理系统的问题:“计算结果是什么?”。
举例来说,下面是一个球队的球员得分表,可以建立一个管道,通过累加球员得分来计算球队得分:

| Name | Team | Score | EventTime | ProcTime |
| Julie | TeamX | 5 | 12:00:26 | 12:05:19 |
| Frank | TeamX | 9 | 12:01:26 | 12:08:19 |
| Ed | TeamX | 7 | 12:02:26 | 12:05:39 |
| Julie | TeamX | 8 | 12:03:06 | 12:07:06 |
| Amy | TeamX | 3 | 12:03:39 | 12:06:13 |
| Fred | TeamX | 4 | 12:04:19 | 12:06:39 |
| Naomi | TeamX | 3 | 12:06:39 | 12:07:19 |
| Becky | TeamX | 8 | 12:07:26 | 12:08:39 |
| Naomi | TeamX | 1 | 12:07:46 | 12:09:00 |

在数据中,只有三列是我们关心的:

由数据的时间,可以画出下图:


数据的时间分布图

为了后文描述方便,这里先简单介绍几个Apache Beam的概念:

在上面例子中,input是由TeamsScore组成的KV结构的PCollection<KV<Team, Integer>>。这里的key表示队名,value是队内成员的得分。
下面是计算球队得分的伪代码:

PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals =
input.apply(Sum.integersPerKey());

处理过程图示(批处理)

where:窗口

下面给出固定窗口(窗口大小为2分钟)的伪代码示例:

PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES)))
.apply(Sum.integersPerKey());

按窗口处理过程图示(流式处理)

流式系统:when和how

When:触发器
触发器回答了这个问题:“在处理过程中,什么时候物化结果?”一般来说,触发器分为下面两类:

PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
  .triggering(Repeatedly(AfterCount(1))));
  .apply(Sum.integersPerKey());

示例代码运行图示
通常有两种处理时间延迟的方法:对齐延迟(延迟将处理时间分割成固定的区域,这些区域跨键和窗口对齐)和非对齐延迟(延迟与在给定窗口中观察到的数据有关)。
对齐延迟示例代码:

PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
  .triggering(Repeatedly(AlignedDelay(TWO_MINUTES)))
  .apply(Sum.integersPerKey());

示例代码运行图示
非对齐延迟示例代码:

PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
  .triggering(Repeatedly(UnalignedDelay(TWO_MINUTES))
  .apply(Sum.integersPerKey());

示例代码运行图示
对于大型输入而言,非对齐延迟是更好的策略,它把数据更新的开销更平均的分摊开。

When:水线

由于处理时间相对于事件时间延迟的不确定性,需要有额外的工具来衡量数据的完整度,这里给出水线(watermark)来衡量数据的完整度。
从事件时间的角度来看,水线描述了输入数据的完整度。水线分为以下两种:

因为水线描述的是输入数据的完整度,所以它是完整度触发器的基础。
基于水线的完整度触发器示例代码:

PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
  .triggering(AfterWatermark()))
  .apply(Sum.integersPerKey());

两种水线对比图示
上述图示可以看出水线的缺点:

在数据完整性的描述上,低延迟和准确性往往需要做权衡。

when:触发器时机

beam模型将重复更新触发器和完整度触发器结合在一起,这被称为早期/准时/延迟触发器,因为它将由复合触发器物化的窗格划分为三类:

这是管道工作可以更新为下面的流程:我们将更新我们的管道,使用一个周期更新处理时间触发器,对早期触发使用一分钟的对齐延迟,对后期触发使用每个记录触发(更新策略取决于开销和延迟的平衡)。这样,早期的处理会给我们一些批处理大容量窗口(由于触发的每分钟只处理一次,不管吞吐量到窗口),但我们不会处理延迟到达的数据引入不必要的延迟。下面是示例代码:

PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
    .triggering(AfterWatermark()
      .withEarlyFirings(AlignedDelay(ONE_MINUTE))
      .withLateFirings(AfterCount(1))))
  .apply(Sum.integersPerKey());

两种水线对比图示
相比之前的方案,它有两个明显的优化点:

When:允许延迟

这部分讨论的是流式系统中的垃圾回收。上面例子中,所有数据都没有被清除,这对于处理迟到的数据而言非常有用。而在现实例子中,由于流式系统处理的常常是无限数据,所以不可能无限期保留所有数据。
这就需要建立一个阈值,设定最大接受的延迟,任何大于这个延迟到来的数据都将被丢弃。
引入阈值的示例代码:

PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
  .triggering(
    AfterWatermark()
      .withEarlyFirings(AlignedDelay(ONE_MINUTE))
      .withLateFirings(AfterCount(1)))
    .withAllowedLateness(ONE_MINUTE))
  .apply(Sum.integersPerKey());

带阈值的运行图示

对于设置延迟阈值的简单总结:

How:累加

来到最后一个问题:细化的结果是如何关联的?有三种累加模式:

PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(
AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AtCount(1)))
.discardingFiredPanes())
.apply(Sum.integersPerKey());

丢弃模式运行图示

PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(
AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());

累加再回缩模式运行图示

上一篇下一篇

猜你喜欢

热点阅读