sparkstream

2021-06-08  本文已影响0人  明明德撩码
image.png

Kafka 分布式的单位是 Partition。如何保证消息有序,需要分几个情况讨论。

什么是数据倾斜

对 Spark/Hadoop 这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。数据倾斜指的是,并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈(木桶效应)。

数据倾斜是如何造成的

在 Spark 中,同一个 Stage 的不同 Partition 可以并行处理,而具有依赖关系的不同 Stage 之间是串行处理的。假设某个 Spark Job 分为 Stage 0和 Stage 1两个 Stage,且 Stage 1依赖于 Stage 0,那 Stage 0完全处理结束之前不会处理Stage 1。而 Stage 0可能包含 N 个 Task,这 N 个 Task 可以并行进行

1.具体解决方案

调整并行度分散同一个 Task 的不同 Key: Spark 在做 Shuffle 时,默认使用 HashPartitioner(非 Hash Shuffle ???)对数据进行分区。

2. 自定义Partitioner:

使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task,可以拿上图继续想象一下,通过自定义 Partitioner 可以把原本分到 Task0 的 Key 分到 Task1,那么 Task0 的要处理的数据量就少了。

3. 将 Reduce side(侧) Join 转变为 Map side(侧) Join:

通过 Spark 的 Broadcast 机制,将 Reduce 侧 Join 转化为 Map 侧 Join,避免 Shuffle 从而完全消除 Shuffle 带来的数据倾斜。

4. 为 skew 的 key 增加随机前/后缀:

为数据量特别大的 Key 增加随机前/后缀,使得原来 Key 相同的数据变为 Key 不相同的数据,从而使倾斜的数据集分散到不同的 Task 中,彻底解决数据倾斜问题.

Spark 的 shuffle 过程

Spark shuffle 处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。

Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?

在我们的开发过程中,能避免则尽可能避免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子,尽量使用 map 类的非 shuffle 算子。这样的话,没有 shuffle 操作或者仅有较少 shuffle 操作的 Spark 作业,可以大大减少性能开销。

6Spark为什么快和Hive比较

结论 Spark 快不是绝对的,但是绝大多数,Spark 都比 Hadoop 计算要快。这主要得益于其对 mapreduce 操作的优化以及对 JVM 使用的优化。

Spark Streaming容错体系

Spark操作的数据一般存储在有容错功能的文件系统(比如HDFS、S3)上,从这些系统上的数据生成的RDD也具有容错能力,但是这个不适用于Spark Streaming。为了达到相同的容错能力,通过网络接收到的数据还被复制到其他节点上(默认复制1份,总共2份数据),这就导致错误发生时有两类数据需要恢复

结果输出容错

结果输出(saveAsTextFiles、foreachRDD等)操作本身提供至少一次级别的容错性能,就是说可能输出多次至外部系统,但可能通过一些辅助手段来实现精准一次的容错效果。
当输出为文件时是可以接受的,因为重复的数据会覆盖前面的数据,结果一致,效果相当于精确一次,其他场景下的输出要想实现精确一次的容错,需要一些额外的操作,有如下两种方法。

检查点

由于流式计算7×24小时运行的特点,除了考虑具备容错能力,我们还要考虑容错的代价问题。为了避免错误恢复的代价与运行时间成正比增长,Spark提供了检查点功能,用户定期记录中间状态,避免从头开始计算的漫长恢复
有一情况下必须启用检查点功能,那就是调用了有状态的Transformation操作,比如updateStateByKey或reduceByKeyAndWindow。因为有状态的操作是从程序开始时一直进行的,如果不做检查点,那么计算链接会随着时间一直增长,重新计算的代价也将会是天文数字。

Spark SQL

而且Spark SQL与Apache Hive基本完全兼容,我们可以像使用Hive一样来使用Spark SQL。
使用Spark SQL有两种方式

支持的数据类型

Spark SQL和DataFrame支持大部分Hive类型,具体如下所示。
• 数字类型。ByteType、ShortType、IntegerType、LongType、FloatType、DoubleType和DecimalType。
• 字符串类型。StringType。
• 二进制类型。BinaryType。
• Bool类型。BooleanType。
• 日期时间类型。TimestampType和DateType。
• 复杂类型。ArrayType、MapType和StructType。

DataFrame

Spark SQL除了支持使用SQL方式来查询之外,还提供了编程接口,可以在Spark程序中引用Spark SQL模块。编程时使用的数据抽象是DataFrame,它具有与RDD类似的分布式数据集特点,但还增加了列的概念,这样可以与传统关系型数据库的表对应起来。Spark程序支持的Scala、Java、Python以及R这4种编程语言都可以使用DataFrame。
在Spark中使用DataFrame的过程也很简单,大概需要如下4步:
(1) 初始化环境,一般是创建一个SQLContext对象;
(2) 创建一个DataFrame,可以来源于RDD或其他数据源;
(3) 调用DataFrame操作,是一种领域特定的API,可以实现所有的SQL功能;
(4) 或者通过函数直接执行SQL语句。

DataFrame数据源

DataFrame支持非常多类型的数据源,包括Hive、Avro、Parquet、ORC、JSON、JDBC。而且Spark提供了统一的读写接口。

Catalyst执行优化器

Catalyst是Spark SQL执行优化器的代号,所有Spark SQL语句最终都通过它来解析、优化,最终生成可以执行的Java字节码。因此,Catalyst是Spark SQL最核心的部分。

上一篇 下一篇

猜你喜欢

热点阅读