hive on spark总体设计

2017-06-14  本文已影响0人  snail_knight

http://www.csdn.net/article/2015-04-24/2824545

HIve on spark 总体设计思路,尽可能重用Hive逻辑层面的功能;从省城物理计划开始,提供一整套针对spark的实现,比如SparkCompiler,SparkTask等,这样HIve的查询就可以作为Spark的任务来执行了。一下是几点主要的设计原则。

1、尽可能减少对Hive原有代码的修改,这是和之前的Shark设计思路最大的不同,Shark对HIve的修改动太大以至于无法被Hive社区接受,HIve on spark尽可能减少改动HIve的代码,从而不影响Hive目前对MapReduce和Tez的支持,同时,HIve on spark 保证对现有的MapReduce和Tez模式在功能性能方面不会有任何影响。

2、对于选择Spark的用户,应使其能够自动的获取hive现有和未来新增的功能。

3、尽可能降低维护成本,保证对spark依赖的松耦合。

新的计算引擎

Hive的用户可以通过hive.execution.engine来设置计算引擎,目前该参数可选的值为mr何tez,为了实现Hive on spark,我们将spark作为参数第三个选项。要开启Hive on Spark模式,用户仅需将这个参数设置为Spark即可。

以Hive的表作为RDD

Spark以分布式可靠数据集合作为其数据抽象,因此我们需要将Hive的表转换为RDD以便spark处理。本质上,hive表和spark的HadoopRDD都是HDFS上的一组文件,通过inputFormat和RecordReader读取其中的数据,因此这个转化为自然而然的。

使用Hive原语

这里主要是指Hive的操作符对数据进行处理。Spark为RDD提供了一些列的转换Transformation,其中有些转换也是面向SQL的,比如groupByKey、join等如果使用这些转换(就如Shark所做的那样),那就意味着我们要重新实现一些Hive已有的功能;而且当Hive增加新的功能时,我们需要响应的Hive on spark模式。有鉴于此,我们选择将Hive的操作包装为Function,然后应用到RDD上,这样,我们只需要依赖较少的集中RDD的转换,而主要的计算逻辑任由Hive提供。

由于使用了Hive的原语,因此我们需要显示的调用一些Transformation来实现Shuffle的功能。下表中列举了Hive on Spark使用的所有转换。

Transformation                                        功能                                                         应用场景

mapPartitionsToPair                   将function应用到RDD的每个partition上           主要的计算逻辑

union                     返回两个RDD的联合                  多表查询

groupByKey        按照key对RDD进行group,group后的结果不保证有序,使用Hash Partitioner      Shuffle 使用用不要求排序的情况

sortByKey           按照key对RDD进行全局排序,使用Range Partitioner     Shuffle时使,适用于全局排序的查询,且由于使用了Range Partitioner,因此可以用多个reducer来实现全局排序

repartitionAndSortWithinPartitions   对RDD进行重新分区,并对每个分区进行排序,使用hash Partition  shuffle时使用,适用于需要排序的情况

repartitionAndSortWithinPartitions简单说明,这个功能由SPARK-2978引入,目的是提供一种MapReduce风格的Shuffle.虽然sortBySort也提供了排序的功能,但某些情况下我们并不需要全局有序,另外其使用的Range Partitioner对于某些Hive查询并不适用。

物理执行计划:

通过SparkCompiler将Operator Tree转换为Task Tree,其中需要提交给spark执行的任务即为sparkTask,不同于MapReduce中Map+Reduce的两阶段执行模式,spark采用DAG的执行模式,因此一个sparktask包含了一个表示RDD转换的DAG,我们将这个DAG包装为SparkWork。执行SparkTask时,就根据SparkWork所表示的DAG计算出最终的RDD,然后通过RDD的foreachAsync来出阿发原酸,使用foreachAsync是因为我们使用了Hive原语,因此不需要RDD返回结果;此外foreachAsync异步提交任务便于我们对任务进行监控。

SparkContext声明周期

sparkContext是用户与spark集群进行交互的接口,HIve on spark应该为每个用户会话创建一个SparkContext。但是Spark目前的使用方式假设SparkContext的生命周期是spark应用级别的,而且目前同一个JVM中不能创建多个SparkContext,这明显无法满足HiveServer2的应用场景,因为多个客户端需要通过一个HiveServer2来提供服务。鉴于此,我们需要在单独JVM中启动SparkContext,并通过RPC与远程的SparkContext进行通信。

任务监控与统计信息收集

spark提供了SparkListener接口来监听任务执行期间的各种事件,因此我们可以实现一个Listener来监控任务执行进度以及收集任务级别的统计的统计信息(目前任务级别的统计由sparkListener采集,任务进度则由spark提供专门的api来监控)另外Hive还提供了Operatior级别的统计数据信息,比如读取的行数等。在MapReduce模式下,这些信息通过Hadoop Counter收集。我们可以使用Spark提供的Accumulator来实现该功能。

测试

除了一般的单元测试以外,hive还提供了Qfile Test,即圆形一些事先定义好的查询,并根据结果判断测试是否通过。HIve on spark的Qfile Test应该尽可能接近真实的spark部署环境。目前我们采用的是local-cluster方式(该不是)。。。。

实现细节:

sparkTask的生成执行

我们通过一个例子看下一个简单的两表JOIN查询如何被转换成为sparkTask并被执行。下图左半部分展示了这个查询的operation tree,以及该operation Tree如何被转化成SparkTask;右半部分展示了该SparkTask执行时如何得到最终的RDD并通过foreachAsync提交spark任务

SparkCompiler遍历Operator Tree,将其划分为不同的MapWork和reducework,Mapwork为根节点,总是由TableScanOperator(hive中对表进行扫描操作符)开始;后续的work均为reducework。ReduceSinkOperator(Hive中进行Shuffle输出的操作符)用来标记两个Work之间的界限,出现ReduceSinkOperator表示当前work到下一个work之间的数据需要进行shuffle,因此当我们发现FileSinkOperator(hive中将结果输出到文件的操作符)的work为叶子节点。与MapReduce最大的不同在于,我们并不要求ReduceWork一定是叶子节点,即ReduceWork之后可以连接更多的reducework,并在同一个SParkTask中执行。

这个查询的OperatorTree 被转化为两个MapWork和一个ReduceWork。在执行SparkTask时,首先根据MapWork来生成最底层的HadoopRDD,然后将各个MapWork和ReduceWork包装成Function应用到RDD上,在有依赖的Work之间,需要显示的调用shuffle转换,具体选用哪种shuffle则根据查询的类型来确定。另外,由于这个例子涉及多表查询,因此在shuffle之前还要对RDD进行Union,经过一系列的转换后,得到最终的RDD,并通过foreachAsync提交到Spark集群上进行计算。

运行模式

RSC工作原理

Hive on Spark支持两种运行模式;本地和远程。当用户吧Spark Master URL设置为local时,采用本地模式;其余情况则采用远程模式。本地模式下,SparkContext与客户端运行在同一个JVM中,远程模式下,SparkContext运行在一个独立的JVM中,本地模式提供了主要为了调试,一般用户不应该选择该模式。因此我们这里主要介绍远程模式(Remote SparkContext。RSC)

用户的每个Session都会创建一个SparkClient,SparkClient会启动RemoteDriver进程,并由RemoteDriver创建SparkContext。SparkTask执行时,通过session提交任务,任务的主体就是对应的SparkWork。SparkClient将任务提交给RemoteDriver,并返回一个SparkJobRef,通过该SparkJobRef,客户端可以监控任务的进度,进行错误处理,以及采集统计信息等。由于最终的RDD计算没有返回结果,因此客户端只需要监控执行进度而不需要处理返回值。RemoteDriver通过SparkListenner收集任务级别的统计数据,通过Accumulator收集Operator级别的统计数据(Accumulator被包装为SparkCounter),并在任务结束时,返回给SparkClient。

SparkClient与RemoteDriver之间通过Netty的RPC进行通信。除了提交任务,SparkClient还提供了诸如添加jar包,获取集群信息的接口。如果客户端需要使用更一般的SparkContext的功能,可以自定义一个任务并通过SparkClient发送到Driver上执行。

理论上来说,HIve on Spark对Spark集群的部署方式没有特别的要求,除了local以外,RemoteDriver可以连接到任务的Spark集群来执行任务。在我们的测试中,HIve on Spark在Standalone和spark on yarn的计算上都能正常工作(需要动态添加jar包的查询在yarn-cluster模式下还不能运行)

优化

Map join

Map join是Hive中一个重要的优化,其原理是,如果参与join比较小的表可以存放如内存,因为这些小表在内存中生成Hash Table,这样较大的表只需要通过一个MapWork呗扫描一次,然后与内存中的Hash Table进行join了,生气了Shuffle和ReduceWork的开销。在MapReduce模式下,通过一个在客户端本地执行的任务来小表生成Hash table,并保存在文件系统上。后续的MapWork首先将Hash Table上传至Distibuted Cache中,最后只要读取大表和Distributed cache中的数据进行join就可以。

Hive on spark 对Map Join的实现与Map Reduce不同。当初我们考虑使用spark提供的广播功能来把小表的Hash table分发到各个节点上。使用广播的有点是spark采用高效的广播算法,其性能应该优于使用distributed cache。而使用广播的缺点会为Driver何计算及诶点带来很大的内存开销。为了使用广播,Hash table的数据需要先被传送到Driver端,然后由Driver进行广播;而且即使在广播之后,Driver仍需保留部分数据,以便应对计算节点的错误,虽然支持spill,但广播数据仍会加剧Driver的内存压力。此外广播相对的开发成本比较高,不利于对已有的代码的服用。

因此Hive on Spark选择类似于Distributed cache的方式来实现Map join,而且为小表生成Hash table的任务可以分布式的执行,进一步减轻客户端的压力。

不同于MapReduce,对于Hive on spark而言,LocalWork只是为了提供一些优化时必要的信息,并不会真正被执行。对于小表的扫描以独立的SparkTask分布式地执行,为此,我们也实现了能够分布式运行HashTableSinkOperator(Hive中输出小表Hash Table的操作符),其主要原理是通过提高HDFS Replication Factor的方式,是的生成的HashTable能够被每个节点在本地访问。

虽然目前采取了类似Distributed的这种实现方式,但如果再后期的测试中发现广播的方式确实能带来较大的性能提升,而且其引入的内存开销可以被接受,我们也会考虑改用广播来实现Map Join

Table cache

Spark的一个优势是可以利用充分利用内存,允许用户显示地把一个RDD保存到内存或者磁盘上,以便于在多次访问时候提高性能。另外在目前的RDD转换模式弘,一个RDD的数据是无法同时被多个下游使用,当一个RDD需要通过不同的转换得到不同的子节点时,就要被计算多次。这时我们应该使用cache来避免重复计算。

在Shark和SparkSQL中,都允许用户显式吧一张表cache来提高对该表的查询性能。比如以下查询,在

在这种情况下,对应的SparkWork中,一个MapWork/ReduceWork会有多个下游的Work,如果不进行cache,那么共享的数据就会被计算多次。为了避免这种情况,我们会将这些MapWork/ReduceWork复制成多个,每个对应一个下游的Work,并对其共享的数据进行cache(由于IOContext的同步问题,该工功能尚未完成)

更为一般的应用场景是一张表在查询中被使用了多次的情况,Hive on spark 目前还不会针对这种查询进行cache,

初步性能测试:

测试集群由10台虚拟机组成,测试数据为320GB的TPC-DS数据。目前的测试用例为6条,包含了自定义的查询以及TPC-DS数据集合。目前的测试用例为6条,包含了自定义的查询以及TPC-DS中两条查询。由于Hive主要用于处理ETL查询,因此我们在TPC-DS中选取用例时,选用较为接近ETL查询用例(TPC-DS的用例主要针对交互式查询,impala,sparkSQL等引擎更合适此类查询),主要针对Hive on Spark 和Hive on Tez进行性能对比

Hive on Spark vs. Hive on Tez

图中横坐标为各个测试用例,纵坐标为所用时间,以秒为单位。

总结:

Hive on spark由多家公司协作开发,从项目开始以来,受到社区的广泛关注。

上一篇下一篇

猜你喜欢

热点阅读