Spark原理图
- 为什么使用广播变量
- 因为一个变量在Driver端定义,如果执行计算需要传递到executor的task线程中获取变量副本依次拉取执行
- 此时造成由于变量数据量和任务的个数急剧增长的情况下,造成网络传输的瓶颈
- 引出:广播变量,属于共享变量,就是将一个变量广播到executor让所有的task共享该变量
- 什么是广播变量
- 将Driver端的变量传递到Executor端,所有task共享这一份变量
- 广播变量如果没有会怎么样?
- 会造成大量网络数据传输
- 广播变量如何使用?
- sc.broadcast()
- broadcast.value()
回顾Hive中自定义函数有三种类型:
l ** 第一种:UDF(User-Defined-Function) 函数**
n 一对一的关系,输入一个值经过函数以后输出一个值;
n 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
l ** 第二种:UDAF(User-Defined Aggregation Function) 聚合函数**
n 多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;
l 第三种:UDTF(User-Defined Table-Generating Functions) 函数
n 一对多的关系,输入一个值输出多个值(一行变为多行);
n 用户自定义生成函数,有点像flatMap;
目前来说Spark 框架各个版本及各种语言对自定义函数的支持:
在SparkSQL中,目前仅仅支持UDF函数和UDAF函数:
l UDF函数:一对一关系;
l UDAF函数:聚合函数,通常与group by 分组函数连用,多对一关系;
由于SparkSQL数据分析有两种方式:DSL编程和SQL编程,所以定义UDF函数也有两种方式,不同方式可以在不同分析中使用。
Spark中driver的主要工作是什么?
-
在spark on yarn模式下
是由application master负责executor申请,
driver负责job 和stage的划分以及task的创建分配和调度. -
standolone模式下是由Driver 都负责
Driver
1 - 是一个进程, 我们编写的一个Spark应用程序就运行在Driver上,由Driver进程执行;
2 - 接收executor启动后的反向注册
3 - Driver开始执行main函数,之后执行到Action算子时,开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
client 模式
图片2.pngcluster 模式
图片3.pngstandalone集群有四个重要组成部分.
- Driver
- 是一个进程, 我们编写的一个Spark应用程序就运行在Driver上,由Driver进程执行;
- Master
- 是一个进程, 主要负责资源调度和分配,并进行集群监控等职责
- Worker
-
是一个进程, 一个worker运行在集群中的一台服务器上,主要负责两个职责,
-
一个是用自己的内存存储RDD的某个或者某些Partition,
-
另一个是启动其他进程和线程(Executor), 对RDD的Partition进行并行的处理和计算.
- Executor
- 是一个进程, 一个worker上可以运行多个Executor,Executor通过启动多个线程(task) 来执行对RDD的Partition进行并行计算,也就是执行MAP. FLATMAP,等算子操作.
Standalone Client 模式:
-
Driver在任务提交的本地机器上运行,
-
Driver 启动后向Master注册应用程序,
-
Master根据submit脚本的资源需求找到内部的资源至少可以启动一个executor的所有worker,
-
然后在这些worker之间分配executor,worker上的executor启动后会向driver反向注册,所有executor注册完成后,
-
driver开始执行main函数,之后执行ACTION的算子, 开始划分STAGE,每个stage生成对应的taskset, 之后将task分发到各个executor上执行.
Standalone Cluser 模式:
-
在Standalone Cluster模式下,任务提交后,Master会找到一个Worker启动Driver进-程,Driver启动后向Master注册应用程序,Master根据submit脚本的资源需求找到内部资源至少可以启动一个Executor的所有Worker,然后在这些Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,所有的Executor注册完成后,Driver开始执行main函数,之后执行到Action算子时,开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
-
注意,Standalone的两种模式下(client/Cluster),Master在接到Driver注册Spark应用程序的请求后,会获取其所管理的剩余资源能够启动一个Executor的所有Worker,然后在这些Worker之间分发Executor,此时的分发只考虑Worker上的资源是否足够使用,直到当前应用程序所需的所有Executor都分配完毕,Executor反向注册完毕后,Driver开始执行main程序。
请详述Repartition和Coalesce关系与区别
Repartition: 重新分区消耗比较昂贵的算子.
spark出了一个优化版本Coalesce,可以尽量避免数据迁移,
Coalesce使用已有的partition去尽量减少数据shuffer
Repartition创建新的partition并且使用full shuffer
数据分布上:
Coalesce会使每个partition数量不同, Repartition会使数据分布均匀相等(数据量的情况下)
Coalesce不会启用shuffer 不能提高分区只能降低,而且指定多了只会有一个分区在跑数据.
分别简述Spark中的缓存机制(cache和persist)与checkpoint机制,并指出两者的区别与联系,以及Spark如何实现容错机制?
都是缓存级别
cache()调用的persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
//cache只有一个默认的缓存级别MEMORY_ONLY
def cache(): this.type = persist()
persist内存
内存
磁盘
副本
序列化
堆外内存