【Data Flow】从论文开始看起(The Dataflow
正文之前
前阵子去找老板的时候。老板说跟我让我好好的看一下数据流方面的东西,明年初的时候会有一个国家重点专项需要我参与进去。好吧,那就看喽~不过听说如果要博士毕业,英文水平必须能够流畅的阅读英文文献。那我今天就开始喽
正文
The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
数据流模型:一种在大规模、无边界、无序的数据处理过程中平衡正确性、延迟、成本的实用方法。
ABSTRACT【摘要】
Unbounded, unordered, global-scale datasets are increasingly common in day-to-day business (e.g. Web logs, mobile usage statistics, and sensor networks). At the same time, consumers of these datasets have evolved sophisticated requirements, such as event-time ordering and windowing by features of the data themselves, in addition to an insatiable hunger for faster answers. Meanwhile, practicality dictates that one can never fully optimize along all dimensions of correctness, latency, and cost for these types of input. As a result, data processing practitioners are left with the quandary of how to reconcile the tensions between these seemingly competing propositions, often resulting in disparate implementations and systems.
无界的、无序的、全球范围内的数据集在日常的业务[商业活动?]中愈发常见(比如网站日志、手机使用数据,传感器组成的网络等)。与此同时,这些数据集的消费者已经形成了复杂的需求,除了对更快的得出结果的无限渴望之外,还有事件时间排序[实时性?]以及根据数据本身的特征进行窗口化处理等需求。同时,从实用性来看,我们永远也无法对各种不同类型的输入进行正确性、延迟、开销上的完美优化。这就导致了,数据处理从业者面临着如何协调这些看起来相互冲突的命题之间的紧张局势的困境,而这时常导致了不同的数据处理实现方法与系统。
We propose that a fundamental shift of approach is necessary to deal with these evolved requirements in modern data processing. We as a field must stop trying to groom unbounded datasets into finite pools of information that eventually become complete, and instead live and breathe under the assumption that we will never know if or when we have seen all of our data, only that new data will arrive, old data may be retracted, and the only way to make this problem tractable is via principled abstractions that allow the prac- titioner the choice of appropriate tradeoffs along the axes of interest: correctness, latency, and cost.
我们认为:在现代的数据处理过程中,处理这些不断改变的需求是我们必须要面对的转变。作为一个领域,我们必须要停止清洗[切分]无界数据,等待其变成一个有限的信息集合的这种行为。取而代之的,要假设我们不知道数据流何时终结,只知道新的数据不断涌入、旧的数据可能会被撤销或者删除。唯一解决这个问题的方案就是:通过原则性抽象,允许数据处理从业者在正确性、延迟、开销三个方向上进行合适的权衡。
In this paper, we present one such approach, the Dataflow Model1, along with a detailed examination of the semantics it enables, an overview of the core principles that guided its design, and a validation of the model itself via the real-world experiences that led to its development.
在本文中,我们提出了这样一种办法,数据流模型1,以及它所支持的语义的细节检查、引导设计出这一模型的核心原则和在这一模型的实际开发过程中对它的验证。
1. INTRODUCTION【引言】
Modern data processing is a complex and exciting field. From the scale enabled by MapReduce [16] and its successors (e.g Hadoop [4], Pig [18], Hive [29], Spark [33]), to the vast body of work on streaming within the SQL community (e.g. query systems [1, 14, 15], windowing [22], data streams [24], time domains [28], semantic models [9]), to the more recent forays in low-latency processing such as Spark Streaming [34], MillWheel, and Storm [5], modern consumers of data wield remarkable amounts of power in shaping and taming massive-scale disorder into organized structures with far greater value. Yet, existing models and systems still fall short in a number of common use cases.
现代数据处理是一个复杂而又令人兴奋的领域。MapReduce和它的接班人们(Hadoop,Pig,Hive,Spark)组成的系统解决了数据规模上的问题,SQL社区也为此做出了很多贡献(如查询系统,窗口,数据流,时间维度,语义模型等)。 以及低延迟处理上Spark Streaming, MillWheel, Storm等做了很多尝试。这些使得现代数据工作者拥有了很多强有力的可以塑造并且加工巨量无序数据成为拥有更大价值的结构化数据的工具。但是现有的模型和系统仍然远远无法处理很多常见的场景。
Consider an initial example: a streaming video provider wants to monetize their content by displaying video ads and billing advertisers for the amount of advertising watched. The platform supports online and offline views for content and ads. The video provider wants to know how much to bill each advertiser each day, as well as aggregate statistics about the videos and ads. In addition, they want to efficiently run offline experiments over large swaths of historical data.
考虑到一个例子:一家流媒体视频播放提供商想要播放广告,并且收取广告商费用从而变现获利。这个平台支持在线以及离线播放内容和广告,视频提供商想要知道应该向广告商收取多少费用以及一个总体上的视频和广告的统计数据。另外他们还想要更有效地在大量的历史离线数据上进行历史数据分析和各种实验。
Advertisers/content providers want to know how often and for how long their videos are being watched, with which content/ads, and by which demographic groups. They also want to know how much they are being charged/paid. They want all of this information as quickly as possible, so that they can adjust budgets and bids, change targeting, tweak campaigns, and plan future directions in as close to real time as possible. Since money is involved, correctness is paramount.
广告商/视频提供商都想知道人们观看视频和广告的频率以及时长,以及不同的人群更偏好于哪一类视频以及广告。他们也想知道被收取/付费了多少?他们想要尽可能快地获取所有信息,以致于他们可以及时地调整预算,调整定价,改变目标,调整广告系列以及规划未来方向。因为涉及到了金钱,所以正确性至关重要!
Though data processing systems are complex by nature, the video provider wants a programming model that is simple and flexible. And finally, since the Internet has so greatly expanded the reach of any business that can be parceled along its backbone, they also require a system that can handle the diaspora of global scale data.
尽管数据处理系统本质上极其复杂,但是视频提供商只想要一个简单而又灵活的编程模型。最终由于他们基于互联网的业务已经沿着骨干扩散至各个行业,所以他们最终还需要一个能处理全球离散数据的系统。
The information that must be calculated for such a use case is essentially the time and length of each video viewing, who viewed it, and with which ad or content it was paired (i.e. per-user, per-video viewing sessions). Conceptually this is straightforward, yet existing models and systems all fall short of meeting the stated requirements.
每个视频的观看时间以及观看长度。还有观看者以及与其匹配的内容和广告(也就是每个用户、每一次观看会话)基本上也就是上述场景中所需要计算的信息了。从理论上来说,这很简单,但是现有的模型以及系统全都不符合这种需求。
Batch systems such as MapReduce (and its Hadoop vari- ants, including Pig and Hive), FlumeJava, and Spark suffer from the latency problems inherent with collecting all input data into a batch before processing it. For many streaming systems, it is unclear how they would remain fault-tolerant at scale (Aurora [1], TelegraphCQ [14], Niagara [15], Esper [17]). Those that provide scalability and fault-tolerance fall short on expressiveness or correctness vectors. Many lack the ability to provide exactly-once semantics (Storm, Samza [7], Pulsar [26]), impacting correctness. Others simply lack the temporal primitives necessary for windowing (Tigon [11]), or provide windowing semantics that are limited to tuple or processing-time-based windows (Spark Streaming [34], Sonora [32], Trident [5]). Most that provide event-time-based windowing either rely on ordering (SQLStream [27]), or have limited window triggering semantics in event-time mode (Stratosphere/Flink [3, 6]). CEDR [8] and Trill [13] are noteworthy in that they not only provide useful trigger- ing semantics via punctuations [30, 28], but also provide an overall incremental model that is quite similar to the one we propose here; however, their windowing semantics are insufficient to express sessions, and their periodic punctu- ations are insufficient for some of the use cases in Section 3.3. MillWheel and Spark Streaming are both sufficiently scalable, fault-tolerant, and low-latency to act as reason- able substrates, but lack high-level programming models that make calculating event-time sessions straightforward. The only scalable system we are aware of that supports a high-level notion of unaligned windows4 such as sessions is Pulsar, but that system fails to provide correctness, as noted above. Lambda Architecture [25] systems can achieve many of the desired requirements, but fail on the simplicity axis on account of having to build and maintain two systems. Summingbird [10] ameliorates this implementation complexity by abstracting the underlying batch and streaming systems behind a single interface, but in doing so imposes limitations on the types of computation that can be performed, and still requires double the operational complexity.
批处理操作系统比如MapReduce (and its Hadoop vari- ants, including Pig and Hive) , FlumeJava, and Spark 等都深受延迟问题困扰。因为批处理操作需要提前收集所有的输入数据。对于许多流媒体系统来说,目前还不知道它们在大规模的数据上的容错性如何 (Aurora [1], TelegraphCQ [14], Niagara [15], Esper [17])。而那些提供可扩展性以及容错性的系统,则无法在准确性或者正确性这些方面得到保证。还有一些系统则缺乏提供单次处理语义下保证正确性的能力(Storm, Samza [7], Pulsar [26])。至于还有其他一些系统则缺乏窗口化所需要的时间原语,或者是仅仅能提供基于记录数元组或者基于处理时间的窗口 (Spark Streaming [34], Sonora [32], Trident [5])。大部分基于事件时间的窗口同时也依赖于数据有序,或者在基于事件时间模式下,对于窗口触发语义有限制 (Stratosphere/Flink [3, 6])。 CEDR [8] 和 Trill [13]是值得注意的,它们不仅可以通过标记来提供有用的触发语义,而且提供了一个与本文中提出的十分相似的整体增量模型。然而他们的窗口语义并不足以表达基于会话的窗口,而且他们的基于标记的窗口不足以满足3.3节中某些用例。MillWheel和Spark Streaming都具有足够的可扩展性,容错性和低延迟性,但是对于会话窗口缺乏一种直观的高层编程模型。我们发现只有Pulsar系统对非对齐窗口(译者注:指只有部分记录进入某一特定窗口,会话窗口就是一种非对齐窗口)提供了高层次语义抽象,但是它缺乏对数据准确性的保证。Lambda架构可以实现很多需求,但是因为要维护两个系统(译者注:指离线和在线系统),所以在简单性这一点上无法得到保证。 Summingbird [10]通过运用一个简单接口抽象潜在的批处理和流式系统(统一封装抽象)改善了Lambda实现的复杂性。但是这样做会对可执行的计算类型施加一定的限制,并将致使其需要两倍的操作复杂度。
None of these shortcomings are intractable, and systems in active development will likely overcome them in due time. But we believe a major shortcoming of all the models and systems mentioned above (with exception given to CEDR and Trill), is that they focus on input data (unbounded or otherwise) as something which will at some point become complete. We believe this approach is fundamentally flawed when the realities of today’s enormous, highly disordered datasets clash with the semantics and timeliness demanded by consumers. We also believe that any approach that is to have broad practical value across such a diverse and varied set of use cases as those that exist today (not to mention those lingering on the horizon) must provide simple, but powerful, tools for balancing the amount of correctness, latency, and cost appropriate for the specific use case at hand. Lastly, we believe it is time to move beyond the prevailing mindset of an execution engine dictating system semantics; properly designed and built batch, micro-batch, and streaming systems can all provide equal levels of correctness, and all three see widespread use in unbounded data processing today. Abstracted away beneath a model of sufficient generality and flexibility, we believe the choice of execution engine can become one based solely on the practical underlying differences between them: those of latency and resource cost.
这些缺点都不是棘手的问题。活跃开发的系统发展起来之后,会在一定的时间内克服它们。但是我们相信一个所有的模型和系统 (除了 CEDR and Trill)都会提及到的缺点是:他们都相信输入数据(无界的或者是别的)在一段时间之后会变得完整。我们相信这种方法是从根本上有瑕疵的。因为我们现在面对的是现实世界中巨大的、高度无序的数据集与消费者之间对于语义和时效性的各种需求之间的冲突。我们也相信对于今天所存在的那些多种多样的使用案例,具有广泛使用价值的方法必须提供简单但是强大的工具,来平衡正确性、延迟和针对特定的使用案例的开销。最后,我们坚信现在已经是时候去除掉现在盛行的执行引擎需要描述系统语义的这种思维了。适当的设计制造批处理、微批处理和流式系统可以提供同等层次正确性,这三种系统在当今的无界数据处理中的应用的十分广泛。如果我们能够抽象出一个具有足够通用性和灵活性的模型,那么我们相信执行引擎的选择就只是关于延迟和资源开销之间的选择。
Taken from that perspective, the conceptual contribution of this paper is a single unified model which:
从这个角度来看的话,本文的概念上的贡献就只是一个单纯的统一模型:
• Allows for the calculation of event-time ordered results, windowed by features of the data themselves, over an unbounded, unordered data source, with correctness, latency, and cost tunable across a broad spectrum of combinations.
允许在无界的无序的数据源上,对结果排序以及基于数据本身的特征窗口化计算,并且对各种不同的组合进行正确性延迟以及开销上的协调。
• Decomposes pipeline implementation across four related dimensions, providing clarity, composability, and flexibility:
– What results are being computed.
– Where in event time they are being computed.
– When in processing time they are materialized.
– How earlier results relate to later refinements.
在四个相关维度上分解管道实现,提供清晰性、可组合性和灵活性:
-
计算什么样的结果
-
按时间发生时间计算
-
在流计算处理时间时被真正除法计算
-
早期结果如何在后期被修正
• Separates the logical notion of data processing from the underlying physical implementation, allowing the choice of batch, micro-batch, or streaming engine to become one of simply correctness, latency, and cost.
将数据处理的逻辑概念与底层的物理实现分割开来。使得对批处理、微批处理和流式引擎的选择成为简单的对准确性、延迟以及开销之间的选择。
Concretely, this contribution is enabled by the following:
具体来说,这个贡献是由以下内容实现的:
• A windowing model which supports unaligned event-time windows, and a simple API for their creation and use (Section 2.2).
一个支持非对齐事件发生时间窗口模型,一组简单的窗口创建以及使用的API
• A triggering model that binds the output times of results to runtime characteristics of the pipeline, with a powerful and flexible declarative API for describing desired triggering semantics (Section 2.3).
一个根据管道运行时特征来决定结果输出次数的触发模型,一组强有力而灵活地描述触发语义的声明式API。
• An incremental processing model that integrates retractions and updates into the windowing and triggering models described above (Section 2.3).
将数据的回收以及更新整合到上述说过的窗口和触发模型中形成一个增量处理模型。
• Scalable implementations of the above atop the MillWheel streaming engine and the FlumeJava batch engine, with an external reimplementation for Google Cloud Dataflow, including an open-source SDK [19] that is runtime-agnostic (Section 3.1).
MillWheel流式引擎和FlumeJava批处理引擎上的可扩展实现,重写了Google Cloud Dataflow外部实现,并且提供了一个运行时不指定运行引擎的开源SDK。
• A set of core principles that guided the design of this model (Section 3.2).
一系列关于设计这个模型的核心原则。
• Brief discussions of our real-world experiences with massive-scale, unbounded, out-of-order data processing at Google that motivated development of this model (Section 3.3).
简要讨论Google在处理大规模无边界乱序数据流上的经验,这也是推动我们开发这个模型的动力。
It is lastly worth noting that there is nothing magical about this model. Things which are computationally impractical in existing strongly-consistent batch, micro-batch, streaming, or Lambda Architecture systems remain so, with the inherent constraints of CPU, RAM, and disk left steadfastly in place. What it does provide is a common framework that allows for the relatively simple expression of parallel computation in a way that is independent of the underlying execution engine, while also providing the ability to dial in precisely the amount of latency and correctness for any specific problem domain given the realities of the data and resources at hand. In that sense, it is a model aimed at ease of use in building practical, massive-scale data processing pipelines.
最后说一下的是这个模型没什么神奇的。在现有的高度一致的批处理、微型批处理、流处理系统,Lambda架构无法计算的东西仍然存在。CPU, RAM,磁盘的固有约束仍然限制着我们。这个模型所提供的只是一个与底层执行引擎相互独立的,表达现有并行计算的一种简单通用框架。当然也提供针对现实世界中数据和资源不同而引发的特殊情况下,精确的调整延时和正确性之间的关系的能力。从这一点上来说,这是一个致力于构建实用的大规模数据处理管道的模型。
1.1 Unbounded/Bounded vs Streaming/Batch
1.1 无界、有界 VS 流、批处理
When describing infinite/finite data sets, we prefer the terms unbounded/bounded over streaming/batch, because the latter terms carry with them an implication of the use of a specific type of execution engine. In reality, unbounded datasets have been processed using repeated runs of batch systems since their conception, and well-designed streaming systems are perfectly capable of processing bounded data. From the perspective of the model, the distinction of streaming or batch is largely irrelevant, and we thus reserve those terms exclusively for describing runtime execution engines.
刚在处理无限/有限数据的时候,我们总是更加偏爱 无界/有界 而不是 流/批处理这组词汇,因为后者总是暗示也许会要用到一种特殊的执行引擎。而在现实工作中无限数据集可以采用批处理操作重复运行调度来处理,有限数据集也可以用流操作系统来完美的处理。从本文的模型角度来看,严格区分流和批处理用处不大,我们也只是保留这一组术语来描述运行时的执行引擎。
1.2 Windowing
1.2 窗口
Windowing [22] slices up a dataset into finite chunks for processing as a group. When dealing with unbounded data, windowing is required for some operations (to delineate finite boundaries in most forms of grouping: aggregation, outer joins, time-bounded operations, etc.), and unnecessary for others (filtering, mapping, inner joins, etc.). For bounded data, windowing is essentially optional, though still a semantically useful concept in many situations (e.g. backfilling large scale updates to portions of a previously computed unbounded data source). Windowing is effectively always time based; while many systems support tuple-based windowing, this is essentially time-based windowing over a logical time domain where elements in order have successively increasing logical timestamps. Windows may be either aligned, i.e. applied across all the data for the window of time in question, or unaligned, i.e. applied across only specific subsets of the data (e.g. per key) for the given window of time. Figure 1 highlights three of the major types of windows encountered when dealing with unbounded data.
窗口是把数据集切分成有限个数据片打包成一组处理。当我们处理无限数据的时候,窗口化对于某些操作是必须的。(在大多数形式的分组中描述有限边界:汇集,外链接,时间区域内定义的操作等等),另一些则不需要这样(过滤、映射、内链接)。对于有限数据来说,窗口是可选的操作,虽然在很多情况下这仍然是一种十分有用的语义概念。(如回填一大批的更新数据到之前读取无边界数据源处理过的数据 译者注:类似于Lambda架构)。窗口基本上都是基于时间的。不过也有一些系统是基于记录数的窗口,这种窗口也可以认为是基于一个逻辑上的时间域,其中的元素基于时间戳顺序递增。窗口是对齐的,应用于被窗口覆盖的所有时间内的数据。也可以是非对称的,仅仅应用于规定时间内特殊的子集(比如按照键值来选择)。图一列出了三种常见的处理无边界数据时的窗口。
image.pngFixed windows (sometimes called tumbling windows) are defined by a static window size, e.g. hourly windows or daily windows. They are generally aligned, i.e. every window applies across all of the data for the corresponding period of time. For the sake of spreading window completion load evenly across time, they are sometimes unaligned by phase shifting the windows for each key by some random value.
固定窗口(有时称为翻滚窗口)由一个静态的窗口尺寸定义。比如说小时或者天窗口。他们通常都是对齐的,比如说每一个窗口应用于相应的每一个时间段内的所有数据。有时候为了把窗口计算的负载均匀的扩散到整个时间范围内,也会给窗口的边界加上一个随机值偏移量,这样固定窗口就成了不对齐的了。
Sliding windows are defined by a window size and slide period, e.g. hourly windows starting every minute. The period may be less than the size, which means the windows may overlap. Sliding windows are also typically aligned; even though the diagram is drawn to give a sense of sliding motion, all five windows would be applied to all three keys in the diagram, not just Window 3. Fixed windows are really a special case of sliding windows where size equals period.
滑动窗口由窗口大小和滑动周期来定义,例如, 小时窗口,每分钟滑动一次。 周期可能小于窗口尺寸,这意味着窗口可能会重叠。 滑动窗户通常也是对齐的; 尽管上面的图为了画出滑动的效果窗口没有遮盖到所有的键,但所有五个窗口都包含了图表中的所有的三个键,而不仅仅是窗口3.固定窗口实际上是滑动窗口的特殊情况,其中大小等于周期。
Sessions are windows that capture some period of activity over a subset of the data, in this case per key. Typically they are defined by a timeout gap. Any events that occur within a span of time less than the timeout are grouped together as a session. Sessions are unaligned windows. For example, Window 2 applies to Key 1 only, Window 3 to Key 2 only, and Windows 1 and 4 to Key 3 only.
会话是在某些数据子集上捕获一段时间活动的窗口,本文中是在每一个键上。一般来说会话按超时时间来定义,任何发生在超时时间以内的事件认为属于同一个会话。会话是不对齐的窗口。例如,窗口2只在键1内,窗口3只在键2内,窗口1和窗口4则只在键3内。译者注:假设key 是用户id, 那么两次活动之间间隔超过了超时时间,因此系统需要重新定义一个会话窗口。)
1.3 Time Domains
1.3 时间域
When processing data which relate to events in time, there are two inherent domains of time to consider. Though captured in various places across the literature (particularly time management [28] and semantic models [9], but also windowing [22], out-of-order processing [23], punctuations [30], heartbeats [21], watermarks [2], frames [31]), the detailed examples in section 2.3 will be easier to follow with the concepts clearly in mind. The two domains of interest are:
当处理包含事件发生时间的数据时,有两个时间域需要考虑。尽管很多的文献在不同的位置都提到了(特别是时间管理,语义模型,还有窗口,乱序处理,标记,心跳机制,水位标记,帧),这里说明一下的话,在2.3节中我们就更容易理解了。这两个时间域是:
• Event Time, which is the time at which the event itself actually occurred, i.e. a record of system clock time (for whatever system generated the event) at the time of occurrence.
事件时间:事件发生时候的时间,比如一个事件被系统记录的系统时间(不论这个事件何时产生,只看何时被记录)
• Processing Time, which is the time at which an event is observed at any given point during processing within the pipeline, i.e. the current time according to the system clock. Note that we make no assumptions about clock synchronization within a distributed system.
处理时间:是指在数据处理管道中处理该事件的时候,该事件被数据处理系统观察到的时间,是数据处理系统的当前时间。注意我们并未进行分布式系统的时间是同步的这种假设。
正文之后
两天时间就翻译了这么点,吃没吃下去还不好说,不过毕竟是第一次正儿八经的看英文论文,而且还是这种高难度的。。。比较虚。。不管了,先走着~明天继续发剩下的。。part