大数据,机器学习,人工智能玩转大数据大数据

Spark

2020-05-03  本文已影响0人  ZzzZBbbB

参考链接:https://github.com/wangzhiwubigdata/God-Of-BigData/blob/master/%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%A1%86%E6%9E%B6%E5%AD%A6%E4%B9%A0/Spark%E7%AE%80%E4%BB%8B.md

Spark 概述

Support working sets (of data) through RDD

RDD Resilient Distributed Dataset 弹性分布式数据集

源码显示:
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging

1)RDD是一个抽象类
2)泛型的,可以支持多种类型: String、Person、User

RDD represents an immutablepartitioned collection of elements that can be operated on in parallel
简单来说, RDD是Spark最基本的数据抽象,它是只读的、分区记录的集合,支持并行操作。

Propeties of RDD

Internally, each RDD is characterized by 5 main properties:

另外后续会提到关于分区:
one task on per partition --- 对于每一个分区,其都有一个task去处理
one partition could be persisted --- 对于每一个分区,其都可以被持久化

源码体现特性:

def getPartitions: Array[Partition]  特性一
def compute(split: Partition, context: TaskContext): Iterator[T] 特性二
def getDependencies: Seq[Dependency[_]] = deps  特性三
val partitioner: Option[Partitioner] = None  特性四
def getPreferredLocations(split: Partition): Seq[String] = Nil  特性五

RDD 创建方式

1.Parallelized Collections 已有的集合创建

命令行运行
到 sprak的 bin目录下 pyspark开启shell操作   
>>> sc # 命令行运行中系统会自动创建一个SparkContext 名称为sc
<SparkContext master=local[*] appName=PySparkShell>
>>> sc.getConf() # sc对应的SparkConf
<pyspark.conf.SparkConf object at 0x7f26237eeba8>
>>> data = [1,2,3,4,5]
>>> dataRDD = sc.parallelize(data)
>>> dataRDD.collect()
[1, 2, 3, 4, 5]
>>> dataRDD2 = sc.parallelize(data,5) # 把data切成五份 5 个partition 
# 运行一个partition有1个task,言下之意对应有5个partition的data,有5个task来执行
>>> dataRDD2.collect()
[1, 2, 3, 4, 5]

WebUI上查看(4040端口)信息:http://hadoop000:4040

image.png

2.External Datasets 外部存储资源读取,例如本地文件系统,HDFS,HBase 或支持 Hadoop InputFormat 的任何数据源。

#本地上 读本地文件 输入的参数路径格式: file://xxx
sc.textFile('file:///home/hadoop/data/hello.txt').collect()
如果是本地操作的话,文件必须要能够被这个工作的节点访问得到。
支持目录路径,支持压缩文件,支持使用通配符
textFile(dir)   textFile(*.txt)    textFile(*.gz) 
textFile:其返回格式是 RDD[String] ,返回的是就是文件内容,RDD 中每一个元素对应一行数据

#hadoop集群上读取   输入的参数路径格式:hdfs://xxx
>>> sc.textFile("hdfs://hadoop000:8020/test/hello.txt").collect()

#还可以使用wholeTextFiles进行读取
>>> sc.wholeTextFiles('file:///home/hadoop/data/hello.txt').collect()
[('file:/home/hadoop/data/hello.txt', 'hello spark\nhello pyspark\nhello sparksql\n')] 
返回一个list list中每一个元素是一个tuple,第一个是全路径,第二个是内容
wholeTextFiles:其返回格式是 RDD[(String, String)],元组中第一个参数是文件路径,第二个参数是文件内容

从HDFS上读取文件时,Spark默认每个块设置为一个分区

RDD 算子

  1. transformations
    由于RDD是不可变的集合,所以需要通过转换从现有数据集创建新数据集
常用操作
1.map(func): 将func函数作用到数据集的每一个元素上,生成一个新的分布式的数据集返回 
2.filter(func):选出所有func返回值为true的元素,生成一个新的分布式的数据集返回  
3.flatMap(func) 输入的item能够被map到0或者多个items输出,返回值是一个Sequence 
4.groupByKey():把相同的key的数据分发到一起 返回的每一个item (key, ResultIterable) 
5.reduceByKey(): 把相同的key的数据分发到一起并进行相应的计算
6.union
7.distinct
8.join 默认为innerjoin,可以使用leftjoin rightjoin, fulljoin


一个简单计算wordcount的写法
sc.textFile("file:///home/hadoop/data/hello.txt").flatMap(lambda line: line.split("\t")).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
  1. actions
    action操作将数据集上运计算后将得到的值返回到driver program或者写到external storage中去
常见操作
1. collect() 输出 类似于print
2. take(n) 取前几个
3. max() min() sum() count()
4. reduce(func)
5. saveAsTextFile()
6. foreach() 对每一个元素进行操作
 

Spark的使用

from pyspark import SparkConf,SparkContext

# 1.创建SparkConf:设置Spark相关的参数信息
conf = SparkConf().setMaster("local[2]").setAppName("PySpark")

# 2.创建SparkContext
sc = SparkContext(conf=conf)

## 3.业务逻辑 RDD创建和操作
"""
xxxxxxxxx
"""
# 4.清除上下文
sc.stop()

Spark运行模式

提交作业
./bin/spark-submit            --master local[2]                    -- master url
                              --name spark-local                   -- app name
                              /home/hadoop/script/test.py          -- py file
                              file:///home/hadoop/data/hello.txt   -- 参数
                              file:///home/hadoop/wc/output

使用场景:一般是取小部分数据 通过 spark-submit --master local[*] xx.py [args]在本地跑一跑看看效果

standalone.png

local模式不需要对spark进行任何配置,但是standalone需要进行相应的配置,standalone模式下有worker和master进程

1.配置slaves
$SPARK_HOME/conf/slaves
hadoop000
hadoop001 ...

2.启动spark集群
$SPARK_HOME/sbin/start-all.sh
ps: 建议在spark-env.sh中添加JAVA_HOME,否则有可能报错
检测: jps 查看master和worker进程是否启动成功

提交作业
./bin/spark-submit            --master spark://hadoop000:7077        -- master url
                              --name spark-standalone                -- app name
                              /home/hadoop/script/test.py          -- py file
                              hdfs://hadoop000:8020/wc.txt    -- 参数
                              hdfs://hadoop000:8020/wc/output

注意:使用standalone模式,而且节点个数大于1的时候,
使用本地文件测试,必须要保证每个节点上都有本地测试文件

spark://hadoop000:7077 表示spark提交作业的端口
WebUI地址端口: 8080
WebUI.png
端口的问题.png

yarn 和 standalone的比较:
https://www.runexception.com/q/3546
yarn: 只需要一个节点,然后提交作业即可,不需要spark集群的(不需要启动master和worker),spark 做为一个应用程序(application) 的方式运行在 yarn 上
standalone:spark集群上每个节点都需要部署spark,需要启动spark集群(需要master和worker)

提交作业
./spark-submit         --master yarn 
                       --name spark-yarn         
                       /home/hadoop/script/test.py 
                      hdfs://hadoop000:8020/wc.txt 
                      hdfs://hadoop000:8020/wc/output

Exception in thread "main" java.lang.Exception:When running with master 
'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment
 spark 想要跑在yarn 上必须要知道HDFS 和 yarn 的信息,不然 spark无法找到yarn 

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which 
contains the (client side) configuration files for the Hadoop cluster. 
These configs are used to write to HDFS and connect to the YARN ResourceManager.
The configuration contained in this directory will be distributed to the YARN cluster 
so that all containers used by the application use the same configuration.

部署模式 deploy-mode: client cluster: 决定driver启动的位置(driver运行在哪)
client: driver运行在本地,提交作业的进程是不能停止,否则作业就挂了
cluster:提交完作业,那么提交作业端就可以断开了,因为driver是运行在application master中
注意:Cluster deploy mode is not applicable to Spark shells cluster的部署模式不能在交互式的pyspark shell中实现

Spark 架构

Glossary.png

Application:基于Spark的应用程序 = 1 driver + executors
Driver:Application的 main() 方法的进程,并创建 SparkContext
Cluster Manager:集群资源管理 例如 spark-submit **--master local[2]** / **spark://hadoop000:7077/yarn** 这些就是集群的管理
Deploy mode:区分driver运行在哪,cluster模式,运行在集群里面,client模式,运行在集群外面(本地)
Worker node:执行计算任务的工作节点(机器)
Executor:位于工作节点上的进程,负责执行计算任务并且将输出数据保存到内存或者磁盘中
Task:Executor 中的工作单元 driver端发起,发送到worker node上的executor上去执行
Job:并行计算,由多个task构成,一个action对应一个job
Stage:一个stage的边界往往是从某个地方取数据开始,到shuffle的结束

每个Application都有自己的一系列executors的进程,这样的话保证了不同的Applications间的隔离,但同时也导致了不同Applications间数据的不可共享性,如果要共享的话,只能写到外部文件,然后再去访问


architecture.png

Spark 缓存

cache transformation一样,采用lazy execution :没有遇到action是不会提交作业到spark上运行

rdd.cache()
底层使用persist方法:
rdd.cache() = rdd.persist(StorageLevel.MEMORY_ONLY)
如果一个RDD在后续的计算中可能会被使用到,那么建议cache
最底层其实调用的是StorageLevel类

Spark automatically monitors cache usage on each node 
and drops out old data partitions in a least-recently-used (LRU) fashion. 

# 手动删除cache
rdd.unpersist()    # unpersist是一个立即执行的操作,不是lazy execution 
缓存级别 .png

Spark Lineage 机制

Lineage: RDD间的依赖关系

有n个stage对应了n+1个shuffle


stage和shuffle.png

Spark shuffle

xxx

Spark 优化

上一篇 下一篇

猜你喜欢

热点阅读