Percolator
摘要
从网络上爬取的文档是非常大量的数据,更新这些文档的索引需要持续的在大量文档上进行转换,因为新的文档会不停地到来。这是一类数据处理任务的一个典型例子,这一类任务都需要在一个既存的大量数据集上进行小的,单独的更新。这些任务在当前基础设施的能力之外。数据库的存储和吞吐量达不到这些任务的需求:Google的索引系统存储着几十PB的数据,每天在几千条服务器上处理数十亿次更新。MapReduce以及其他批处理系统不能单独地处理这种微小更新,它们的优势在于有效的进行大型批量处理。
我们构建了Percolator,一个用于在海量数据集上进行增量更新处理的系统,并使用这个系统来建立Google的web搜索索引。通过使用Percolator将基于批量处理的索引系统换为基于增量更新的系统,我们每天仍然可以处理相同数量的文档,同时将文档的平均寿命降低了50%。
1. 简介
考虑这样的任务:构建web文档的索引以响应查询。构建这个索引首先需要爬取网络上的所有页面,然后再对它们进行处理,并将一些不变量保留在索引中。比如,如果多个URL对应的页面内容都是相同的,那么只有PageRank^[]值最大的那个URL会保留在索引中。每个链接也都进行倒置处理,让外链中的描点文本和它所指向的页面对应起来。链接倒置必须要处理重复页面的情况:在一定的情况下,指向重复页面的链接只能指向PageRank值最大的那个页面。
这是一个批处理任务,可以表示为一系列的MapReduce操作:一个聚合重复页面,一个进行链接倒置处理,等等。保持不变量很容易,因为MapReduce可以限制计算的并发度;所有文档都按照步骤一个接一个进行处理。比如,当索引系统将描点文本添加到PageRank值最高的URL时,我们不需要担心PageRank值发生变化;之前的一个MapReduce步骤已经确定了它的PageRank值。
现在考虑在重新抓取了web上的部分内容后如何更新索引。仅仅在新抓取的这些页面上运行MapReduce任务是不够的,因为新抓取页面和其他剩余页面之间可能存在链接。必须在整个数据集上运行MapReduce,也就是,在新页面和旧页面的集合上运行。只要给足够的计算资源,加上MapReduce具有的扩展性,这种方案是可行的,事实上,Google的网页索引之前一直是以这种方式进行处理的。然而重新处理整个web的数据丢弃了之前的工作结果,也使得任务延迟正比于数据集的大小,而不是更新数据的大小。
索引系统可以将数据集存储在DBMS中,使用事务来进行单个的更新并保持不变量。然而,现有的DBMS无法处理如此庞大的数据量:Google的索引系统在几千台服务器上存储着几十PB的数据[]。类似Bigtable[]的分布式存储系统可以扩展到这个数据量,但是不能提供在并行更新下保持不变量的工具。
对于维护整个web索引这样的任务来说,一个理想的数据处理系统应该为增量处理而做特定优化;也就是说,它能够让我们维持一个巨大的文档数据集,同时又能够高效地将新爬取的文档更新到数据集中。考虑到这个系统需要并发处理很多微小更新,它还需要提供并发更新下的一致性,以及跟踪哪些更新已经处理完成的能力。
本文剩余部分描述了一个具体的增量处理系统:Percolator。Percolator给用户提供了随机访问一个PB大小数据集的能力。随机访问让我们能够处理单个的文档,避免了MapReduce所需要的数据集全局扫描。为了达到高吞吐,多台服务器上的许多线程需要同时处理数据集,因此Percolator提供了兼容ACID的事务能力,这样使得开发者更容易推理数据集的状态;当前我们实现了快照隔离^[]的语义。
除了推理数据状态之外,增量系统的开发者还需要跟踪增量操作的状态。为了达到这个目标,Percolator提供了观察者:这是系统中包含的一小段代码,当用户指定的某一列发生变化时,这段代码会被调用。Percolator应用程序被组织为一系列观察者;每个观察者完成一个任务,并向表中写入数据以触发下游的观察者。外部某个处理写入数据到表中,以触发链条中的第一个观察者。
Percolator是专为增量处理而设计的,它并不是用来替代现有的数据处理方案的。那种不能被拆分为多个更新的计算任务(比如排序)更适合由MapReduce来处理。此外,Percolator针对的计算任务需要有强一致性要求;否则Bigtable就足够了。最后,这个计算应该在某个维度上(数据量,CPU计算量)是非常大的;不适合由MapReduce和Bigtable小规模的计算可以用传统的DBMS来处理。
在Google内部,第一个使用Percolator的应用程序是web页面索引系统。通过将索引系统转换为增量系统,我们能够处理爬取的单个页面更新。这将平均文档处理延迟降低了100倍,搜索结果中的文档平均年龄也降低了50%(搜索结果的年龄包括了页面更新到页面被爬取之间的时间)。这个系统也被用于将页面渲染为图像;Percolator跟踪web页面和它所依赖的资源之间的关系,因此当所依赖的资源发生变化时,页面能够被重新处理。
2. 设计
为了在大规模数据集上进行增量处理,Percolator提供了两个主要的抽象:在一个随机访问的数据集上的ACID事务;以及一个组织增量计算的方式,观察者。
图1.jpgPercolator系统包括了三个二进制程序,这三个程序运行在集群中的每个服务器上:一个Percolator worker,一个Bigtable^[] tablet服务,一个GFS^[] chunkserver。所有观察者都被链接到Percolator worker中,worker会扫描Bigtable,寻找变化的列并唤醒响应的观察者,观察者在worker进程中以函数的形式被调用。观察者通过发送读或写的RPC请求给Bigtable tablet服务来进行事务处理,tablet服务则发送读写RPC给GFS chunkserver。系统还依赖两个小服务:时间戳服务器以及一个轻量的锁服务。时间戳服务器提供严格递增的时间戳:快照隔离协议的操作需要这个属性。workers通过使用轻量锁服务来高效的搜寻脏数据列。
从开发者的角度看,一个Percolator数据集包括了一些表。每个表中都是cell的集合,这些cell由行和列索引。每个cell包含一个值:一个无意义的字节数组。(在内部,为了达到快照隔离,我们将每个cell解释为一系列值,由时间戳索引)
在海量数据上运行,以及并不强调极低的延迟,这两点影响了Percolator的设计。对延迟的低要求让我们采用了一种懒惰的方式来清理事务处理失败而遗留的锁数据。这个懒惰但是容易实现的方案,潜在的将事务提交的延迟增加了数十秒。对于在DBMS上运行OLTP的任务来说,这个延迟是不可接受的,但是对于一个为全网页面建立索引的增量系统来说是可以忍受的。Percolator没有事务管理的中心节点;具体来说,它缺少一个全局死锁检测器。这增加了冲突事务的延迟但是允许系统扩展到几千台服务器。
2.1 Bigtable概览
Percolator构建在Bigtable分布式存储之上。Bigtable给用户展现了一个多维度的有序map:键是(row, column, timestamp)元组。Bigtbale提供以行为单位的查找和更新,在单行内,Bigtable保证原子的读写操作。Bigtable处理PB级别的数据,在数千台服务器上运行。
运行中的Bigtable包括了tablet服务的集合,其中每个服务负责几个tablet(连续的key空间)。由一个master服务来协调tablet服务,指导他们加载或者丢弃某些tablet。一个tablet被存储为只读文件的集合,这些文件组织为Google SSTable格式。SSTable则存储在GFS中;Bigtable依赖GFS来应对磁盘数据丢失的情况。Bigtable允许用户通过将一些列聚集在一起(column family)来提升性能。聚集在一起的列被存储在单独的SSTable文件中,这使得扫描这些数据更容易,因为其他列中的数据不需要扫描。
在Bigtable上构建Percolator的决定定义了Percolator的整体形态。Percolator维持了Bigtable接口的大概:数据被组织为Bigtable的行和列,Percolator元数据存储在指定的列中(图5所示)。Percolator的API和Bigtable的API非常类似:Percolator库包括了Bigtable的操作,包装为Percolator特定的计算操作。实现percolator的挑战在于要提供Bigtable没有的特性:跨行的事务以及观察者框架。
2.2 事务
Percolator提供跨行,跨表的事务,并保证ACID快照隔离语义。Percolator用户使用命令式语言(当前是C++)编写事务代码,并在代码中调用Percolator API。图2展示了一个简化的例子,通过对内容进行哈希将文档聚集分类起来。在这个示例中,如果Commit()返回错误,说明事务有冲突(两个相同内容的URL被同时处理了)并且应该等一会再重试。Get()和Commit()都是阻塞调用;通过在线程池中执行很多线程来实现并发效果。
图2.jpg尽管可以在没有强事务保证的情况下处理增量数据,使用事务可以让用户更加容易推测系统的状态,并避免给这个需要长期维护的数据集引入错误。比如,在支持事务的索引系统中,文档内容的哈希总是一致的,这个文档URL会记录在以哈希值为索引的表中,在同一个哈希值索引下还有重复的文档URL。如果没有事务,文档表中的某个条目可能找不到对应的哈希索引(???)。事务也使得构建索引表很容易,同时也能保持一致性和实时性。注意到,上面这两个例子都需要跨行的事务能力,而不是Bigtable已经提供的单行内事务能力。
Percolator使用Bigtable的时间戳维度来存储多个版本的数据。快照隔离[]需要多版本支持,它给每个事务展现出某个时间戳下稳定的快照,用来读取数据。写入数据则在一个稍晚的时间戳下进行。快照隔离防止了写-写冲突:如果事务A和B同时运行,向同一个cell中写入数据,最多只有一个事务能提交成功。快照隔离不保证串行化;具体来说,快照隔离下的事务可能会出现写偏斜[]。相比于串行化,快照隔离最主要的优势在于读取的高效。因为每个时间戳代表着一个一致的快照,读取一个cell的数据只需要在指定时间戳下查询一次Bigtable;而不需要获取锁。图3说明了快照隔离下事务之间的关系。
图3.jpg由于Percolator被构建为一个客户端程序库来访问Bigtable,而不是直接控制对存储的访问,Percolator面临着一些传统PDBMS不会遇到的分布式事务上的难题。其他并行数据库将管理访问磁盘的锁功能集成到系统组件中了:由于每个节点以及了解了对磁盘数据的访问规则,它可以将锁赋给某些请求,同时阻止那些违背了锁要求的请求。(???)
作为对比,Percolator中的每个节点都可以直接发起对Bigtable状态的修改:没有什么好办法来拦截请求并加锁。因此,Percolator必须显示的维护锁。锁必须要持久化,以防止机器故障;如果锁在两阶段提交之间消失了,系统可能错误的将两个冲突的事务都提交了。锁服务必须提供高吞吐;上千台服务器可能同时需要锁。锁服务需要是低延迟的;每个Get()操作都需要在读取数据之外读取锁,我们想要优化这个延迟。考虑到这些问题,锁服务需要有副本(应对故障),是分布式而且负载均衡的(应对高流量),并且写入到持久化存储中。Bigtable本身满足这些要求,因此Percolator将锁存储在一个指定的列中,和数据使用同一个Bigtable,并且当访问数据时,在一个单行事务中读取和修改锁。
图4.jpg 图5.jpg 图6.jpg我们现在来更详细地讨论事务协议。图6展示了Percolator事务的伪代码,图4展示了在一个事务中Percolator的数据以及元数据的结构。图5中描述了系统使用的这些元数据的列。事务的构造函数从时间服务器获取一个时间戳(line 6),这个时间戳决定了Get()所看到的快照。Set()调用被缓存起来(line7)直到提交。提交缓存的写操作的基本思路是两阶段提交,由客户端来协调。不同服务器上的事务通过Bigtable tablet服务器上的单行事务来交互。
在提交的第一阶段("prewrite"),我们尝试锁住所有需要写入的cell。(为了应对客户端复制,我们指定任意一个锁为primary;后面会讨论这个机制)。事务会读取元数据以检查每个要写入的cell是否冲突。有两种情况是冲突的:如果事务看到在它的开始时间戳之后有写入记录,则终止自己(line32);这表示发生了写-写冲突,是快照隔离需要避免的。如果事务发现其他任意时间戳的锁,也立即终止(line34)。有可能其他事务在我们开始时间戳之前已经完成了提交,并且正在缓慢的释放它的锁,但是我们认为这不太可能发生,因为我们选择终止事务。如果没有冲突,我们用开始时间戳给每个cell写入锁以及数据(line36-38)。
如果没有cell冲突,事务可以提交并且继续第二阶段。在第二阶段开始时,客户端从时间戳服务器获取提交时间戳(line 48)。然后,对每个cell(从primary开始),客户端释放它的锁,并写入一个write记录使得它对读取可见。write记录向读取者表示这个cell中有以提交的数据;它包含了一个开始时间戳,读取者可以通过它获取到实际数据。一旦primary的写入成功了,事务就必须提交了,因为它已经使得一个写入对读取者可见了。
Get()操作首先检查[0, start_timestamp]范围内的锁数据,这个范围内的快照可以被事务看到(line12)。如果存在锁,意味着其他事务正在写这个cell,因此读事务必须等待这个锁被释放。如果没有发现冲突的锁,Get()读取这个时间戳范围内最新的write记录(line19)并返回这个write记录所对应的数据(line22)。
由于客户端可能故障,事务处理变得复杂(tablet服务故障不影响系统,因为Bigtable保证已写入的锁会持久化)。如果一个事务正在提及的过程中客户端故障了,会遗留锁在系统中。Percolator必须要清理这些锁,否则它们可能会导致后续的事务无限hang住。Percolator采用了一个懒惰方案来清理锁:当事务A遇到一个由事务B遗留的锁时,A可能会认为B已经失败了并清理这些锁。
很难保证A认为B已经失败了是完全正确的;因此我们必须要避免这样的情形:A在清理B的事务中的锁,然而B并不是真的失败了而是在尝试提交事务。Percolator处理这个问题的方法是,在每个事务中指定一个cell作为同步点,来判断应该提交还是清理。这个cell的锁就是primary锁。A和B都知道哪个lock是primary(primary的位置写在了事务中所有cell的lock中)。不管是提交还是清理操作,都需要更改primary锁;由于这个更改是在Bigtable的单行事务中进行,只有一个操作会成功。尤其是,B在提交之前,必须要检查它是否仍然持有primary锁,并将其删除然后写入write记录。在A清理B的锁之前,它必须检查primary锁,保证B没有提交;如果primary锁仍然存在,就可以安全的删除这个锁。
当客户端在提交的第二阶段故障时,事务已经过了提交点(至少写入了一个write记录)但遗留了锁。我们必须继续执行这个事务。当事务遇到锁时,可以通过primary锁来判断两种情形:如果primary锁已经被删除并写入了write记录,写入这个锁的事务必须要继续执行,否则它应该回滚(由于我们总是先提交primary,当primary没有提交时,我们可以保证回顾整个事务是安全的)。继续执行事务的方式是清理锁并写入write记录,就像原始事务执行的那样。
由于清理primary锁是同步的,清理活动客户端的锁也是安全的;然而,这会导致性能的下降,因为它会强制事务回滚(???)。因此,一个事务不会清理锁,除非它认为这个锁属于一个退出的或者卡住的worker。Percolator使用了一个简单的机制来判断worker是否存活。正在运行的worker会在Chubby^[]中写入一个token,以此声明自己还在系统中;其他worker可以通过这个token来判断此worker仍然存活(当进程退出时这个token会自动删除)。为了处理worker存活但是长时间不工作的情况,我们额外的写一个墙上时间到锁中;如果锁中的墙上时间过于老旧,那么它可以被清理掉,即使worker还是存活的。为了应对需要长时间提交的事务,worker在提交事务的过程中会周期性的更新这个墙上时间。
2.3 时间戳
时间戳服务器(oracle)可以提供连续递增的时间戳。由于每个事务都要获取两个时间戳,这个时间戳服务必须能够有很好的扩展性。Oracle周期的分配一个范围的时间戳,并将这个范围内最大的时间戳写入到持久存储;这样一来,oracle就可以直接从内存中获取时间戳来应付请求。如果oracle重启了,它会直接从之前存储的最大时间戳开始分配(绝不会分配比这个值小的时间戳)。为了应对大量RPC的耗时(会增加事务延迟),每个Percolator进行都会批量的从oracle中获取时间戳,一次性获取多个事务需要的时间戳。在oracle的负载变得沉重时,批量操作能有效缓解压力。批量操作增加了oracle的扩展性但是并没有影响oracle对时间戳的属性保证(仍然是递增)。我们的oracle服务在单台服务器上可以提供每秒2百万个时间戳。
事务使用严格递增的时间戳来保证Get()能够返回所有正在此事务开始时间戳之前提交的写。为了观察它是如何保证这一点的,考虑一个时间戳TR开始的事务R,以及一个在TW < TR提交的事务W;我们会看到R能够看到W的写入。由于TW < TR,我们可以知道oracle在给出TR直接给出了TW。R在没有收到它的开始时间戳TR之前是不能读数据的,同时W在获取时间戳TW之前就写入了锁。因此以上可以说明,W一定在R读数据之前就把所有的锁写入了;R的Get()要么会看到已经提交的write记录,要么看到锁,在后一种情况下,R必须要阻塞等待锁被释放。不管哪种情形,W的写都对R是可见的。
2.4 通知
事务可以让用户更改表并保持不变量,但是用户也需要一个机制来触发事务。在Percolator中,用户编写一些可以被表更改所触发的代码(observer),我们把observer链接到一个二进制程序中,并在系统中的每个tablet服务器中运行。每个observer在Percolator中注册了一个函数和一些列,当有数据写入到这些列中任一列时,Percolator就调用这个函数。
Percolator应用程序的结构是一系列observer;每个observer完成一个任务并写入数据到表中,以此为下游的observer创建更多工作。在外面的索引系统中,一个MapReduce任务运行加载事务以把爬取的文档加载到Percolator中,这会触发文档处理事务来给这些文档添加索引(页面分析,抽取链接等)。文档处理事务还会触发更多其他事务比如聚合事务。聚合事务又会触发另外的事务将更改后的文档集合导出到服务系统。
通知机制和数据库的触发器和事件很相似,但是不同的是,他们不能用来维持数据库不变量。具体来说,触发器的执行和触发写的操作在不同的事务中,因此触发写操作和触发器执行不是原子的。通知机制的意图是用来帮助处理增量计算而不是帮助维持数据完整性。
在语义上的不同使得observer的行为相比于那些覆盖的触发器更容易理解。Percolator应用程序只包括了很少的observer--Google索引系统大概有10个observer。每个observer都显示的编写在工作进程的main()函数中,因此很容易确认哪些observer是活动的。多个observer观察同一列是可以实现的,但是我们避免使用这个特性,为了更清楚的看到当某一列写入时会触发哪个observer。用户需要注意无限循环的observer,Percolator没有阻止这种情况的出现;用户一般会构建一连串observer来避免循环路径。
我们提供一个保证:当一列数据发生了一次变更后,最多只有一个observer事务会提交。但是反过来是不成立的,当某一列发生了多次写入时,可能只会触发相应的observer执行一次。我们把这个特性叫做消息崩溃,因为它帮助避免了很多通知同时触发的计算。比如,对于http://google.com
,只需要周期性的处理就可以了,而不用每次发现一个新的指向它的链接就重新处理一次。
为了提供通知的这些语义,每个被观察的列对每个observer都有一个额外的"acknowledgmente"列,其中包含了这个observer最近一次运行的时间戳。当被观察列中有数据写入时,Percolator启动一个事务来处理通知。这个事务会读取被观察列以及对应的"acknowledgment"列。如果被观察列是在最近的运行时间后写入的,我们运行observer并把我们的开始时间戳写入到"acknowledgment"列中。否则,这个observer已经在运行了,我们不用再次运行。注意到,如果Percolator不小心对同一个通知开启了两个事务,他们都会看到脏的通知并启动observer,但是最终有一个会终止,因为它们会在"acknowledgment"列数据上冲突。我们保证对每个通知都只会有一个observer提交。
为了实现通知,Percolator需要高效的找到脏cell以及需要运行的observer。这个搜索很困难,因为通知很少:我们的表有亿万个cell,但是在普通的压力负载下,只有几百万个通知。此外,observer代码运行在很多服务器上的很多客户进程上,意味着搜索脏cell必须是分布式的。
为了识别脏cell,Percolator维护了一个特别的"notify"列,每个脏cell都在其中有一个条目。当一个事务写入了被观察的cell时,它同时也会设置相应的notify cell。搜索进程则在notify列上进行分布式缩缩。在observer被触发以及事务提交之后,我们删除notify cell。由于notify列是Bigtable列,而不是Percolator列,它没有事务属性,只能作为一个提示,让搜索进程去检查acknowledgement列以觉得是否需要运行observer。
为了让搜索更加高效,Percolator把notify列放在一个单独的Bigtable本地组中(???),这样扫描这一列值需要读几百万脏cell了,而不是亿万个cell。每个Percolator启动几个线程来做扫描。每个线程都会从表中选择一部分来扫描,它会随机挑选一个tablet,然后随机挑选一个key,从这个key开始扫描表。由于每个worker都在随机的扫描表的一部分,我们担心两个worker可能会在同一行上运行observer。虽然由于通知的事务特性,这样做不会导致正确性问题,但是它是很低效的。为了避免这中情况,每个worker在扫描一行之前都从轻量锁服务中获取一个锁。这个锁服务不需要持久化状态,因为它只是建议性的。(???)
未完