数据处理的内容、地点、时间和方式
为了让您了解实际情况,我使用Apache Beam代码片段,并结合延时图来提供可视化的表示。Apache Beam是一个用于批处理和流处理的统一编程模型和可移植性层,它具有不同语言(如Java和Python)的具体sdk。然后,可以在任何受支持的执行引擎(Apache Apex、Apache Flink、Apache Spark、云数据流等)上移植地运行用Apache Beam编写的管道。
在这里使用Apache Beam作为例子是因为它最完整地体现了本书所描述的概念。回到“stream 102”最初编写的时候(回到它仍然是来自谷歌Cloud Dataflow的数据流模型,而不是来自Apache Beam的数据流模型),它实际上是现有的惟一系统,为我们将在这里讨论的所有示例提供了必要的表达能力。一年半之后,我很高兴地说,已经发生了很大的变化,而且大多数主要的系统已经或者正在朝着支持一个看起来很像本书描述的模型的方向发展。所以请放心,我们在这里所介绍的概念,虽然是通过光束透镜来了解的,但也同样适用于你将遇到的大多数其他系统。
Roadmap
为了帮助为本章做好准备,我想列出5个主要概念,它们将作为本章所有讨论的基础,实际上,在第I部分的其余部分中,我们已经讨论了其中的两个。
在第1章中,我们首先建立了事件时间(事件发生的时间)和处理时间(处理期间观察到的时间)之间的关键区别。这提供了基础的一个主要论文提出在这本书中:如果你关心正确性和事件实际发生的环境中,你必须分析数据相对于其固有的事件,不是遇到的处理时间在分析本身。
然后,我们引入了窗口的概念(即。这是一种常见的方法,用于处理这样一个事实,即从技术上讲,无限数据源可能永远不会结束。窗口策略的一些更简单的例子是固定窗口和滑动窗口,但更复杂的窗口类型,如会话(其中窗口由数据本身的特性定义;例如,为每个用户捕获一个活动会话,然后捕捉一个不活动间隙)也可以看到广泛的用法。
除了这两个概念,我们现在要仔细看看另外三个:
触发器(Triggers)
触发器是一种机制,用于声明相对于某些外部信号,窗口的输出何时应该物化。触发器提供了选择何时应该发出输出的灵活性。在某种意义上,您可以将它们视为一种流控制机制,用于指示何时应该实现结果。另一种看待它的方法是,触发器类似于相机上的快门释放,允许您在计算结果时声明何时拍照。触发器还可以在窗口的演化过程中多次观察其输出。这进而打开了一扇门,可以随着时间的推移对结果进行细化,从而允许在数据到达时提供推测性的结果,以及处理上游数据(修订)随时间的变化或延迟到达的数据。
水印(Watermarks)
水印是一个关于事件时间的输入完整性的概念。一个时间为X的水印声明:“所有事件时间小于X的输入数据都被观察到。”因此,当观察没有已知终点的无限数据源时,水印可以作为进度的度量。我们在本章触及了水印的基础,然后Slava在第三章深入探讨了这个主题。
堆积物(Accumulation)
累加模式指定为同一窗口观察到的多个结果之间的关系。这些结果可能完全脱节;也就是说,表示随时间变化的独立增量,或者它们之间可能有重叠。不同的积累模式具有不同的语义和与之相关的成本,从而发现跨各种用例的适用性。
此外,因为我认为这样更容易理解所有这些概念之间的关系,所以我们在回答四个问题的结构中重新审视旧的和探索新的,所有这些问题对每个无界数据处理问题都至关重要:
1.计算的结果是什么?这个问题由管道中的转换类型来回答。这包括计算和、构建直方图、训练机器学习模型等等。这本质上也是经典的批处理所回答的问题。
2.事件时间在哪里计算结果?这个问题可以通过在管道中使用事件时间窗口来回答。这包括从第一章(固定,滑动和会话)窗口的常见例子;似乎没有窗口概念的用例(例如,与时间无关的处理;经典的批处理一般也属于这一类);以及其他更复杂的窗口类型,比如限时拍卖。还请注意,如果在记录到达系统时将入口时间指定为事件时间,那么它还可以包括处理时间窗口。
3.在处理过程中,什么时候实现结果?这个问题通过使用触发器和(可选的)水印来回答。在这个主题上有无限的变化,但是最常见的模式是那些涉及重复更新的模式。(即那些利用水印为每个窗口提供一个单独的输出,只有在相应的输入被认为是完整的),或两者的某种组合。
4.结果的细化是如何关联的?这个问题的答案是使用的积累类型:丢弃(其中的结果都是独立和不同的)、积累(其中稍后的结果建立在先前的结果之上)或积累和收回(在此过程中,积累值加上对先前触发值的收回)
批次基础:What和Where
What:转换
在经典批处理中应用的转换回答了这样一个问题:“计算了什么结果?” 尽管您可能已经熟悉经典的批处理,但我们还是要从这里开始,因为它是基础,我们在此基础上添加了所有其他概念。
在本章的其余部分(实际上,通过本书的大部分内容),我们将看到一个示例:对由9个值组成的简单数据集计算键整和。假设我们已经编写了一个基于团队的手机游戏,我们想要构建一个管道,通过汇总用户手机报告的个人得分来计算团队得分。如果我们要在一个名为“UserScores”的SQL表中捕获我们的9个示例分数,它可能看起来像这样:
| Name | Team | Score | EventTime | ProcTime |
| Julie | TeamX | 5 | 12:00:26 | 12:05:19 |
| Frank | TeamX | 9 | 12:01:26 | 12:08:19 |
| Ed | TeamX | 7 | 12:02:26 | 12:05:39 |
| Julie | TeamX | 8 | 12:03:06 | 12:07:06 |
| Amy | TeamX | 3 | 12:03:39 | 12:06:13 |
| Fred | TeamX | 4 | 12:04:19 | 12:06:39 |
| Naomi | TeamX | 3 | 12:06:39 | 12:07:19 |
| Becky | TeamX | 8 | 12:07:26 | 12:08:39 |
| Naomi | TeamX | 1 | 12:07:46 | 12:09:00 |
注意,本例中的所有分数都来自同一团队的用户;这是为了保持示例的简单性,因为我们在接下来的图表中只有有限数量的维度。因为我们是按团队分组的,所以我们只关心最后三列:
Score:与此事件关联的单个用户得分
EventTime:比赛时间为记分时间;也就是说,得分发生的时间
ProcTime:分数的处理;也就是说,管道观察得分的时间
对于每个示例管道,我们将查看一个延时图,该图突出显示数据如何随时间发展。这些图表绘制了我们所关心的时间的两个维度上的9个分数:x轴上的事件时间和y轴上的处理时间。图2-1展示了输入数据的静态图。
image.png
图2-1 9条输入记录,分别在事件时间和处理时间中绘制
随后的延时图要么是动画(Safari),要么是一系列帧(打印和所有其他数字格式),允许您查看数据是如何随时间处理的(在我们看到第一个延时图后不久,将对此进行更多的介绍)。
前面的每个示例都是Apache Beam Java SDK伪代码的一小段,以使管道的定义更加具体。从某种意义上说,它是伪代码,因为我有时会修改规则、省略细节(比如使用具体的I/O源)或简化名称(Beam Java 2.x和更早版本中的触发器名称冗长得令人痛苦;为了清晰起见,我使用更简单的名称)以使示例更清晰。除了这些小事情之外,它还是真实的Beam代码。
如果您已经熟悉Spark或Flink之类的东西,那么您应该能够相对轻松地理解Beam代码在做什么。但是为了给你一个速成课程,在Beam中有两个基本的基本类型:
PCollections
这些表示可以跨其执行并行转换的数据集(可能是大型数据集)(因此名称开头的“P”)。
PTransform
将这些应用于PCollections以创建新的PCollections。PTransform可以执行元素方面的转换,可以将多个元素分组/聚合在一起,也可以是其他PTransform的组合,如图2-2所示。
Figure 2-2. Types of transformations
对于我们的示例,我们通常认为我们从预装PCollection < KV <Team,Integer> >命名为:“输入”(也就是说,PCollection键/值对组成的Team和Integer,在团队只是类似字符串代表Team的名字,和整数的分数从任何个人相应的团队)。在实际的管道中,我们可以通过从I/O源读取原始数据(例如日志记录)的PCollection<String>来获取输入,然后将其转换为PCollection<KV<Team,Integer>>,将日志记录解析为适当的键/值对。为了清晰起见,在第一个示例中,我为所有这些步骤都包含伪代码,但是在后面的示例中,我省略了I/O和解析。
因此,对于只从I/O源读入数据、解析Team/Score键值对并计算每个团队的得分和的管道,我们将得到类似于示例2-1所示的内容。
Example 2-1. Summation pipeline
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals = input.apply(Sum.integersPerKey());
键/值数据从I/O源读取,以Team(例如,团队名称的字符串)为键,Integer(例如,单个团队成员的得分)为值。然后将每个键的值汇总在一起,在输出集合中生成每个键的和(例如,团队总分)。
对于接下来的所有示例,在看到描述我们正在分析的管道的代码片段之后,我们将查看一个延时图,该图显示了针对单个键的管道在具体数据集上的执行情况。在实际的管道中,您可以想象类似的操作将在多台机器上并行进行,但是为了便于我们的示例,将事情保持简单会更清楚。
如前所述,Safari版本将整个执行过程呈现为动画电影,而打印和所有其他数字格式使用一组关键帧的静态序列,这些帧提供了管道随时间推移的进展情况。在这两种情况下,我们还在www.streamingbook.net上提供了一个指向完全动画版本的URL。
每个图都绘制了跨两个维度的输入和输出:事件时间(在x轴上)和处理时间(在y轴上)。因此,管道所观察到的实时从底部到顶部进行,由处理时间轴上随着时间的推移而上升的粗水平黑线表示。输入是圆圈,圆圈内的数字表示特定记录的值。它们一开始是浅灰色的,然后随着管道的观察而变暗。
当管道观察值时,它将值累积到中间状态,并最终将汇总结果物化为输出。状态和输出由矩形表示(灰色表示状态,蓝色表示输出),聚合值位于顶部附近,矩形覆盖的区域表示累积到结果中的事件时间和处理时间的部分。对于例2-1中的管道,在典型的批处理引擎上执行时,它将类似于图2-3所示。
image.pngFigure 2-3. Classic batch processing
因为这是一个批处理管道,所以它累积状态,直到看到所有输入(由顶部的绿色虚线表示),然后生成48个输出。在这个例子中,我们计算所有事件时间的和因为我们没有应用任何特定的窗口转换;因此,状态和输出的矩形覆盖整个x轴。然而,如果我们想要处理一个无界数据源,经典的批处理是不够的;我们不能等待输入结束,因为它实际上永远不会结束。我们想要的概念之一是窗口,我们在第1章中介绍过。因此,在我们第二个问题的上下文中——“事件时间在哪里计算结果?”——我们现在简要回顾一下windowing。
Where: 窗口
正如在第1章中讨论的,窗口是沿着时间边界分割数据源的过程。常见的窗口策略包括固定窗口、滑动窗口和会话窗口,如图2-4所示。
image.png
图2-4。例如窗口策略,每个示例显示三个不同的键,突出显示对齐窗口(适用于所有数据)和未对齐窗口(适用于数据子集)之间的差异。
为了更好地理解窗口在实际中是什么样子,让我们使用整数求和管道并将其窗口化为固定的两分钟窗口。使用Beam,更改只是简单地添加了一个Window.into转换,您可以在示例2-2中看到它的高亮显示。
Example 2-2. Windowed summation code
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES)))
.apply(Sum.integersPerKey());
回想一下,Beam提供了一个在批处理和流处理中都能工作的统一模型,因为从语义上讲,批处理实际上只是流处理的一个子集。因此,让我们首先在批处理引擎上执行这个管道;它的机制更直接,当我们切换到流引擎时,它会给我们一些可以直接比较的东西。图2-5给出了结果。
和以前一样,输入是在状态下累积的,直到全部消耗完,然后才产生输出。然而,在本例中,我们得到的不是一个输出,而是四个:一个单独的输出,对应四个相关的两分钟EventTime窗口。
至此,我们已经回顾了我在第1章中介绍的两个主要概念:事件时间域和处理时间域之间的关系,以及窗口。如果我们想更进一步,我们需要开始添加本节开头提到的新概念:触发器、水印和累加。
流媒体:When and How
我们刚刚观察到在批处理引擎上执行带窗口的管道。但是,理想情况下,我们希望为结果提供更低的延迟,并且我们还希望本机处理无限制的数据源。切换到流引擎是朝着正确的方向迈出的一步,但是我们以前的策略是等到我们的输入全部被消耗掉才生成输出,这种策略现在已经不可行了。
When:触发器的奇妙之处在于触发器是奇妙的东西!
触发器为以下问题提供了答案:“什么时候在处理时间中实现结果?”触发器声明窗口的输出什么时候应该在处理时间内发生(尽管触发器本身可能基于其他时间域中发生的事情做出这些决定,比如事件-时间域中正在进行的水印,稍后我们将看到)。窗口的每个特定输出都称为窗口的窗格。
尽管可以想象有相当广泛的触发语义,但从概念上讲,通常只有两种有用的触发类型,而实际应用中几乎总是使用其中一种或两者的组合来简化:
重复更新触发器:
随着窗口内容的发展,它们定期为窗口生成更新的窗格。这些更新可以随每条新记录一起具体化,也可以在某些处理时间延迟(比如每分钟一次)之后发生。为重复更新触发器选择周期主要是为了平衡延迟和成本。
完整性触发器:
只有当窗口的输入被认为已完成到某个阈值时,才会物化窗口的窗格。这种类型的触发器最类似于我们在批处理中所熟悉的触发器:只有在输入完成之后,我们才提供结果。基于触发器的方法的不同之处在于,完整性的概念的范围仅限于单个窗口的上下文,而不是总是绑定到整个输入的完整性。
重复更新触发器是流系统中最常见的触发器类型。它们易于实现和理解,并且为特定类型的用例提供了有用的语义:对物化数据集的重复(最终是一致的)更新,类似于数据库世界中物化视图的语义。
不太经常遇到完整性触发器,但是提供了与经典批处理世界中的语义更接近的流语义。它们还提供了一些工具,用于对丢失的数据和延迟的数据进行推理,我们将在探索驱动完整性触发器的底层原语(即水印)时简要讨论这两个问题(下一章)。
但首先,让我们从简单的开始,看看一些基本的重复更新触发器。为了使触发器的概念更具体一些,让我们继续向示例管道中添加最直接的触发器类型:一个触发每个新记录的触发器,如示例2-3所示。
Example 2-3. Triggering repeatedly with every record
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(AfterCount(1)))); .apply(Sum.integersPerKey());
如果我们要在流引擎上运行这个新管道,结果将如图2-6所示。
您可以看到我们现在如何为每个窗口获得多个输出(窗格):每个对应的输入一次。当将输出流写入某种表时,这种触发模式工作得很好,您可以简单地轮询结果。在查看表的任何时候,您都将看到给定窗口的最新值,这些值将随着时间的推移向正确性收敛。
每条记录触发的一个缺点是它非常健谈。在处理大规模数据时,像sum这样的聚合提供了一个很好的机会,可以在不丢失信息的情况下减少流的基数。这对于具有高音量键的情况尤其明显;例如,拥有大量活跃玩家的大型团队。想象一下,在一款大型多人游戏中,玩家被分成两个阵营,你想要统计每个阵营的统计数据。这可能是不必要的更新你的每一个新的输入记录为每个玩家在一个给定的派别。相反,您可能很乐意在处理时间延迟(比如每秒钟或每分钟)之后更新它们。使用处理时间延迟的一个很好的副作用是,它在大容量键或窗口之间具有均衡效果:最终得到的流在基数方面更加一致。
处理触发器中的时间延迟有两种不同的方法:对齐延迟(延迟将处理时间分割成跨键和窗口对齐的固定区域)和未对齐延迟(延迟相对于给定窗口中观察到的数据)。具有未对齐延迟的管道可能类似于例2-4,其结果如图2-7所示。
Example 2-4. Triggering on aligned two-minute processing-time boundaries PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(AlignedDelay(TWO_MINUTES))) .apply(Sum.integersPerKey());
这种对齐的延迟触发器可以有效地从类似Spark流的微批处理流系统中获得。它的好处是可预测性;您可以同时在所有修改过的窗口中获得定期更新。这也是缺点:所有更新都同时发生,这会导致繁重的工作负载,通常需要更大的峰值供应才能正确处理负载。另一种方法是使用未对齐的延迟。这看起来像例子2-5。图2-8给出了结果。
Example 2-5. Triggering on unaligned two-minute processing-time boundaries PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(UnalignedDelay(TWO_MINUTES)) .apply(Sum.integersPerKey());
将图2-8中的未对齐延迟与图2-6中的未对齐延迟进行对比,很容易看出未对齐延迟如何在时间上更均匀地分布负载。任何给定窗口所涉及的实际延迟在这两种情况下都有所不同,有时更多,有时更少,但最终平均延迟基本上保持不变。从这个角度来看,未对齐延迟通常是大规模处理的较好选择,因为它们会导致随着时间的推移,负载分布更加均匀。
重复更新触发器对于我们只是想要随着时间的推移对结果进行周期性更新的用例非常有用,并且对于那些在没有明确指示何时实现正确性的情况下向正确性靠拢的更新也很好。然而,正如我们在第一章中讨论的,变幻莫测的分布式系统之间往往会导致不同程度的倾斜时间事件发生,它实际上观察到你的管道,这意味着它可能很难思考当你的输出提供了一个准确、完整的输入数据。对于输入完整性很重要的情况,重要的是要有某种关于完整性的推理方法,而不是盲目相信数据子集计算的结果。
When:水印(Watermarks)
水印是对以下问题的回答的一个支持方面:“在处理时间中什么时候实现结果?”水印是事件-时间域中输入完整性的时间概念。换句话说,它们是系统相对于事件流中正在处理的记录的事件时间度量进度和完整性的方法(有界或无界,尽管它们在无界情况下更有用)。
回想一下第1章中的这个图,在图2-9中稍作了修改,其中我将事件时间和处理时间之间的偏差描述为大多数实际分布式数据处理系统中时间的一个不断变化的函数。
image.png
我说的那条代表现实的弯弯曲曲的红线本质上就是水印;它捕获随着处理时间的进展而发生的事件时间完整性的进展。从概念上讲,可以将水印看作一个函数F(P)→E,它在处理时间上取一个点,在事件时间上返回一个点。事件时间E点是系统认为所有事件时间小于E的输入都被观测到的点。换句话说,这是一个断言,即再也不会看到事件时间小于E的数据。根据水印的类型,完美或启发式,断言可以是一个严格的保证或一个有根据的猜测,分别:
完美的水印
当我们对所有的输入数据都了如指掌时,就有可能构造出完美的水印。在这种情况下,不存在迟来的数据;所有数据都是提前或准时的。
启发式水印
对于许多分布式输入源,完全了解输入数据是不切实际的,在这种情况下,下一个最佳选择是提供启发式水印。启发式水印使用关于输入的所有可用信息(分区、分区内的排序(如果有的话)、文件增长率等)来提供尽可能准确的进度估计。在许多情况下,这样的水印可以非常准确的预测。即便如此,启发式水印的使用意味着它有时可能是错误的,这将导致数据延迟。我们将向您展示处理后期数据的方法。
因为它们提供了与我们的输入相关的完整性概念,所以水印构成了前面提到的第二类触发器的基础:完整性触发器。水印本身是一个迷人而复杂的话题,当你在第3章看到Slava的“水印深度解析”时就会明白这一点。但是现在,让我们通过更新我们的示例管道来查看它们的实际情况,以利用基于水印的完整性触发器,如示例2-6所示。
Example 2-6. Watermark completeness trigger
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(AfterWatermark())).apply(Sum.integersPerKey());
现在,水印的一个有趣的性质是,它们是一类函数,这意味着有多个不同的函数F(P)→E,满足水印的属性,在不同程度上成功。正如我前面提到的,对于您完全了解输入数据的情况,可以构建一个完美的水印,这是理想的情况。但是,如果您对输入缺乏完整的知识,或者计算完美水印的计算成本太高,那么您可以选择使用启发式来定义您的水印。我想在这里指出的一点是,给定的水印算法使用是独立于管道本身。我们不打算详细讨论实现水印在这里意味着什么。现在,帮助开车回家这个想法,一个给定的输入集可以有不同的水印应用于它,让我们看一下管道在示例2 - 6当上执行相同的数据集,但使用两个不同的水印实现(图2 - 10):在左边,一个完美的水印;在右边,一个启发式水印。
在这两种情况下,当水印通过窗口的末端时,窗口都会物化。正如您所期望的那样,完美的水印可以随着时间的推移完美地捕获管道的事件时间完整性。相比之下,具体算法用于右边的启发式水印未能考虑到9的价值,这大大改变了形状的物化输出,无论是输出延迟和正确性(在错误的答案提供的5(12:00,12:02)窗口)。
图2-9中的水印触发器与图2-5到图2-7中的重复更新触发器之间的主要区别在于,水印为我们提供了一种方法来推断输入的完整性。在系统实现给定窗口的输出之前,我们知道系统还不相信输入是完整的。这对于您希望推断输入中缺少数据或数据丢失的用例尤其重要。
丢失数据用例的一个很好的例子是外连接。如果没有完整性(如水印)的概念,您如何知道何时放弃并释放出部分连接,而不是继续等待连接完成?你不。并且决定处理时间延迟,这是常见的方法在缺乏真正的水印支持流媒体系统,不是一个安全的路要走,因为事件时间倾斜的变量性质我们谈到在第1章:只要斜仍然小于选择processingtime延迟,你缺失的数据结果是正确的,但除此之外,任何时候倾斜生长延迟,他们会突然变得不正确的。从这个角度来看,事件时间水印是许多真实流用例的关键部分,这些用例必须考虑输入中缺少数据的原因,比如外部连接、异常检测等等。
现在,说了这么多,这些水印的例子也突出了两个缺点的水印(和任何其他的完整性的概念),特别是他们可以是下列之一:
太慢
当任何类型的水印由于已知的未处理数据而正确延迟时(例如,由于网络带宽限制而缓慢增长的输入日志),如果水印的提前是您唯一依赖的刺激结果,则直接转换为输出延迟。
这在图2-10的左图中最为明显,对于图2-10,延迟到达的9保留了所有后续窗口的水印,即使这些窗口的输入数据提前完成。这对于第二个窗口([12:02,12:04])尤其明显,因为从窗口中的第一个值出现到看到窗口的任何结果都需要将近7分钟。本例中的启发式水印不会出现同样严重的问题(在输出前5分钟),但这并不意味着启发式水印不会出现水印延迟;这实际上只是我在这个特定示例中选择从启发式水印中删除的记录的结果。
这里的要点如下:虽然水印提供了一个非常有用的完整性概念,但是从延迟的角度来看,依赖完整性来产生输出通常是不理想的。想象一个包含有价值的度量指标的仪表板,按小时或天开窗。你不太可能要等上一个小时或者一天才能看到当前窗口的结果;这是使用经典批处理系统来为此类系统提供动力的难点之一。相反,随着时间的推移,随着输入的发展和最终的完成,看到这些窗口的结果变得更完善会更好。
太快
当启发式水印提前的时间不正确时,在水印之前具有事件时间的数据可能会延迟一段时间到达,从而创建延迟的数据。这是在右边的例子中所发生的:在观察到第一个窗口的所有输入数据之前,水印已经提前超过了第一个窗口的末端,导致错误的输出值为5而不是14。这个缺点是一个严格的启发式水印问题;它们的启发式本质意味着它们有时会出错。因此,如果您关心正确性,仅依靠它们来确定何时物化输出是不够的。
在第1章中,我对完整性的概念做了一些强调,对于大多数需要对无界数据流进行健壮的out-of-order处理的用例来说,完整性是不够的。这两个缺点——水印太慢或太快——是这些论点的基础。对于一个仅仅依赖完整性概念的系统,您无法同时获得低延迟和正确性。所以,对于你想要两全其美的情况,一个人该怎么做呢?嗯,如果重复更新触发器提供低延迟更新,但无法推断完整性,而水印提供了完整性的概念,但可变和可能的高延迟,为什么不把它们的功能结合在一起呢?
When: Early/On-Time/Late Triggers FTW!
我们已经了解了触发器的两种主要类型:重复更新触发器和完整性/水印触发器。在许多情况下,它们单独都是不够的,但它们结合在一起才是。Beam通过提供标准水印触发器的扩展来识别这一事实,该扩展还支持在水印的任意一侧进行重复更新触发。这就是所谓的早期/准时/延迟触发器,因为它将由复合触发器实现的窗格分为三类:
-
零个或多个早期窗格,这些窗格是重复更新触发器的结果,该触发器定期触发,直到水印通过窗口的末端。这些触发产生的窗格包含推测性结果,但允许我们观察新输入数据到达时窗口随时间的演变。这弥补了有时太慢的水印的缺点。
-
一个单独的实时窗格,它是在水印经过窗口末端后,完整性/水印触发触发的结果。这个触发是特殊的,因为它提供了一个断言,系统现在相信这个窗口的输入是完整的。这意味着现在可以对丢失的数据进行推理;例如,在执行外部连接时发出部分连接。
-
零个或多个延迟窗格,它们是另一个(可能不同的)重复更新触发器的结果,该触发器在水印经过窗口末端后,每当延迟数据到达时,都会周期性地触发该触发器。在一个完美的水印的情况下,总是会有零后期窗格。但在启发式水印的情况下,任何数据的水印没有适当的帐户将导致后期的激发。这弥补了水印速度过快的缺点。
让我们看看它是如何运作的。我们将更新我们的管道,以使用一个定期处理时间触发器,对早期触发使用一分钟的对齐延迟,对后期触发使用每个记录触发。这样,早期的发射会给我们一些批处理大容量windows(由于触发的每分钟只执行一次,不管吞吐量到窗口),但我们不会解雇后期引入不必要的延迟,这是罕见的,如果我们希望使用一个合理准确的启发式水印。在Beam中,这看起来像是例2-7(图2-11显示了结果)。
Example 2-7. Early, on-time, and late firings via the early/on-time/late API PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(AfterWatermark() .withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AfterCount(1)))) .apply(Sum.integersPerKey());
这个版本比图2-9有两个明显的改进:
对于第二个窗口中的“水印太慢”情况,[12:02,12:04]:我们现在每分钟提供一次定期的早期更新。这种差异在完美水印的情况下表现得最为明显,从第一次输出到输出的时间从将近7分钟减少到3.5分钟;但在启发式的情况下,它也得到了明显的改善。现在,两个版本都提供了随时间的稳定细化(值分别为7、10和18的窗格),从输入完成到窗口的最终输出窗格物化之间的延迟相对最小。
对于第一个窗口中的“启发式水印太快”情况,[12:00,12:02]:当9的值出现得太晚时,我们立即将其合并到一个新的、经过修正的值为14的窗格中。
这些新触发器的一个有趣的副作用是,它们有效地规范化了完美和启发式水印版本之间的输出模式。尽管图2-10中的两个版本完全不同,但是这里的两个版本看起来非常相似。它们看起来也更类似于图2-6到图2-8中的各种重复更新版本,但有一个重要的区别:由于使用了水印触发器,我们还可以推断使用早期/按时/后期触发器生成的结果的输入完整性。这使我们能够更好地处理关心丢失数据的用例,比如外部连接、异常检测等等。
在这一点上,完美和启发式的早期/准时/后期版本之间最大的区别是窗口生存期界限。在完美的水印情况下,我们知道在一个窗口的水印结束后,我们将永远不会看到更多的数据,因此我们可以放弃我们的所有状态的窗口。在启发式水印的情况下,我们仍然需要在一段时间内保持窗口的状态,以解释延迟的数据。但是到目前为止,我们的系统还没有任何好的方法来知道每个窗口需要保持多长时间的状态。这就是允许迟到的原因。
When: Allowed Lateness (i.e., Garbage Collection)
在继续我们的最后一个问题(“结果的细化是如何关联的?”)之前,我想先谈谈长期存在的、out-of-order流处理系统中的一个实际需求:垃圾收集。在图2-11的启发式水印示例中,每个窗口的持久状态在示例的整个生命周期中一直存在;这是必要的,使我们能够适当地处理迟到的数据,当/如果他们到达。但是,虽然能够将所有的持久状态保留到时间结束是很好的,但实际上,在处理一个无限的数据源时,为给定的窗口无限期地保留状态(包括元数据)通常是不实际的;我们最终将耗尽磁盘空间(或者至少厌倦了为此买单,因为旧数据的价值会随着时间的推移而减少)。
因此,任何实际的无序处理系统都需要提供某种方法来限制其处理的窗口的生存期。一种简洁明了的方法是在系统中定义允许的延迟;也就是说,对任何给定的记录(相对于水印)的延迟时间设限,以便系统对其进行处理;任何在这个视界之后到达的数据都将被删除。在确定了各个数据的延迟时间之后,您还精确地确定了窗口的状态必须保持多长时间:直到水印超过窗口末端的延迟时间。但除此之外,您还赋予了系统在观察到任何数据之后立即删除它们的自由,这意味着系统不会浪费资源来处理没有人关心的数据。
测量迟到
在处理后期数据时,首先使用产生后期数据的度量标准(即,启发式水印)。在某种意义上确实如此。但在可用的选择中,它可以说是最好的。唯一可行的选择是指定的地平线处理时间(例如,windows大约10分钟的处理时间水印过后的窗口),但是使用处理时间将管道内的垃圾收集策略容易问题本身(例如,工人崩溃,导致管道拖延几分钟),这可能导致windows实际上没有机会处理它们本来应该处理的延迟数据。通过指定事件-时间域的范围,垃圾收集与管道的实际进度直接相关,这减少了窗口错过适当处理后期数据的机会的可能性。
但是请注意,并不是所有的水印都是相同的。当我们在这本书中谈到水印时,我们通常指的是低水印,它悲观地试图捕捉系统意识到的最古老的未处理记录的事件时间。通过低水印处理延迟的好处是,它们对事件时间倾斜的变化具有弹性;无论管道中的偏移有多大,低水印总是跟踪系统已知的最老的未完成事件,从而提供可能的最佳正确性保证。
相反,一些系统可能使用术语“水印”来表示其他东西。例如,Spark Structured Streaming中的水印是高水印,它乐观地跟踪系统所知道的最新记录的事件时间。在处理延迟时,系统可以自由地对任何比用户指定的延迟阈值调整的高水印时间更早的窗口进行垃圾收集。换句话说,系统允许您指定您希望在管道中看到的最大事件时间偏移量,然后丢弃该偏移窗口之外的任何数据。如果管道内的歪斜保持在某个恒定的增量内,这可以很好地工作,但是与低水印方案相比,更容易错误地丢弃数据。
因为允许的延迟和水印之间的交互有点微妙,所以值得看一个例子。让我们以例2-7/图2-11中的启发式水印管道为例,并在例2-8中添加1分钟的延迟范围(请注意,选择这个特定的范围是严格的,因为它很好地适合于图;对于真实的用例,更大的范围可能更实际):
Example 2-8. Early/on-time/late firings with allowed lateness
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)).triggering( AfterWatermark().withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AfterCount(1))).withAllowedLateness(ONE_MINUTE)) .apply(Sum.integersPerKey());
该管道的执行将类似于图2-12,其中我添加了以下特性来突出显示允许延迟的影响:
-
表示处理时间中当前位置的粗黑线现在用标记标注所有活动窗口的延迟时间(事件时间)。
-
当水印通过窗口的延迟范围时,该窗口将关闭,这意味着该窗口的所有状态都将被丢弃。我在周围留了一个虚线矩形,显示窗口关闭时所覆盖的时间范围(在两个域中),并向右延伸一个小尾巴,表示窗口的延迟范围(用于与水印进行对比)。
-
仅对于此关系图,我为第一个窗口添加了一个附加的延迟数据,其值为6。6虽然晚了,但仍然在允许的延迟范围内,因此被合并到值为11的更新结果中。然而,9,到达了迟到的地平线之外,所以它被简单地丢弃了。
关于迟到的最后两个补充说明:
绝对清楚的是,如果您使用的数据恰好来自具有完美水印的源,则不需要处理延迟数据,允许延迟的时间范围为零秒将是最优的。这是我们在图2-10的完美水印部分看到的。
一个值得注意的例外的规则需要指定迟到的视野,即使使用启发式水印,就像全球总量计算时间温顺地有限数量的键(例如,计算访问你的站点的总数,分组通过web浏览器家族)。在这种情况下,系统中活动窗口的数量受到正在使用的有限键空间的限制。只要键的数量保持在可管理的低水平,就没有必要担心通过允许延迟来限制窗口的生存期。
实用性讲完了,我们接着讲第四个也是最后一个问题。
How: Accumulation
当触发器被用来在一段时间内为一个窗口生成多个窗格时,我们发现自己面临最后一个问题:“结果的细化如何相关?”“在我们目前看到的例子中,每个连续的窗格都是建立在它前面的窗格上的。然而,实际上有三种不同的积累模式:
Discarding(丢弃)
每当一个窗格物化时,任何存储状态都会被丢弃。这意味着每个连续的窗格与之前的窗格是独立的。当下游消费者自身进行某种积累时,丢弃模式是有用的;例如,当向系统发送整数时,系统期望接收到它将累加在一起以产生最终计数的增量。
Accumulating(积累)
如图2-6至图2-11所示,每次具体化一个窗格时,都会保留任何存储状态,并将未来的输入累积到现有状态。这意味着每个连续的窗格都建立在前一个窗格的基础上。当以后的结果可以简单地覆盖以前的结果时,例如在HBase或Bigtable之类的键/值存储中存储输出时,积累模式非常有用。
Accumulating and retracting(积累和收回)
这类似于积累模式,但是在生成新窗格时,它还会为前一个窗格生成独立的回缩。撤销(结合新的累积结果)本质上是一种明确的方式,表示“我之前告诉过您结果是X,但是我错了。去掉我上次告诉你的X,代入Y。“在两种情况下,撤销决定特别有用:
-
当下游的消费者按照不同的维度重新分组数据时,新值的键值可能完全不同于以前的值,因此最终可能在不同的组中。在这种情况下,新值不能覆盖旧值;相反,您需要回缩以删除旧值
-
当动态窗口(例如,会话,我们稍后将更详细地讨论)正在使用时,由于窗口合并,新值可能会替换多个以前的窗口。在这种情况下,仅从新窗口很难确定哪些旧窗口正在被替换。对旧的窗口进行显式的回退可以使任务变得简单。我们将在第8章中详细看到一个这样的例子。
当看到侧栏时,每个组的不同语义会更清晰一些。考虑图2-11中的第二个窗口(具有eventtime范围[12:06,12:08)的两个窗格(具有early/on-time/late触发器的窗格)。表2-1显示了三个累加模式下每个窗格的值(其中累加模式是图2-11中使用的特定模式)。
表2-1。使用图2-11中的第二个窗口比较累积模式
image.png image.png让我们仔细看看发生了什么:
Discarding(丢弃)
每个窗格只合并在该特定窗格期间到达的值。因此,观察到的最终值并不能完全捕捉到总数。然而,如果你把所有独立的窗格加起来,你会得到12的正确答案。这就是为什么当下游消费者本身在物化窗格上执行某种聚合时,丢弃模式是有用的。
Accumulating(积累)
如图2-11所示,每个窗格合并了在特定窗格期间到达的值,以及来自以前窗格的所有值。因此,观察到的最终值正确地捕获了12个值的总和。但是,如果您要对各个窗格本身进行汇总,那么您实际上是在重复计算窗格1的输入,从而得出错误的总数15。这就是为什么当您可以简单地用新值覆盖以前的值时,积累模式是最有用的:新值已经合并了到目前为止看到的所有数据。
Accumulating and retracting(积累和收回)
每个窗格既包含新的累积模式值,也包含前一个窗格值的回缩。因此,最后观察到的值(不包括回撤)以及所有物化窗格的总和(包括回撤)为您提供12的正确答案。这就是为什么回撤如此强大的原因。
例2-9演示了丢弃模式的操作,说明了我们将对例2-7所做的更改:
例子2-9,提前/准时/延迟触发的丢弃模式版本
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AtCount(1)))
.discardingFiredPanes()).apply(Sum.integersPerKey());
在带有启发式水印的流引擎上再次运行将产生如图2-13所示的输出。
图2-13。丢弃流媒体引擎上early/on-time/late触发的模式版本
尽管输出的整体形状类似于图2-11中的累积模式版本,但是请注意,在这个丢弃的版本中,没有窗格重叠。因此,每个输出都是独立于其他输出的。
如果我们想要查看实际的回调,变化将是类似的,如例2-10所示。? ? ?描述了结果。
例子2-10。early/on-time/late触发的累积和撤回模式版本
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark().withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AtCount(1))).accumulatingAndRetractingFiredPanes()) .apply(Sum.integersPerKey());
积累和撤回模式版本的早/晚点火流引擎
因为每个窗口的窗格都是重叠的,所以很难清楚地看到回缩。收缩用红色表示,红色与重叠的蓝色窗格相结合,产生一种略带紫色的颜色。我还对给定窗格中的两个输出值进行了轻微的水平移动(并使用逗号分隔它们),以使它们更容易区分。
图2-14结合了图2-9、图2-11的最后几帧(仅为启发式),并排显示了三种模式的良好视觉对比。
正如您可以想象的那样,在存储和计算成本方面,所呈现的顺序(丢弃、积累、积累和回收)中的模式依次更加昂贵。为此,积累模式的选择提供了在正确性、延迟和成本轴上进行权衡的另一个维度。
总结
完成这一章之后,您现在已经了解了健壮流处理的基础知识,并准备好进入这个世界,做一些令人惊奇的事情。当然,还有8个章节在焦急地等待着你们的注意,所以希望你们不要像现在这样,就在这一分钟。但无论如何,让我们来回顾一下我们刚刚谈到的内容,以免你在匆忙中忘记其中的任何内容。一是涉及的主要概念:
事件时间与处理时间
事件发生的时间与数据处理系统观察到的时间之间的重要区别。
Windowing (窗口)
通常使用的管理无界数据的方法是沿着时间边界(在处理时间或事件时间内,尽管我们将Beam模型中窗口的定义缩小为仅在事件时间内)进行分割。
Triggers(触发器)
用于精确指定输出物化何时对特定用例有意义的声明性机制。
Watermarks(水印)
事件时间进展的强大概念,为无序处理系统中对无界数据进行操作的完整性(从而丢失数据)的推理提供了一种方法。
Accumulation (累加器)
单个窗口的结果细化之间的关系,在这种情况下,随着它的发展,它会被多次具体化。
第二,我们用来构建我们的探索的四个问题:
计算了什么结果?=转换。(What)
事件时间在哪里计算结果?=窗口。(Where)
什么时候在处理时间结果是物化?=触发器加水印。(When)
结果的细化是如何关联的?=积累。(How)
第三,开车回家这个模型提供的灵活性的流处理(因为最后,这就是这就是:平衡竞争紧张正确性、延迟和成本),回顾的主要输出的变化我们可以实现在同一数据集,只有少量的代码变化:
image.png综上所述,在这一点上,我们实际上只研究了一种类型的窗口:事件时间中的固定窗口。正如我们所知,窗口设计有很多维度,在我们使用Beam模型之前,我想至少再讲两个维度。然而,首先,我们要稍微绕个弯,深入研究一下水印的世界,因为这些知识将有助于构建未来的讨论框架(而且本身也很吸引人)。进入Slava, stage right…