玩转大数据大数据学习一

(十五)大数据学习之Spark

2019-11-23  本文已影响0人  Movle

Spark

一.Spark生态圈:

(1)Spark Core : RDD(弹性分布式数据集)
(2)Spark SQL
(3)Spark Streaming
(4)Spark MLLib:协同过滤,ALS,逻辑回归等等 --> 机器学习
(5)Spark Graphx : 图计算

二.什么是Spark

1.Spark是什么:
Spark是一个针对大规模数据处理的快速通用引擎。

2.特点:

3.最大特点:基于内存

三.Spark体系架构

1.Spark集群的体系架构图解:

image.png

2.Spark的主从结构

image.png

四.Spark的安装部署

1.Spark的安装部署方式有以下几种模式:
(1)Standalone: 本机调试

(2)YARN
(3)Mesos
(4)Amazon EC2
2.执行过程:
一个Worker有多个 Executor。 Executor是任务的执行者,按阶段(stage)划分任务。————> RDD

五.Spark的搭建:

1.准备工作

2.伪分布式模式安装:
(1)下载
(2)上传到linux
(3)解压
(4)修改配置文件

cd /opt/module

mv spark-2.1.0-bin-hadoop2.7/ spark/       //重命名spark文件夹

cd /opt/module/spark/conf 

mv spark-env.sh.template spark-env.sh    //重命名配置文件

vi spark-env.sh

修改内容如下:

export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata121      //主节点的服务器名
export SPARK_MASTER_PORT=7077           //主节点端口号
//下面的可以不写,默认
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
image.png
mv slaves.template slaves

vi slaves

新增内容:

bigdata121
image.png

(5)启动:

cd /opt/module/spark

sbin/start-all.sh
image.png

(6)验证:192.168.127.121:8080

image.png

3.全分布的安装部署:
(1)下载
(2)上传到linux
(3)解压
(4)修改配置文件

cd /opt/module

mv spark-2.1.0-bin-hadoop2.7/ spark/       //重命名spark文件夹

cd /opt/module/spark/conf 

mv spark-env.sh.template spark-env.sh    //重命名配置文件

vi spark-env.sh

修改内容如下:

export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata121      //主节点的服务器名
export SPARK_MASTER_PORT=7077           //主节点端口号
//下面的可以不写,默认
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
image.png
mv slaves.template slaves

vi slaves

新增内容:

bigdata122
bigdata123
image.png

(5)拷贝到其他两台服务器

cd /opt/module


src -r spark/ bigdata122:/opt/module

src -r spark/ bigdata123:/opt/module
  

(6)启动Spark集群:

cd /opt/module/spark

sbin/start-all.sh

六.Spark的HA

1.回顾HA:
(1)HDFS,Yarn,Hbase,Spark:都是主从结构
(2)单点故障
2.基于文件系统的单点恢复
(1)主要用于开发或测试环境。当spark提供目录保存spark Application和worker的注册信息,并将他们的恢复状态写入该目录中,这时,一旦Master发生故障,就可以通过重新启动Master进程(sbin/start-master.sh),恢复已运行的spark Application和worker的注册信息。
(2)基于文件系统的单点恢复,主要是在spark-en.sh里对SPARK_DAEMON_JAVA_OPTS设置

配置参数 参考值
spark.deploy.recoveryMode 设置为FILESYSTEM开启单点恢复功能,默认值:NONE
spark.deploy.recoveryDirectory Spark 保存恢复状态的目录

3.基于Zookeeper的Standby Masters
(1)ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到ZooKeeper,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。

image.png
配置参数 参考值
spark.deploy.recoveryMode 设置为ZOOKEEPER开启单点恢复功能,默认值:NONE
spark.deploy.zookeeper.url ZooKeeper集群的地址
spark.deploy.zookeeper.dir Spark信息在ZK中的保存目录,默认:/spark

(2)修改spark-env.sh参考:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 
-Dspark.deploy.zookeeper.dir=/spark"

(3)另外:每个节点上,需要将以下两行注释掉。

image.png

(4)同步到其他两台服务器
(5)ZooKeeper中保存的信息

image.png image.png

七.执行Spark Demo程序

1.使用Spark Shell
(1)spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。相当于REPL ,作为一个独立的Application运行
(2)两种模式:

(3)启动Spark shell:

spark-shell

参数说明:

--master spark://spark81:7077     //指定Master的地址
--executor-memory 2g      //指定每个worker可用内存为2G
--total-executor-cores 2       //指定整个集群使用的cup核数为2个

例如:

spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2

(4)在Spark shell中编写WordCount程序

sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")

参数说明:

(5)wordcount程序,处理本地文件,把结果打印到屏幕上

scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
                .flatMap(_.split(" "))
                .map((_,1))
                .reduceByKey(_+_)
                .collect
                
                res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))

(6)wordcount程序,处理HDFS文件,结果保存在hdfs上

sc.textFile("hdfs://node1:8020/tmp_files/test_WordCount.txt")
                .flatMap(_.split(" "))
                .map((_,1))
                .reduceByKey(_+_)
                .saveAsTextFile("hdfs://node1:8020/output/0331/test_WordCount")

(7)单步运行wordcount --->RDD

scala> val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
                rdd1: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> 1+1
res2: Int = 2

scala> rdd1.collect
res3: Array[String] = Array(I love Beijing, I love China, Beijing is the capital of China)

scala> val rdd2 = rdd1.flatMap(_.split(" "))
                rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:26

scala> rdd2.collect
res4: Array[String] = Array(I, love, Beijing, I, love, China, Beijing, is, the, capital, of, China)

scala> val rdd3 = rdd2.map((_,1))
                rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:28

scala> rdd3.collect
res5: Array[(String, Int)] = Array((I,1), (love,1), (Beijing,1), (I,1), (love,1), (China,1), (Beijing,1), (is,1), (the,1), (capital,1), (of,1), (China,1))

scala> val rdd4 = rdd3.reduceByKey(_+_)
                rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:30

scala> rdd4.collect
res6: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))

(8)RDD 弹性分布式数据集
(9)Scala复习:

scala>List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
                    myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))
scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)                 
myList.flatMap(x=>x.map(_*2))

flatmao执行过程:

Scala版本

image.png

(4)运行程序:

spark-submit --master spark://spark81:7077 
--class mydemo.WordCount jars/wc.jar 
hdfs://192.168.88.111:9000/data/data.txt 
hdfs://192.168.88.111:9000/output/spark/wc1

Java版本(直接输出在屏幕)

image.png

(4)运行程序:

spark-submit --master spark://spark81:7077 
--class mydemo.JavaWordCount jars/wc.jar 
hdfs://192.168.88.111:9000/data/data.txt

八.Spark运行机制及原理分析

1.WordCount执行的流程分析

image.png

2.Spark提交任务的流程析

image.png

九.RDD和RDD特性,RDD的算子

1.RDD:弹性分布式数据集
(1)什么是RDD?

(2)RDD的属性:

2.如何创建RDD
(1)通过SparkContext.parallelize方法来创建(通过sc.parallelize进行创建)

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
            
scala> rdd1.partitions.length
res35: Int = 3
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29

scala> rdd1.partitions.length
res36: Int = 2

(2)通过外部数据源来创建

sc.textFile()           
scala> val rdd2 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd2:org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29

(3)RDD的类型:TransformationAction
3.RDD的基本原理:

image.png

4.Transformation
(1)RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

5.Action

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。

十.RDD特性

1.RDD的缓存机制:
(1)作用:提高性能
(2)使用:标识RDD可以被缓存 persist cache
(3)可以缓存的位置:

2.RDD的容错机制:通过检查点来实现
(1)
(1)复习检查点:HDFS中的检查点:有SecondaryNamenode来实现日志的合并。
(2)RDD的检查点:容错

(3)RDD检查点的类型:
a.基于本地目录:需要将Spark shell 或者任务运行在本地模式上(setMaster("local"))

image.png

b.HDFS目录:用于生产,集群模式

image.png
sc.setCheckPointDir(目录)
                
//举例:设置检查点
scala> var rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24

//设置检查点目录:
scala> sc.setCheckpointDir("hdfs://192.168.109.131:8020/sparkckpt")

//标识rdd1可以执行检查点操作
scala> rdd1.checkpoint

scala> rdd1.count
res2: Long = 923452 

3.依赖关系:宽依赖,窄依赖
(1)RDD的依赖关系:

(3)宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
总结:宽依赖我们形象的比喻为超生

4.Spark任务中的Stage

image.png

十一.RDD的高级算子

1.mapPartitionsWithIndex:对RDD中的每个分区(带有下标)进行操作,下标用index表示
通过这个算子,我们可以获取分区号。

def mapPartitionsWithIndex[U](
        f: (Int, Iterator[T]) ⇒ Iterator[U], 
        preservesPartitioning: Boolean = false)(
        implicit arg0: ClassTag[U]): RDD[U]
        
//参数:f是个函数参数 f 中第一个参数是Int,代表分区号,第二个Iterator[T]代表分区中的元素

例如:

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
             | iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
             | }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]

scala> rdd1.mapPartitions
        mapPartitions   mapPartitionsWithIndex

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
        [partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ], [partId : 2 , value = 6 ], [partId : 2 , value = 7 ], [partId : 2 , value = 8 ])

2.aggregate:聚合操作。类似于分组。
(1)先对局部进行聚合操作,再对全局进行聚合操作。

//调用聚合操作
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd2.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
        [partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])

scala> import scala.math._
import scala.math._

scala> rdd2.aggregate(0)(max(_,_),_+_)
res6: Int = 7

说明:
(2)对字符串操作

scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> rdd2.aggregate("")(_+_,_+_)
res11: String = abcdef

scala> rdd2.aggregate("*")(_+_,_+_)
res12: String = **def*abc

(3)复杂的例子:
a.

scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27
        
scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
             | iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
             | }

scala> rdd3.mapPartitionsWithIndex(fun1).collect
res17: Array[String] = Array(
        [partId : 0 , value = 12 ], [partId : 0 , value = 23 ], [partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])
        
scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res13: String = 42

执行过程:

b.

rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
        
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res18: String = 11

执行过程:

3.其他高级算子:

十二.编程案例

1.分析日志
(1)需求:找到访问量最高的两个网页

(2)创建自定义分区

(3)使用JDBCRDD 操作数据库

(4)操作数据库:把结果存放到数据库中

Spark SQL

Spark sql基础

1.什么是Spark SQL

(2)统一的数据访问方式

(4) 支持标准的数据连接

3.为什么学习Spark SQL

(2)第二种方式 使用Spark Session
(3)直接读取一个带格式的文件。

6.操作DataFrame
(1)DSL语句
(2)SQL语句
注意:不能直接执行SQL,需要生成一个视图,再执行sql。
(3)多表查询

7.操作DataSet
(1)跟DataFrame类似,是一套新的接口。高级的Dataframe
(2)创建DataSet

(3)DataSet案例

(4)多表查询

7.Spark SQL中的视图
(1)视图是一个虚表,不存储数据。
(2)两种类型:

二.使用数据源

1.在Spark SQL中,可以使用各种各样的数据源来操作。 结构化
2.使用load函数、save函数

(1)把其他文件,转换成Parquet文件
(2)支持Schema的合并

4.json文件

5.JDBC
(1)使用JDBC操作关系型数据库,加载到Spark中进行分析和处理。
(2)方式一:
(3)方式二:

6.使用hive
(1)spark SQL 完全兼容hive
(2)需要进行配置

(3)配置好后,重启spark

(4)启动Hadoop 与 hive

三.在IDE中开发Spark SQL

四.性能优化

1.用内存中缓存表的数据
直接读取内存的值,来提高性能
2.了解性能优化的相关参数:参考讲义

Spark Streaming

一.常用的实时计算引擎(流式计算)

1.Apache Storm:真正的流式计算

2.Spark Streaming :严格上来说,不是真正的流式计算(实时计算)
把连续的流式数据,当成不连续的RDD
本质:是一个离散计算(不连续)
3.Apache Flink:真正的流式计算。与Spark Streaming相反。
把离散的数据,当成流式数据来处理
4.JStorm

二.Spark Streaming基础

1.什么是 Spark Streaming。

2.特点:

3.演示官方的Demo
往Spark Streaming中发送字符串,Spark 接收到以后,进行计数
使用消息服务器 netcat Linux自带
yum install nc.x86_64
nc -l 1234
注意:总核心数 大于等于2。一个核心用于接收数据,另一个用于处理数据
在netcat中写入数据 Spark Streaming可以取到

4.开发自己的NetWorkWordCount程序,和Spark Core类似
问题:Hello Hello
Hello World
现在现象:(Hello,2)
(Hello , 1) (World , 1)
能不能累加起来?保存记录下以前的状态?
通过Spark Streaming提供的算子来实现

三.高级特性:

1.什么是DStream?离散流

2.重点算子讲解
(1)updateStateByKey
默认情况下,Spark Streaming不记录之前的状态,每次发数据,都会从0开始
现在使用本算子,实现累加操作。
(2)transform

3.窗口操作

4.集成Spark SQL : 使用SQL语句来处理流式数据
5.缓存和持久化:和RDD一样
6.支持检查点:和RDD一样

四.数据源

Spark Streaming是一个流式计算引擎,就需要从外部数据源来接收数据
1.基本的数据源

2.高级数据源
(1)Flume
(2)Spark SQL 对接flume有多种方式:

(3)Kafka
    在讲Kafka时,举例。

四.性能优化的参数

(1)性能优化:
spark submit的时候,程序报OOM错误
程序跑的很慢
(2)方法:调整spark参数
conf.set...

性能调优

一.Spark 性能优化概览:

二.Spark性能优化,主要针对在内存的使用调优。

三.Spark性能优化的技术:

1.使用高性能序列化类库
2.优化数据结构
3.对于多次使用的RDD进行持久化、checkpoint
4.持久化级别:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
5.Java虚拟机垃圾回收调优
6.Shuffle调优,1.x版本中,90%的性能问题,都是由于Shuffle导致的。

四.其他性能优化:

1.提高并行度
2.广播共享数据
等等。。。

五.诊断Spark内存使用:首先要看到内存使用情况,才能进行针对性的优化。

1.内存花费:
(1)每个Java对象,都有一个对象头,占用16字节,包含一些对象的元信息,比如指向他的类的指针。

(2)Java的String对象,会比他内存的原始数据,多出40个字节。

(4)元素类型为原始数据类型(int),内部通常会使用原始数据类型的包装类型(Integer)来存储元素。
2.如何判断Spark程序消耗内存情况?:答案是预估
(1)设置RDD的并行度。

(2)将RDD缓存 cache()

(3)观察日志:driver日志

/usr/local/spark-2.1.0-bin-hadoop2.7/work
            19/04/13 22:01:05 INFO MemoryStore: Block rdd_3_1 stored as values in memory (estimated size 26.0 MB, free 339.9 MB)
            19/04/13 22:01:06 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 26.7 MB, free 313.2 MB)

(4)将这个内存信息相加,就是RDD内存占用量。

六.使用高性能序列化类库

1.数据序列化概述

2.kryo

bin/spark-submit will also read configuration options from conf/spark-defaults.conf, 
in which each line consists of a key and a value separated by whitespace. For example:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer   org.apache.spark.serializer.KryoSerializer

(2)使用kryo是,要求需要序列化的类,要提前注册,以获得高性能

4.kryo类库的优化
(1)优化缓存大小

spark.kryoserializer.buffer.max  //设置这个参数,将其调大。

(2)预先注册自定义类型

七.优化数据结构

1.概述

2.如何做?
(1)优先使用数组以及字符串,而不是集合类。即:优先使用Array,而不是ArrayList、LinkedList、HashMap

Map<Integer,Person> persons = new HashMap<Integer,Person>()
可以优化为:
"id:name,address"
String persons = "1:Andy,Beijing|2:Tom,Tianjin...."

(3)避免使用多层嵌套对象结构
(4)对于能够避免的场景,尽量使用int代替String

八.rdd.cache checkpoint

九.持久化级别:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化

十.Java虚拟机的调优

1.概述

2.Spark GC原理
见图片

3.监测垃圾回收

spark-submit脚本中,添加一个配置
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimesStamps"

注意:这个是输出到worker日志中,而不是driver日志。
/usr/local/spark-2.1.0-bin-hadoop2.7/logs  worker日志
/usr/local/spark-2.1.0-bin-hadoop2.7/work  driver日志

4.优化Executor内存比例
(1)目的:减少GC次数。

//使用:
conf.set("spark.storage.memoryFraction",0.5)

5.Java GC 调优 (-)

十一.shuffle原理

1.优化前
图片
2.优化后
图片

十二.其他调优

1.提高并行度
2.广播共享数据

Spark Mllib:MLlib 是 Spark 可以扩展的机器学习库。

一.MLlib概述

MLlib 是 Spark 可以扩展的机器学习库。
Spark在机器学习方面具有得天独厚的有事,有以下几个原因:
1.机器学习算法一般都有多个步骤迭代计算,需要在多次迭代后,获得足够小的误差或者收敛才会停止。

    double wucha = 1.0
    while(wucha>=0.00001){
        建模  wucha -= 某个值
    }
    
    模型计算完毕
    
    当迭代使用Hadoop的MapReduce计算框架时,每次都要读写硬盘以及任务启动工作,导致很大的IO开销。
    而Spark基于内存的计算模型天生擅长迭代计算。只有在必要时,才会读写硬盘。
    所以Spark是机器学习比较理想的平台。

2.通信,Hadoop的MapReduce计算框架,通过heartbeat方式来进行通信和传递数据,执行速度慢。

二.什么是机器学习?

1.机器学习的定义。
A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P,
if its performance at tasks in T, as measured by P, improves with experience E。

2.三个关键词:算法、经验、模型评价
在数据的基础上,通过算法构建出模型,并进行评价
如果达到要求,则用该模型测试其他数据
如果不达到要求,要调整算法来重新建立模型,再次进行评估
循环往复,知道获得满意的经验

3.应用:金融反欺诈、语音识别、自然语言处理、翻译、模式识别、智能控制等等

4.基于大数据的机器学习
(1)传统的机器学习算法,由于技术和单机存储的现值,只能在少量数据上使用。即,依赖于数据抽样。
(2)传统的机器学习存在的问题:很难做好随机,导致学习的模型不准确。
(3)在大数据上进行机器学习,直接处理全量数据并进行大量迭代计算。
(4)Spark本身计算优势,适合机器学习。
(5)另外 spark-shell pyspark 都可以提供及时查询工具

5.MLlib

三、线性回归

四、余弦相似性
https://blog.csdn.net/u012160689/article/details/15341303

Spark Graphx

一.Spark Graphx 是什么?

1.是Spark 的一个模块,主要用于进行以图为核心的计算,还有分布式图计算
2.Graphx 底层基于RDD计算,和RDD共用一种存储形态。在展示形态上,可以用数据集来表示,也可以用图来表示。

二.Spark GraphX 有哪些抽象?

1.顶点
RDD[(VertexId,VD)]表示
VertexId 代表了顶点的ID,是Long类型
VD 是顶点的属性,可以是任何类型
2.边
RDD[Edge[ED]]表示
Edge表示一个边
包含一个ED类型参数来设定属性
另外,边还包含了源顶点ID和目标顶点ID

3.三元组
三元组结构用RDD[EdgeTriplet[VD,ED]]表示
三元组包含一个边、边的属性、源顶点ID、源顶点属性、目标顶点ID、目标顶点属性。

4.图
Graph表示,通过顶点和边来构建。

上一篇下一篇

猜你喜欢

热点阅读