
【Data Flow】The Dataflow Model: A

    3. Implementation & Design

3.1 Implementation

We have implemented this model internally in FlumeJava, with MillWheel used as the underlying execution engine for streaming mode; additionally, an external reimplementation for Cloud Dataflow is largely complete at the time of writing. Due to prior characterization of those internal systems in the literature, as well as Cloud Dataflow being publicly available, details of the implementations themselves are elided here for the sake of brevity. One interesting note is that the core windowing and triggering code is quite general, and a significant portion of it is shared across batch and streaming implementations; that system itself is worthy of a more detailed analysis in future work.


3.2 Design Principles

Though much of our design was motivated by the real-world experiences detailed in Section 3.3 below, it was also guided by a core set of principles that we believed our model should embody:
• Never rely on any notion of completeness.
• Be flexible, to accommodate the diversity of known use cases, and those to come in the future.
• Not only make sense, but also add value, in the context of each of the envisioned execution engines.
• Encourage clarity of implementation.
• Support robust analysis of data in the context in which they occurred.
While the experiences below informed specific features of the model, these principles informed the overall shape and character of it, and we believe ultimately led to a more comprehensive and general result.




3.3 Motivating Experiences

As we designed the Dataflow Model, we took into consideration our real-world experiences with FlumeJava and MillWheel over the years. Things which worked well, we made sure to capture in the model; things which worked less well motivated changes in approach. Here are brief summaries of some of these experiences that influenced our design.


3.3.1 Large Scale Backfills & The Lambda Architecture: Unified Model
A number of teams run log joining pipelines on MillWheel. One particularly large log join pipeline runs in streaming mode on MillWheel by default, but has a separate FlumeJava batch implementation used for large scale backfills. A much nicer setup would be to have a single implementation written in a unified model that could run in both streaming and batch mode without modification. This became the initial motivating use case for unification across batch, micro-batch, and streaming engines, and was highlighted in Figures 10−12.

Another motivation for the unified model came from an experience with the Lambda Architecture. Though most data processing use cases at Google are handled exclusively by a batch or streaming system, one MillWheel customer ran their streaming pipeline in weak consistency mode, with a nightly MapReduce to generate truth. They found that customers stopped trusting the weakly consistent results over time, and as a result reimplemented their system around strong consistency so they could provide reliable, low latency results. This experience further motivated the desire to support fluid choice amongst execution engines.


3.3.2 Unaligned Windows: Sessions

From the outset, we knew we needed to support sessions; this in fact is the main contribution of our windowing model over existing models. Sessions are an extremely important use case within Google (and were in fact one of the reasons MillWheel was created), and are used across a number of product areas, including search, ads, analytics, social, and YouTube. Pretty much anyone that cares about correlating bursts of otherwise disjoint user activity over a period of time does so by calculating sessions. Thus, support for sessions became paramount in our design. As shown in Figure 14, generating sessions in the Dataflow Model is trivial.
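To make the idea of a session concrete, here is a minimal sketch in Python, not the Dataflow Model's actual implementation. It assumes a session is a run of events for one user in which consecutive events are separated by less than a fixed inactivity gap; the function name `sessionize` and the `gap` parameter are hypothetical. Each event is treated as a small window `[t, t + gap)`, and overlapping windows are merged, which is why session windows are unaligned: their boundaries depend on the data itself.

```python
from typing import List, Tuple


def sessionize(timestamps: List[float], gap: float) -> List[Tuple[float, float]]:
    """Group event times into sessions separated by at least `gap` of inactivity.

    Every event starts as a proto-window [t, t + gap); windows that overlap
    are merged. Returns (start, end) pairs, where end = last event time + gap.
    """
    sessions: List[Tuple[float, float]] = []
    for t in sorted(timestamps):
        if sessions and t < sessions[-1][1]:
            # The event falls inside the current session, so extend its end.
            start, end = sessions[-1]
            sessions[-1] = (start, max(end, t + gap))
        else:
            # The inactivity gap was exceeded (or this is the first event).
            sessions.append((t, t + gap))
    return sessions


# Hypothetical event times (in minutes) for a single user.
user_events = [1.0, 2.5, 3.0, 20.0, 21.0]
print(sessionize(user_events, gap=5.0))  # [(1.0, 8.0), (20.0, 26.0)]
```

In a real pipeline the same merge would run per key (for example per user), and late events could cause two previously separate sessions to merge.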


3.3.3 Billing: Triggers, Accumulation, & Retraction

Two teams with billing pipelines built on MillWheel experienced issues that motivated parts of the model. Recommended practice at the time was to use the watermark as a completion metric, with ad hoc logic to deal with late data or changes in source data. Lacking a principled system for updates and retractions, a team that processed resource utilization statistics ended up leaving our platform to build a custom solution (the model for which ended up being quite similar to the one we developed concurrently). Another billing team had significant issues with watermark lags caused by stragglers in their input. These shortcomings became major motivators in our design, and influenced the shift of focus from one of targeting completeness to one of adaptability over time. The results were twofold: triggers, which allow the concise and flexible specification of when results are materialized, as evidenced by the variety of output patterns possible over the same data set in Figures 7−14; and incremental processing support via accumulation (Figures 7 and 8) and retractions (Figure 14).
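The difference between accumulation and retraction can be illustrated with a small Python sketch. This is an assumption-laden toy, not the system's implementation: it models a single window that sums integers and fires several times, and the function name `emit_panes` and the mode strings are hypothetical. In the retracting mode, each firing first emits the negation of the previously emitted total, so that a downstream consumer that simply adds up everything it receives still ends with the correct value.

```python
from typing import Iterable, List


def emit_panes(firings: Iterable[List[int]], mode: str) -> List[List[int]]:
    """Return what a summing window emits at each trigger firing.

    firings: for each firing, the values that arrived since the previous one.
    mode: 'discarding', 'accumulating', or 'accumulating_retracting'.
    """
    outputs: List[List[int]] = []
    total = 0
    previous = None  # total emitted at the previous firing, if any
    for new_values in firings:
        delta = sum(new_values)
        total += delta
        if mode == "discarding":
            outputs.append([delta])  # only the new data
        elif mode == "accumulating":
            outputs.append([total])  # the refined running total
        elif mode == "accumulating_retracting":
            pane = [] if previous is None else [-previous]  # retract old result
            pane.append(total)
            outputs.append(pane)
        else:
            raise ValueError(f"unknown mode: {mode}")
        previous = total
    return outputs


firings = [[5, 2], [3], [8]]
print(emit_panes(firings, "discarding"))               # [[7], [3], [8]]
print(emit_panes(firings, "accumulating"))             # [[7], [10], [18]]
print(emit_panes(firings, "accumulating_retracting"))  # [[7], [-7, 10], [-10, 18]]
```

Summing every value emitted in the retracting mode gives 18, the true total, which is the property a billing consumer would rely on when results are revised over time.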


3.3.4 Statistics Calculation: Watermark Triggers

Many MillWheel pipelines calculate aggregate statistics (e.g. latency averages). For them, 100% accuracy is not required, but having a largely complete view of their data in a reasonable amount of time is. Given the high level of accuracy we achieve with watermarks for structured input sources like log files, such customers find watermarks very effective in triggering a single, highly-accurate aggregate per window. Watermark triggers are highlighted in Figure 12.

A number of abuse detection pipelines run on MillWheel. Abuse detection is another example of a use case where processing a majority of the data quickly is much more useful than processing 100% of the data more slowly. As such, they are heavy users of MillWheel’s percentile watermarks, and were a strong motivating case for being able to support percentile watermark triggers in the model.
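The following Python sketch shows one simplified way to think about a percentile watermark trigger. It is not how MillWheel computes watermarks. It assumes the input is read from several shards, each reporting the event time it has progressed to, and the names `percentile_watermark` and `should_fire` are hypothetical. A 100% watermark is held back by the slowest shard, while a lower percentile ignores the slowest fraction of shards and so lets a window fire earlier.

```python
import math
from typing import List


def percentile_watermark(shard_times: List[float], percentile: float) -> float:
    """Latest event time that at least `percentile` percent of shards have reached."""
    if not shard_times:
        raise ValueError("need at least one shard")
    if not 0 < percentile <= 100:
        raise ValueError("percentile must be in (0, 100]")
    ordered = sorted(shard_times, reverse=True)  # furthest-ahead shard first
    needed = math.ceil(len(ordered) * percentile / 100)  # shards that must agree
    return ordered[needed - 1]


def should_fire(window_end: float, shard_times: List[float],
                percentile: float = 100.0) -> bool:
    """Fire once the chosen watermark has passed the end of the window."""
    return percentile_watermark(shard_times, percentile) >= window_end


# Four shards are near event time 100; one straggler is stuck at 60.
progress = [100.0, 101.0, 99.0, 102.0, 60.0]
print(should_fire(90.0, progress))                   # False: held back by the straggler
print(should_fire(90.0, progress, percentile=80.0))  # True: the slowest 20% is ignored
```

The trade-off is the one described above: the 80% trigger produces a result sooner, at the cost of possibly missing the straggler's data in that first result.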


Relatedly, a pain point with batch processing jobs is stragglers that create a long tail in execution time. While dynamic rebalancing can help with this issue, FlumeJava has a custom feature that allows for early termination of a job based on overall progress. One of the benefits of the unified model for batch mode is that this sort of early termination criteria is now naturally expressible using the standard triggers mechanism, rather than requiring a custom feature.


3.3.5 Recommendations: Processing Time Triggers

Another pipeline that we considered built trees of user activity (essentially session trees) across a large Google property. These trees were then used to build recommendations tailored to users’ interests. The pipeline was noteworthy in that it used processing-time timers to drive its output. This was due to the fact that, for their system, having regularly updated, partial views on the data was much more valuable than waiting until mostly complete views were ready once the watermark passed the end of the session. It also meant that lags in watermark progress due to a small amount of slow data would not affect timeliness of output for the rest of the data. This pipeline thus motivated inclusion of processing-time triggers shown in Figures 7 and 8.
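A processing-time trigger of this kind can be sketched as follows. This is a deterministic toy simulation under stated assumptions, not the pipeline's actual code: arrivals are given as `(processing_time, value)` pairs already ordered by processing time, the aggregate is a running sum, and the function name `periodic_partial_sums` is hypothetical. The output is driven purely by the passage of processing time, so it never waits on the watermark.

```python
from typing import Iterable, List, Tuple


def periodic_partial_sums(arrivals: Iterable[Tuple[float, int]],
                          period: float) -> List[Tuple[float, int]]:
    """Emit (firing_time, running_sum) once per `period` of processing time.

    A firing is skipped if no new data arrived since the previous one.
    The first timer is set `period` after the first arrival.
    """
    outputs: List[Tuple[float, int]] = []
    running = 0
    next_fire = None
    dirty = False  # True if data arrived since the last emitted result
    for proc_time, value in arrivals:
        if next_fire is None:
            next_fire = proc_time + period
        # Fire every timer that expired before this element arrived.
        while proc_time >= next_fire:
            if dirty:
                outputs.append((next_fire, running))
                dirty = False
            next_fire += period
        running += value
        dirty = True
    if dirty:
        # Final timer: flush whatever arrived after the last firing.
        outputs.append((next_fire, running))
    return outputs


arrivals = [(0.0, 5), (0.4, 2), (1.2, 3), (3.5, 8)]
print(periodic_partial_sums(arrivals, period=1.0))
# [(1.0, 7), (2.0, 10), (4.0, 18)]
```

Each emitted value is a partial view that is refined by later firings, which matches the preference for regularly updated results over a single result at watermark time.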


3.3.6 Anomaly Detection: Data-Driven & Composite Triggers

In the MillWheel paper, we described an anomaly detection pipeline used to track trends in Google web search queries. When developing triggers, their diff detection system motivated data-driven triggers. These differs observe the stream of queries and calculate statistical estimates of whether a spike exists or not. When they believe a spike is happening, they emit a start record, and when they believe it has ceased, they emit a stop. Though you could drive the differ output with something periodic like Trill’s punctuations, for anomaly detection you ideally want output as soon as you are confident you have discovered an anomaly; the use of punctuations essentially transforms the streaming system into micro-batch, introducing additional latency. While practical for a number of use cases, it ultimately is not an ideal fit for this one, thus motivating support for custom data-driven triggers. It was also a motivating case for trigger composition, because in reality, the system runs multiple differs at once, multiplexing their output according to a well-defined set of logic. The AtCount trigger used in Figure 9 exemplified data-driven triggers; Figures 10−14 utilized composite triggers.
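Data-driven and composite triggers can be sketched in a few lines of Python. This is an illustrative toy rather than the model's API: a trigger is modeled as a predicate over the elements buffered so far, and the helper names `at_count`, `spike`, `any_of`, and `run` are hypothetical (only the AtCount name comes from the text above). The `spike` predicate stands in for a differ's statistical estimate with a simple threshold on the latest value.

```python
from typing import Callable, List, Tuple

# A trigger is a predicate over the elements buffered since the last firing.
Trigger = Callable[[List[int]], bool]


def at_count(n: int) -> Trigger:
    """Data-driven trigger: fire once n elements are buffered."""
    return lambda buffered: len(buffered) >= n


def spike(threshold: int) -> Trigger:
    """Data-driven trigger: fire when the newest element reaches a threshold."""
    return lambda buffered: bool(buffered) and buffered[-1] >= threshold


def any_of(*triggers: Trigger) -> Trigger:
    """Composite trigger: fire as soon as any sub-trigger would fire."""
    return lambda buffered: any(t(buffered) for t in triggers)


def run(values: List[int], trigger: Trigger) -> Tuple[List[List[int]], List[int]]:
    """Feed values one at a time; return (emitted panes, still-buffered elements)."""
    panes: List[List[int]] = []
    buffered: List[int] = []
    for v in values:
        buffered.append(v)
        if trigger(buffered):
            panes.append(buffered)
            buffered = []  # start a fresh buffer after each firing
    return panes, buffered


panes, pending = run([1, 9, 1, 1, 1, 2], any_of(at_count(3), spike(5)))
print(panes)    # [[1, 9], [1, 1, 1]]
print(pending)  # [2]
```

The first pane is emitted immediately when the spike value 9 arrives, rather than waiting for a count or a periodic boundary, which is the latency benefit described for anomaly detection.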


  2. Conclusions
    The future of data processing is unbounded data. Though bounded data will always have an important and useful place, it is semantically subsumed by its unbounded counterpart. Furthermore, the proliferation of unbounded data sets across modern business is staggering. At the same time, consumers of processed data grow savvier by the day, demanding powerful constructs like event-time ordering and unaligned windows. The models and systems that exist today serve as an excellent foundation on which to build the data processing tools of tomorrow, but we firmly believe that a shift in overall mindset is necessary to enable those tools to comprehensively address the needs of consumers of unbounded data.
    Based on our many years of experience with real-world, massive-scale, unbounded data processing within Google, we believe the model presented here is a good step in that direction. It supports the unaligned, event-time-ordered windows modern data consumers require. It provides flexible triggering and integrated accumulation and retraction, refocusing the approach from one of finding completeness in data to one of adapting to the ever-present changes manifest in real-world datasets. It abstracts away the distinction of batch vs. micro-batch vs. streaming, allowing pipeline builders a more fluid choice between them, while shielding them from the system-specific constructs that inevitably creep into models targeted at a single underlying system. Its overall flexibility allows pipeline builders to appropriately balance the dimensions of correctness, latency, and cost to fit their use case, which is critical given the diversity of needs in existence. And lastly, it clarifies pipeline implementations by separating the notions of what results are being computed, where in event time they are being computed, when in processing time they are materialized, and how earlier results relate to later refinements. We hope others will find this model useful as we all continue to push forward the state of the art in this fascinating, remarkably complex field.

We thank all of our faithful reviewers for their dedication, time, and thoughtful comments: Atul Adya, Ben Birt, Ben Chambers, Cosmin Arad, Matt Austern, Lukasz Cwik, Grzegorz Czajkowski, Walt Drummond, Jeff Gardner, Anthony Mancuso, Colin Meek, Daniel Myers, Sunil Pedapudi, Amy Unruh, and William Vambenepe. We also wish to recognize the impressive and tireless efforts of everyone on the Google Cloud Dataflow, FlumeJava, MillWheel, and related teams that have helped bring this work to life.



