Spark从入门到精通58:Spark 2.x与1.x对比以及分
Spark 2.x与1.x对比
Spark 1.x:Spark Core(RDD)、Spark SQL(SQL+Dataframe+Dataset)、Spark Streaming、Spark MLlib、Spark Graphx
Spark 2.x:Spark Core(RDD)、Spark SQL(ANSI-SQL+Subquery+Dataframe/Dataset)、Spark Streaming、Structured Streaming、Spark MLlib(Dataframe/Dataset)、Spark Graphx、Second Generation Tungsten Engine(Whole-stage code generation+Vectorization)
2.Spark 2.x各组件分析Spark Core(RDD)
从Spark诞生之日开始,RDD就是Spark最主要的编程接口,重要程度类似于Hadoop中的MapReduce。RDD,简单来说,就是一个不可变的分布式数据集,被分为多个partition从而在一个集群上分布式地存储。我们可以使用RDD提供的各种transformation和action算子,对RDD执行分布式的计算操作。
可能很多人会问,Spark 2.0开始,包括Structured Streaming、Spark MLlib、Spark SQL底层都开始基于Dataframe/Dataset来作为基础计算引擎,那么Spark Core/RDD是不是就要被淘汰了?
回答是:错误!
Spark官方社区对于这个问题也是这个态度,Spark Core绝对不会被淘汰掉。因为Spark Core/RDD作为一种low-level的API有它的较为底层的应用场景,虽然后续这种场景会越来越少,Dataframe/Dataset API会逐渐替代原先Spark Core的一些场景,但是不可否认的是,这种场景还是存在的。此外,Dataframe/Dataset实际上底层也是基于Spark Core/RDD构建的。所以说,Spark Core/RDD是Spark生态中,不可替代的基础API和引擎,其他所有的组件几乎都是构建在它之上。未来它不会被淘汰,只是应用场景会减少而已。
Spark 2.x中,在离线批处理计算中,编程API,除了RDD以外,还增强了Dataframe/Dataset API。那么,我们到底什么时候应该使用Spark Core/RDD来进行编程呢?实际上,RDD和Dataset最大的不同在于,RDD是底层的API和内核,Dataset实际上基于底层的引擎构建的high-level的计算引擎。
1、如果我们需要对数据集进行非常底层的掌控和操作,比如说,手动管理RDD的分区,或者根据RDD的运行逻辑来结合各种参数和编程来进行较为底层的调优。因为实际上Dataframe/Dataset底层会基于whole-stage code generation技术自动生成很多代码,那么就意味着,当我们在进行线上报错的troubleshooting以及性能调优时,对Spark的掌控能力就会降低。而使用Spark Core/RDD,因为其运行完全遵循其源码,因此我们完全可以在透彻阅读Spark Core源码的基础之上,对其进行troubleshooting和底层调优。(最重要的一点)
2、我们要处理的数据是非结构化的,比如说多媒体数据,或者是普通文本数据。
3、我们想要使用过程式编程风格来处理数据,而不想使用domain-specific language的编程风格来处理数据。
4、我们不关心数据的schema,即元数据。
5、我们不需要Dataframe/Dataset底层基于的第二代tungsten引擎提供的whole-stage code generation等性能优化技术。
3.Spark 2.x各组件分析Spark SQL(ANSI-SQL+Subquery)
Spark 2.x中的Spark SQL,提供了标准化SQL的支持,以及子查询的支持,大幅度提升了Spark在SQL领域的应用场景。而且本身在大数据领域中,SQL就是一个最广泛使用的用户入口,据不完全统计以及讲师的行业经验,做大数据的公司里,90%的应用场景都是基于SQL的。最典型的例子就是Hadoop,几乎用Hadoop的公司,90%都是基于Hive进行各种大数据的统计和分析。剩下10%是实时计算、机器学习、图计算。之所以有这种现象,主要就是因为SQL简单、易学、易用、直观。无论是研发人员,还是产品经理,还是运营人员,还是其他的人,都能在几天之内入门和学会SQL的使用,然后就可以基于大数据SQL引擎(比如Hive)基于企业积累的海量数据,根据自己的需求进行各种统计和分析。
此外,据Spark官方社区所说,Spark 2.x一方面对SQL的支持做了大幅度的增强,另一方面,也通过优化了底层的计算引擎(第二代tungsten引擎,whole-stage code generation等),提升了SQL的执行性能以及稳定性。
所以在Spark 2.x中,一方面,开始鼓励大家多使用Spark SQL的SQL支持,采用Spark SQL来编写SQL进行最常见的大数据统计分析。比如可以尝试将Hive中的运行的一些SQL语句慢慢迁移到Spark SQL上来。另外一方面,也提醒大家,一般一个新的大版本,都是不太稳定的,因此Spark SQL虽然在功能、性能和稳定性上做了很多的增强,但是难免还是会有很多的坑。因此建议大家在做Hive/RDBMS(比如Oracle)到Spark SQL的迁移时,要小心谨慎,一点点迁移,同时做好踩坑的准备。
4.Spark 2.x各组件分析SparkSQL(Dataframe/Dataset)
就像RDD一样,Dataframe也代表一个不可变的分布式数据集。与RDD不同的一点是,Dataframe引入了schema的概念,支持以复杂的类型作为元素类型,同时指定schema,比如Row。因此Dataframe更像是传统关系型数据库中的表的概念。为了提升开发人员对大数据的处理能力,Dataframe除了提供schema的引入,还基于Schema提供了很多RDD所不具备的high-level API,以及一些domain-specific language(特定领域编程语言)。但是在Spark 2.0中,Dataframe和Dataset合并了,Dataframe已经不是一个单独的概念了,目前仅仅只是Dataset[Row]的一个类型别名而已,你可以理解为Dataframe就是Dataset。
从Spark 2.0开始,Dataset有两种表现形式:typed API和untyped API。我们可以认为,Dataframe就是Dataset[Row]的别名,Row就是一个untyped类型的对象,因为Row是类似于数据库中的一行,我们只知道里面有哪些列,但是有些列即使不存在,我们也可以这对这些不存在的列进行操作。因此其被定义为untyped,就是弱类型。
而Dataset[T]本身,是一种typed类型的API,其中的Object通常都是我们自己自定义的typed类型的对象,因为对象是我们自己定义的,所以包括字段命名以及字段类型都是强类型的。目前Scala支持Dataset和Dataframe两种类型,Java仅仅支持Dataset类型,Python和R因为不具备compile-time type-safety特性,因此仅仅支持Dataframe。
Dataset API的优点
1、静态类型以及运行时的类型安全性
SQL语言具有最不严格的限制,而Dataset具有最严格的限制。SQL语言在只有在运行时才能发现一些错误,比如类型错误,但是由于Dataframe/Dataset目前都是要求类型指定的(静态类型),因此在编译时就可以发现类型错误,并提供运行时的类型安全。比如说,如果我们调用了一个不属于Dataframe的API,编译时就会报错。但是如果你使用了一个不存在的列,那么也只能到运行时才能发现了。而最严格的就是Dataset了,因为Dataset是完全基于typed API来设计的,类型都是严格而且强类型的,因此如果你使用了错误的类型,或者对不存在的列进行了操作,都能在编译时就发现。
2、将半结构化的数据转换为typed自定义类型
举例来说,如果我们现在有一份包含了学校中所有学生的信息,是以JSON字符串格式定义的,比如:{“name”: “leo”, “age”, 19, “classNo”: 1}。我们可以自己定义一个类型,比如case class Student(name: String, age: Integer, classNo: Integer)。接着我们就可以加载指定的json文件,并将其转换为typed类型的Dataset[Student],比如val ds = spark.read.json("students.json").as[Student]。
在这里,Spark会执行三个操作:
1、Spark首先会读取json文件,并且自动推断其schema,然后根据schema创建一个Dataframe。
2、在这里,会创建一个Dataframe=Dataset[Row],使用Row来存放你的数据,因为此时还不知道具体确切的类型。
3、接着将Dataframe转换为Dataset[Student],因为此时已经知道具体的类型是Student了。
这样,我们就可以将半结构化的数据,转换为自定义的typed结构化强类型数据集。并基于此,得到之前说的编译时和运行时的类型安全保障。
3、API的易用性
Dataframe/Dataset引入了很多的high-level API,并提供了domain-specific language风格的编程接口。这样的话,大部分的计算操作,都可以通过Dataset的high-level API来完成。通过typed类型的Dataset,我们可以轻松地执行agg、select、sum、avg、map、filter、groupBy等操作。使用domain-specific language也能够轻松地实现很多计算操作,比如类似RDD算子风格的map()、filter()等。
4、性能
除了上述的优点,Dataframe/Dataset在性能上也有很大的提升。首先,Dataframe/Dataset是构建在Spark SQL引擎之上的,它会根据你执行的操作,使用Spark SQL引擎的Catalyst来生成优化后的逻辑执行计划和物理执行计划,可以大幅度节省内存或磁盘的空间占用的开销(相对于RDD来说,Dataframe/Dataset的空间开销仅为1/3~1/4),也能提升计算的性能。其次,Spark 2.x还引入第二代Tungsten引擎,底层还会使用whole-stage code generation、vectorization等技术来优化性能。
什么时候应该使用Dataframe/Dataset,而不是RDD呢?
1、如果需要更加丰富的计算语义,high-level的抽象语义,以及domain-specific API。
2、如果计算逻辑需要high-level的expression、filter、map、aggregation、average、sum、SQL、列式存储、lambda表达式等语义,来处理半结构化,或结构化的数据。
3、如果需要高度的编译时以及运行时的类型安全保障。
4、如果想要通过Spark SQL的Catalyst和Spark 2.x的第二代Tungsten引擎来提升性能。
5、如果想要通过统一的API来进行离线、流式、机器学习等计算操作。
6、如果是R或Python的用户,那么只能使用Dataframe。
最后,实际上,Spark官方社区对RDD和Dataframe/Dataset的建议时,按照各自的特点,根据的需求场景,来灵活的选择最合适的引擎。甚至说,在一个Spark应用中,也可以将两者结合起来一起使用。
5.Spark 2.x各组件分析 Spark Streaming&Structured Streaming
Spark Streaming是老牌的Spark流式计算引擎,底层基于RDD计算引擎。除了类似RDD风格的计算API以外,也提供了更多的流式计算语义,比如window、updateStateByKey、transform等。同时对于流式计算中重要的数据一致性、容错性等也有一定的支持。
但是Spark 2.x中也推出了全新的基于Dataframe/Dataset的Structured Streaming流式计算引擎。相较于Spark Streaming来说,其最大的不同之处在于,采用了全新的逻辑模型,提出了real-time incremental table的概念,更加统一了流式计算和离线计算的概念,减轻了用户开发的负担。同时还提供了(可能在未来提供)高度封装的特性,比如双流的全量join、与离线数据进行join的语义支持、内置的自动化容错机制、内置的自动化的一次且仅一次的强一致性语义、time-based processing、延迟数据达到的自动处理、与第三方外部存储进行整合的sink概念,等等高级特性。大幅度降低了流式计算应用的开发成本。
这里要提的一句是,首先,目前暂时建议使用Spark Streaming,因为Spark Streaming基于RDD,而且经过过个版本的考验,已经趋向于稳定。对于Structured Streaming来说,一定要强调,在Spark 2.0版本刚推出的时候,千万别在生产环境使用,因为目前官方定义为beta版,就是测试版,里面可能有很多的bug和问题,而且上述的各种功能还不完全,很多功能还没有。因此Structured Streaming的设计理念虽然非常好,但是个人建议在后续的版本中再考虑使用。目前可以保持关注和学习,并做一些实验即可。