大数据组件知识点总结(3) - Spark
2018-08-22 本文已影响37人
千反田爱瑠爱好者
Spark
使用DAG计算引擎、RDD模型,基于内存执行分布式计算,适合做迭代式计算和交互式计算。
主要特点
性能高效
- 基于内存计算(RDD可内存、磁盘、混合);
- 通用DAG计算引擎(数据通过内存、磁盘流向其他单元,MR是通过HDFS);
- 高度优化、重新设计。
简单易用
- 支持多种语言:Java、Scala、Python、R;
- 提供多种高层次API,代码量大幅减少。
与Hadoop集成
YARN、HDFS、HBase
编程模型
RDD
弹性(内存、磁盘、混合)分布式(Partition)数据集,只读(确保数据一致性),可并行执行转换运算,支持失败后基于血统关系重构。
读写内存和磁盘比HDFS的分布式写快很多。
RDD组成:
- partition(分区的多少涉及对RDD计算的并行度,数量对应分配到的CPU核心数或HDFS的块数)
- partition计算函数
- 依赖RDD(父RDD)
- 节点位置
操作:
- 转换:RDD本身是不可变的,可通过转换操作把RDD转换成另一个RDD,惰性执行;
- 行动:处理RDD得出结果,结果可持久化到内存或磁盘。
共享变量:
- 累加器
- 广播变量
基本框架
一个Driver进程和多个Executor进程构成:
- Driver运行用户程序、生成任务计划阶段后分配给Executor执行;
- Executor进程是拥有独立计算单元的JVM,可多线程执行任务。
运行模式
- local:本地执行,Driver、Executor都运行在本地,方便调试;
- standalone:由一个master和多个slave服务组成,包括client和cluster模式;
- YARN:利用YARN做资源管理和调度
- yarn-client:Driver运行在客户端,不受YARN管理和控制(只适合用于测试,由于需要与集群通信,所以会造成网卡流量激增);
- yarn-cluster:Driver和Executor均运行在YARN NodeManager 的Container中,由YARN统一管理控制(调试不便,spark-submit后看不到log)。
作业生命周期
- 生成逻辑计划:通过RDD之间的依赖关系,构造DAG;
- 生成物理计划:根据前一阶段生成的DAG划分Stage(若干个并行计算任务);
- 调度并执行任务:调度计算每个Stage(多个Executor同时计算)。
内核架构
作业提交、运行
组织关系
Mac 1:Submit
spark-submit Jar
Driver(进程)
SparkContext
DAGScheduler
TaskScheduler
Mac 2:Master
Mac 3:Worker
Executor(进程)
ThreadPool
TaskRunner
处理流程
Application Jar // 编写Spark应用程序
|
spark-submit // 提交作业
|
Driver // 执行应用程序(初始化SparkContext(SparkConf))
|
DAGScheduler、TaskScheduler(通过后台进程连接Master、注册Application)
|
Master // 接收Application请求,通过资源调度算法,在Worker上为Application启动多个Executor
|
Executor // 启动后反向注册到TaskScheduler
|
DAGScheduler // 初始化结束,开始执行代码,每执行一个Action就创建一个Job,提交给DAGScheduler,划分Stage(Stage划分算法)
|
TaskScheduler // Stage创建一个TaskSet,提交给TaskScheduler
|
Executor // Task提交到Executor执行(Task分配算法)
// TaskRunner封装Task,线程池中取出一个线程来执行(从代码拷贝、反序列化执行)
|
(Stage不断分批次创建Task、提交到Executor执行,每个Task对应RDD一个Partition。)
Task:包括ShuffleMapTask和ResultTask(最后一个)
宽依赖、窄依赖
窄依赖(Narrow Dependency)
- 父RDD与子RDD的partition是一一对应的,每个父RDD的Partition都只会传到子RDD的一个Partition;
- 窄依赖都分在同一个Stage-Task中。
lines RDD:
partition_1 = "hello world"
partition_2 = "hello me"
partition_3 = "hello you"
=> val words = lines.flatMap(line => line.split(" "))
words RDD:
partition_1 = ["hello", "you"] // 只依赖于父RDD的partition_1
partition_2 = ["hello", "me"]
partition_3 = ["hello", "world"]
=> val pairs = lines.map(word => (word, 1))
pairs RDD:
partition_1 = [("hello", 1), ("you", 1)]
partition_2 = [("hello", 1), ("me", 1)]
partition_3 = [("hello", 1), ("world", 1)]
宽依赖(Shuffle Dependency)
- 每一个父RDD的Partition都可能会传到子RDD的多个Partition中;
- 发生宽依赖即划分新的Stage,提交新的一批Task到executor。
pairs RDD:
partition_1 = [("hello", 1), ("you", 1)]
partition_2 = [("hello", 1), ("me", 1)]
partition_3 = [("hello", 1), ("world", 1)]
=> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts RDD:
partition_1 = ("hello", 3) // 依赖于父RDD的partition_1、partition_2、partition_3
partition_2 = ("me", 5)
partition_3 = ("world", 10)
SparkContext
SparkContext
createTaskScheduler() ->
// TaskScheduler,创建SchedulePool
TaskSchedulerImpl
// 调用init创建SchedulePool
// 调用start
// 负责接收TaskSchedulerImpl的控制,负责与Master的注册、Executor的反注册、Task发送到Executor等操作
SparkDeploySchedulerBackend
// 由TaskScheduler创建有不同的优先策略,如FIFO
SchedulePool