@IT·互联网

Realtime Data Processing at Face

2021-11-04  本文已影响0人  零度沸腾_yjz

论文概要

"Realtime Data Processing at Facebook"是Facebook 在2016年发表的论文。论文中介绍了Facebook是如何构建分布式实时数据处理系统的。论文的亮点在于:着重介绍了Facebook在设计实时数据处理系统过程中,针对系统部分关键点的做了哪些设计、以及为什么这样设计,同时业界是怎么做的,这些设计点也是当前实时计算领域关键设计点。

Facebook实时处理场景

Facebook实时系统现状

Facebook的消息总线就是一套分布式消息队列,基本和kafka类似。

系统架构

Facebook内部的实时数据处理系统实际是由多个系统组成

  1. 左侧的mobile或web数据被送入到Scribe消息总线

  2. 实时数据处理系统Puma、Stylus和Swift从Scribe中读取数据进行处理,然后将处理之后的数据在写入到Scribe中。Puma、Stylus和Swift可以分别单独使用,可以通过Scribe来来构成一个复杂的DAG。

  3. 右侧Laser、Suba和Hive是用于提供不同类型查询的存储系统,他们的数据也是从Scribe中摄取。Laser还可以将数据提供给流处理系统和线上产品。

image.png

分布式数据通信Scribe

Scribe是一个秒级延迟、高吞吐、可持久化的分布式消息系统。Scribe中数据组织方式和Kafka类似,数据通过category(对应kafka中的topic)组织,category内部bucket(对应kafka中的partition)是数据处理单位。Scribe将数据持久化到HDFS中,具备回读、重复读能力。

Scribe在Facebook也称为分布式数据传输系统,意思就是专门用于系统内或系统间的数据传输解决方案。

流处理引擎Puma

Puma是一个流处理系统,应用程序使用类SQL语言编写,UDF使用Java编写。它的优点就是能在很段的时间(1h内)完成application的编写、测试和部署。

使用Puma编写的应用主要有两个应用目的:

  1. Stateful计算与服务:为简单的聚合查询提供预计算,查询延迟就等于聚合窗口的大小,查询结果是通过Thrift API来查询Puma,Puma的数据结果是存储在HBase上的。(有点类似Queryable state,但是Queryable state不绑定窗口,可查询窗口内数据、并且数据可以存储在本地)
计算top K 事件,5min时间窗口

计算top K 事件,5min时间窗口

  1. Stateless计算:Puma提供了Scribe流的过滤和处理能力。

Puma不具备ad-hoc查询能力,因为在编译阶段对查询进行了优化,所以Puma应用都是部署几个月甚至几年的应用。

流处理引擎Swift

Swift是一个具备Checkpoint功能的基础流处理引擎,它的使用非常简单灵活,可以指定从Scribe中读取固定字符串或者字节来作为一个Checkpoint。如果Swift application 挂掉后,可以从最后一次Checkpoint读取数据。所以能够保证数据至少被处理一次(at least once)。

Swift一般使用Python脚本语言来编写流应用处理程序。

流处理引擎Stylus

Stylus是一个使用C++编写的low level的流处理框架,Processor是Stylus的基础组件。Processor可以是Stateless或Stateful,Processor可以组成一个复杂的DAG。

Processor API基本和我们现在各个流处理引擎一致,Stylus也支持event time和wartmark。

高性能存储服务Laser

Laser是一个在RocksDB之上构建的,高查询吞吐量、低延迟的ky存储服务。Laser的数据来源于Scribe或者Hive(每天读取一次)。Laser能够被Facebook线上产品以及Puma和Stylus所访问。

Laser主要有两个用途:

  1. 能够将Puma和Stylus处理结果(Scribe)应用到Facebook的线上产品。
  2. 将Hive中复杂查询结果和Scribe数据存储起来,供Puma或Stylus使用。(有点缓存中间结果的意思)

OLAP Store Scuba

Scuba是一个快速切片的分析数据存储,Scuba具有每秒摄入数百万行数据并插入到数千个表中的能力。Scuba中的数据是由线上产品输出到Scribe,然后在摄入到Scuba中,这个过程大概有1min延迟。Scuba也支持Puma、Stylus和Swify中的输出数据。

Scuba具备ad-hoc的查询能力,查询延迟一般在1s以内。同时Scuba通过UI可以展示查询结果(支持各种图标)。

数据仓库Hive

Hive在Facebook中是用于存储EB级别的数据仓库,每天会接收几PB的数据写入。Hive中的数据,Facebook使用Presto查询,Presto提供了完成ANSI SQL查询语义,查询结果可以存储到Laser中,被线上产品或者其它流处理引擎所使用。

设计决策

Paper中提到了Facebook在做流处理系统过程中,在一些关键点上做了一些设计决策,这些设计决策对比了业界已有的方案,并且给出了这些决策对Streaming系统的影响。

Paper中首先说明了流处理系统的存在五方面的重要设计:易用性(Easy of use)、性能(Performance)、容错(Fault-tolerance)、扩展性(Scalability)和正确性(Correctness)。

易用性:处理是否复杂?SQL是否够用?是否还需要通用语言(比如java或C++)?用于编写、测试和部署的速度有多快。

性能:多少延迟是ok的,毫秒级、秒级还是分钟级?需要多高的吞吐?

容错:可以容忍什么类型的失败?数据处理或者输出的次数保证了什么语义?系统如何存储和恢复内存中的状态数据。

扩展性:数据能不能分片和重分片来并行处理?系统能否根据流量扩缩容?

正确性:是否需要ACID?所有输入数据是否都需要输出?

这易用性、性能、容错、扩展性和正确性基本是现在流系统的对比标准。

Paper分别从语言范式、数据传输、处理语义、状态存储机制和再处理五方面来说明Facebook在流系统设计之初的设计决策。

image.png

Paper对业界已有流处理引擎和Facebook流处理引擎在上面设计决策进行了对比。

image.png

语言范式设计决策

语言范式是指用户编写流处理应用时所使用的语言。对于语言范式的选择决策,决定了应用应用程序编写的难易以及编写者对引擎性能的控制粒度。

在流处理场景,可选择的通用语言范式有三种:

  1. 声明式语言(declarative),主要以SQL为代表。它的优势在于简单可快速上手,但是表达能力有限,许多系统需要额外增加一些UDF。

  2. 函数式(functional),函数式编程模型将应用程序表示为一系列预定义的operator。它方便编写,并且有更多操作(operator)可用,而且能够控制这些operator的顺序。

  3. 程序语言(procedural),像c++、java、python都是通用的程序语言,它们具备非常高的灵活性和性能。但是它们往往需要更长时间的编写和测试。比如Storm、Heron、Samza等。

在Facebook内部,由因为没有单一的语言范式能够满足所有场景,所以需要多种语言范式的流处理引擎(针对不同易用性和性能),这也是为什么他们内部有三套流处理引擎。Puma使用Sql、Swift使用Python、Stylus使用C++来编写应用程序。对于函数式编程,FB内部还没有支持,他们在调研Spark Streaming这些流处理引擎。

目前主流流计算引擎已经能一套runtime支持多种level的api,比如Flink的SQL/Table API 、DataStream/DataSet API和ProcessFunction。

数据传输设计决策

流处理应用通常是由多个节点组成的一个复杂的DAG,所以需要在节点间进行数据传输。数据传输对于系统的容错、性能和扩展性都是非常重要功能。由于应用程序调试,所以对易用性也有影响。

通用的传输机制主要有以下三类:

  1. 直接消息传输,通常使用RPC或者内存消息队列来实现进程间数据传输,比如MillWheel、Spark、Flink等都采用RPC进行通信,而Storm则使用ZeroMQ进行通信。直接消息传输的优点在于性能非常好,基本能保证端到端在10ms以内。

  2. 基于代理的消息传输,通过单独的代理节点来链接流处理节点,并进行消息的转发。该模式会增加系统开销,但系统会有很好的扩展性。代理模式的消息传输,还可以将输入流复用到多个输出的Processor中,同时也具备背压的能力。比如Heron所使用的stream manager就是该模式。

  3. 基于持久存储的消息传输,streaming的processor链接持久化的消息总线(消息队列),读写都直接操作该消息队列。该模式是可靠性最强的传输方式,不仅具备多路复用的能力,数据写入和读取还能以不同速率处理。该模式也具备完整的单点故障恢复能力。比如Samza就是使用kafka来做数据传输。

我们上面说了Facebook使用Scribe来做数据传输,该模式大概会有1s左右的数据延迟(并且会落盘,所以也受限于磁盘和网络io)。之所以Facebook会选用这种模式,主要考虑在当时Facebook内部大部分场景都能够容忍这个延迟,并且该模式为容错、易用性、扩展性和性能带来很大便利。

处理语义

处理语义的选择会影响流系统的容错和正确性能。

paper首先将流处理系统所做的事情总结为三部分。

  1. 处理输入数据,比如反序列化、查询外部系统、更新内存状态等。

  2. 生成输出,基于输入数据和内存状态,为下游系统生成输出数据。

  3. 保存Checkpoint到数据库,用于故障恢复。需要保存的内容有三部分:

  4. 内存中的状态。

  5. 输入流的offset。

  6. 输出数据。

上面这三部分可以总结为两种类型的处理语义:

  1. 状态语义,每个输入事件至少被处理一次(at-least-once)、最多处理一次(at-most-once)和只处理一次(exactly-once)。
  2. 输出语义,给定的输出值至少出现一次、最多出现一次和只出现一次。

状态语义就是我们平时所讨论的流处理引擎的数据处理语义;输出语义就是我们所说的端到端的数据一致性语义。

对于Stateless处理节点只有输出语义,对于Stateful处理节点两种处理语义都存在。

状态语义

状态语义只取决于存储offset和存储内存状态的顺序。

下图是当发生fo后,不同状态语义所带来的结果。

image.png

输出语义

输出语义,除了依赖内存中状态和offset,还取决输出值的保存。

image.png

状态语义与输出语义的对应关系:

image.png

在Facebook内部,虽然对于各种语义需求都有,但是从论文看是非常重视at-least-once处理的。因为at-least-once一方面能提供极致的性能体验,另一面exactly-once需要借助事务存储系统。

状态存储机制

状态存储的目的是当发生fo后,来恢复状态。对于状态的存储和恢复,有多种方式。

  1. 状态节点多副本,通过启动多个状态节点,来达到状态节点多副本的能力。该方式会带来额外的硬件开销。

  2. 本地状态存储,类似Samza将状态存储到本地db,并同时将数据写到kafka。

  3. 远端数据库持久化,将Checkpoint数据存储到远端数据库中,比如HBase。

  4. 上游备份,事件缓存在上游节点,当当前节点fo后,上游重放。

  5. 全局一致性快照,Flink所使用的分布式快照算法,当一个节点fo后,需要将多个节点恢复到一致(联动fo)。

在Facebook 内部对流处理系统的容错有着不同的需求,Puma提供了有状态聚合的容错,Stylus提供了local database模式和remote database模式。

image.png

local database 首先定时将内存数据写到local database,后端进程在使用一个更大的时间间隔来将local database数据异步同步到HDFS。当处理进程fo时优先使用本地数据恢复,如果恢复后的进程不在之前的机器,则从HDFS并行拉取数据。

image.png

remote database( ZippyDB) 不会将状态存储在本地,使用remote database一个主要优势就是快速FO,因为不需要等从远端下载完整的状态数据。

回填处理(Backfill processing)

回填处理也就是需要重新处理旧数据。

Facebook内部使用MapReduce读取Hive数据方式来进行旧数据处理。

一些经验教训

多系统能够快速实现和迭代(分别针对易用性、容错、扩展等)、流系统的易用性非常重要(编写、调试、部署、监控)。

总结

上一篇下一篇

猜你喜欢

热点阅读