stream system-数据处理相关
前文提到,相比于成熟的批处理系统,流失处理系统需要做到准确性和事件相关的时间推断。本章使用Apache Beam为例来说明数据处理相关的问题。
关于事件时间(event time)和处理时间(process time):如果关注的是数据准确和事件发生的上下文,则需要关注的是事件时间而不是处理时间。
关于窗口(windowing):常见的包括固定窗口,滑动窗口以及会话窗口。
接下来介绍另外三个概念:
触发器(trigger):触发器是一种机制,用于声明相对于某些外部信号,窗口的输出何时应该物化。可以理解成通过控制结果物化时机进行流量控制,以及在结果生成时做快照。为乱序数据和数据传输延迟的顺序调整提供了保证机制。
水线(watermarks):水线是关于事件时间的输入完整性的概念。归于时间X的水线标志着:所有事件时间小于X的事件都已经输入。水线标识的是数据的实际进展。
累积(accumulation):累加模式指定为同一窗口观察到的多个结果之间的关系。
- 计算的结果是什么?这个问题由管道中的转换类型来回答。这包括计算和、构建直方图、训练机器学习模型等等。这本质上也是经典的批处理所回答的问题。
- 事件时间在哪里计算结果?这个问题可以通过在管道中使用事件时间窗口来回答。这包括从前文(固定,滑动和会话)窗口的常见例子;似乎没有窗口概念的用例(例如,与时间无关的处理;经典的批处理一般也属于这一类);以及其他更复杂的窗口类型,比如限时拍卖。还请注意,如果在记录到达系统时将入口时间指定为事件时间,那么它还可以包括处理时间窗口。
- 在处理过程中,什么时候实现结果?这个问题通过使用触发器和(可选的)水线来回答。在这个主题上有无限的变化,但是最常见的模式是那些涉及重复更新的模式。即那些利用水线为每个窗口提供一个单独的输出,只有在相应的输入被认为是完整的(即,或两者的某种组合。
- 结果的细化是如何关联的?这个问题的答案是使用的积累类型:丢弃(其中结果都是独立的和不同的)、积累(在此过程中,后面的结果建立在前面的结果的基础上)或积累和收回(在此过程中,积累值加上对先前触发值的收回)。
批处理系统基石:what和where
what:转换
转换可以回答关于批处理系统的问题:“计算结果是什么?”。
举例来说,下面是一个球队的球员得分表,可以建立一个管道,通过累加球员得分来计算球队得分:
| Name | Team | Score | EventTime | ProcTime |
| Julie | TeamX | 5 | 12:00:26 | 12:05:19 |
| Frank | TeamX | 9 | 12:01:26 | 12:08:19 |
| Ed | TeamX | 7 | 12:02:26 | 12:05:39 |
| Julie | TeamX | 8 | 12:03:06 | 12:07:06 |
| Amy | TeamX | 3 | 12:03:39 | 12:06:13 |
| Fred | TeamX | 4 | 12:04:19 | 12:06:39 |
| Naomi | TeamX | 3 | 12:06:39 | 12:07:19 |
| Becky | TeamX | 8 | 12:07:26 | 12:08:39 |
| Naomi | TeamX | 1 | 12:07:46 | 12:09:00 |
在数据中,只有三列是我们关心的:
- Score 个人得分数
- EventTime 得分发生的时间
- ProcTime 系统接收到得分事件的时间
由数据的时间,可以画出下图:
数据的时间分布图
为了后文描述方便,这里先简单介绍几个Apache Beam的概念:
- PCollections 这些表示可以跨其执行并行转换的数据集(可能是大型数据集)。
-
PTransforms 表示数据的处理过程,由一个PCollections映射到另一个PCollections。可能是按元素的计算转换,分类或者合成计算。
PTransforms 示例
在上面例子中,input是由Teams
和Score
组成的KV结构的PCollection<KV<Team, Integer>>
。这里的key表示队名,value是队内成员的得分。
下面是计算球队得分的伪代码:
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals =
input.apply(Sum.integersPerKey());
where:窗口
下面给出固定窗口(窗口大小为2分钟)的伪代码示例:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES)))
.apply(Sum.integersPerKey());
流式系统:when和how
When:触发器
触发器回答了这个问题:“在处理过程中,什么时候物化结果?”一般来说,触发器分为下面两类:
- 重复更新触发器(repeated update triggers) 随着内容的变化,周期性的为窗口生成新的窗格(pane)。更新的频率可以是每条数据的写入或者每分钟/小时更新,具体策略的选择需要平衡数据的延迟和更新的代价。
- 完整性触发器(completeness triggers) 只有当窗口的输入被认为已完成到某个阈值时,才会物化窗口的窗格。这种触发器经常用于类似批处理的任务,当数据收集完成度到达要求之后,才生成相应结果。不同之处在于,这里的完整度是对于窗口数据而言的,不是对于整个数据集。
下面是一个重复更新触发器的示例代码:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly(AfterCount(1))));
.apply(Sum.integersPerKey());
示例代码运行图示
通常有两种处理时间延迟的方法:对齐延迟(延迟将处理时间分割成固定的区域,这些区域跨键和窗口对齐)和非对齐延迟(延迟与在给定窗口中观察到的数据有关)。
对齐延迟示例代码:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly(AlignedDelay(TWO_MINUTES)))
.apply(Sum.integersPerKey());
示例代码运行图示
非对齐延迟示例代码:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly(UnalignedDelay(TWO_MINUTES))
.apply(Sum.integersPerKey());
示例代码运行图示
对于大型输入而言,非对齐延迟是更好的策略,它把数据更新的开销更平均的分摊开。
When:水线
由于处理时间相对于事件时间延迟的不确定性,需要有额外的工具来衡量数据的完整度,这里给出水线(watermark)来衡量数据的完整度。
从事件时间的角度来看,水线描述了输入数据的完整度。水线分为以下两种:
- 完美水线 当我们对所有的输入数据都有完全的了解时,就有可能构造出一个完美的水线。在这种情况下,不存在迟来的数据;所有数据都是早或准时的。
- 启发式水线 对于许多分布式输入源来说,对输入数据的完整了解是不切实际的,在这种情况下,最佳选择是提供启发式水线。启发式水印使用任何关于输入的可用信息(分区,分区内排序,文件增长速率等等)来提供尽可能准确的进度估计。在许多情况下,这样的水印可以非常准确。即使如此,启发式水印的使用意味着它可能出错,这将导致数据延迟。后文将会描述如何处理这种情况。
因为水线描述的是输入数据的完整度,所以它是完整度触发器的基础。
基于水线的完整度触发器示例代码:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark()))
.apply(Sum.integersPerKey());
两种水线对比图示
上述图示可以看出水线的缺点:
- 首先,在上述图示中,由于种种原因而迟到的数据没有被水线观测到,这是启发式水线所无法避免的问题。在另外一些场景下,比如数据监测看板,数据需要以准实时的方式更新而不是按水线的方式展示出较大的延迟(类似传统批处理的情况)。
- 当启发式水印提前不正确时,具有水印前事件时间的数据可能会延迟一段时间到达,从而创建延迟的数据。这是在右边的示例中发生的情况:水印在观察到第一个窗口的所有输入数据之前,提前到第一个窗口的末尾,结果输出值不是14而是5。他们的启发式本质意味着他们有时会犯错。因此,如果您关心正确性,仅依靠它们来确定何时实现输出是不够的。
在许多情况下,这些水印在预测时可以非常准确。即便如此,启发式水印的使用意味着它有时可能是错误的,这将导致数据延迟。
在数据完整性的描述上,低延迟和准确性往往需要做权衡。
when:触发器时机
beam模型将重复更新触发器和完整度触发器结合在一起,这被称为早期/准时/延迟触发器,因为它将由复合触发器物化的窗格划分为三类:
- 零个或多个早期窗格,这些窗格是重复更新触发器的结果,该触发器定期触发,直到水印结束。这些触发所生成的窗格包含推测结果,但允许我们观察随着新输入数据的到来,窗口随时间的演变。这弥补了水印有时过于迟缓的缺点。
- 一个单独的准时窗格,这是在水印经过窗口末端后,基于水线的完整性触发器触发的结果。这个触发是特殊的,因为它提供了一个断言,现在系统相信这个窗口的输入是完整的。这意味着现在可以安全地对丢失的数据进行推断。
- 零或多个延迟窗格,这是另一个(可能是不同的)重复更新触发器的结果,该触发器在水印经过窗口末尾后,每当延迟数据到达时定期触发该触发器。在完美水印的情况下,总是会有零个延迟窗格。但是在启发式水印的情况下,任何水印不能正确解释的数据都会导致延迟触发。这弥补了水印速度过快的缺点。
这是管道工作可以更新为下面的流程:我们将更新我们的管道,使用一个周期更新处理时间触发器,对早期触发使用一分钟的对齐延迟,对后期触发使用每个记录触发(更新策略取决于开销和延迟的平衡)。这样,早期的处理会给我们一些批处理大容量窗口(由于触发的每分钟只处理一次,不管吞吐量到窗口),但我们不会处理延迟到达的数据引入不必要的延迟。下面是示例代码:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AfterCount(1))))
.apply(Sum.integersPerKey());
两种水线对比图示
相比之前的方案,它有两个明显的优化点:
- 早期窗格能够以准实时的延迟提供一个早期的近似结果
- 对于延迟到达的数据,由延迟窗格负责更新准时窗格的数据
When:允许延迟
这部分讨论的是流式系统中的垃圾回收。上面例子中,所有数据都没有被清除,这对于处理迟到的数据而言非常有用。而在现实例子中,由于流式系统处理的常常是无限数据,所以不可能无限期保留所有数据。
这就需要建立一个阈值,设定最大接受的延迟,任何大于这个延迟到来的数据都将被丢弃。
引入阈值的示例代码:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(
AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AfterCount(1)))
.withAllowedLateness(ONE_MINUTE))
.apply(Sum.integersPerKey());
对于设置延迟阈值的简单总结:
- 如果可以建立完美水线,没必要处理延迟数据,延迟阈值可以简单设置为零。
- 对于启发式水线而言,根据实际数据和容量大小的比较来设置延迟阈值。
How:累加
来到最后一个问题:细化的结果是如何关联的?有三种累加模式:
- 丢弃 每次物化窗格数据时,把存储的状态丢弃。这种情况下,每个窗格都是彼此独立的。当下游消费者可以自己处理累加的时候,这种模式可以被采用。
丢弃模式示例代码:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(
AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AtCount(1)))
.discardingFiredPanes())
.apply(Sum.integersPerKey());
- 累加 每次物化窗格时,都会保留之前存储状态,并将后来的输入累积到旧有状态。这种情况下,后面的窗格依赖之前的窗格。当后面的结果可以简单地覆盖之前的结果时,这种模式是有用的。
- 累加再回收 类似累加模式,不同的是在生成新窗格时,它还会为前一个窗格生成独立的回缩。这种回缩在两种情况下有用:
- 当下游的消费者按照不同的维度对数据进行重新分组时,完全有可能新值的键值与前一个值不同,因此最终属于不同的组。在这种情况下,新值不能覆盖旧值;相反,您需要回缩来删除旧值
- 当使用动态窗口时,由于窗口合并,新值可能会替换多个以前的窗口。在这种情况下,仅从新窗口就很难确定哪些旧窗口正在被替换。对旧窗口进行显式的回缩使任务变得简单。
累加再回缩模式示例代码:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(
AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());