Spark慢慢来Spark

《spark快速大数据分析》学习笔记

2017-06-18  本文已影响1997人  485b1aca799e

学习笔记


spark安装目录:/usr/lib/spark/

采集系统上spark安装版本为1.1.0,hadoop安装版本为2.5.0,系统上目前spark采用的集群模式为standalone模式。

spark配置文件:/usr/lib/spark/conf/spark-env.sh,该文件中包含了指定master节点,UI界面端口。

读取文件端口:
x=sc.textFile("hdfs://192.168.10.17:8020/cucrz/data/look/1901/2016/12/*/"),为8020端口。


启动spark-shell或者pyspark或打包运行时参数:
--master spark://hadoop1:7077     ##设置集群master节点,不加则使用单机版
--executor-memory 10g
##设置集群每个节点运行的内存为10g,最大为14.6g

第一章 spark简介 (略)

第二章 spark下载与入门

使用命令 pyspark 进入python 版本的spark shell
使用命令 spark-shell 进入scala 版本的spark shell
val lines =sc.textFile("README.md")
lines.count()
lines.first()

val lines =sc.textFile("README.md")
val x = lines.filter(line => line.contains("Python"))
x.first()

2.3 spark核心概念简介

2.4 独立应用

在scala中初始化spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val conf =new SparkConf().setMaster("local").setAppName("My App")
val sc =new SparkContext(conf)
//上述代码展示了创建SparkContext的最基本的方法,你只要传递两个参数
//集群URL:告诉Spark如何连接到集群上。上述例子中使用的是local.
//应用名:上诉代码中使用的是My App。
//最后,关闭spark可以使用SparkContext中的stop()方法,或者直接退出应用,System.exit(0)或者sys.exit().
单词统计应用
val conf =new SparkConf().setAppName("wordcount")
val sc = new SparkContext(conf)
val input = sc.textFile(inputFile)//inputFile为数据的输入路径和文件名
val words = input.flatMap(line => line.split(" "))//flatMap方法对所有输入的元素进行运算,然后将得到列表结果进行连接
val counts=words.map(word => (word,1)).reduceByKey{case (x,y) => x+y}
counts.saveAsFile(outputFile)
//109节点默认输出的路径在/home/hdfsnfs/user/root/路径中
//或者在hadoop hdfs分布式系统中,使用命令hadoop fs -ls /user/root,在该目录路径下

第三章 RDD编程

3.1 RDD基础

// 例子
x.persist()
x.count()
x.first()
//把RDD持久化到内存中

总的来说,每个spark程序或shell会话都是按照以下的方式工作:

  1. 从外部数据创建出输入RDD。
  2. 使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD。
  3. 告诉spark对需要被重用的中间结果RDD执行persist()操作。
  4. 使用行动操作来出发一次并行计算,spark会对计算进行优化后再执行。

3.2 创建RDD

  1. 一种是把程序中一个已有的集合传给SparkContext的parallelize()方法 val lines =sc.parallelize(List("pandas","i like pandas"))
  2. 另外一种更常用的方法是从外部存储中读取数据来创建RDD。 val lines = sc.textFile("/path/to/README.md")

3.3 RDD操作

  1. RDD的转化操作是返回一个新的RDD操作
  2. 行动操作是向驱动程序返回结果或把结果写入外部系统的操作,会触发实际的计算。

3.3.1 转化操作

val inputRDD =sc.textFile("log.txt")
val errorRDD =inputRDD(line => line.contains("error"))
//filter()操作不会改变已有的inputRDD中的数据。会返回一个新的RDD。
val warningRDD =inputRDD(line => line.contains("warning"))
val badlineRDD =errorRDD.union(warningRDD)

3.3.2 行动操作

println("Input had"+badLineRDD.count()+"concerning lines")
println("Here are 10 examples")
badLinesRDD.take(10).foreach(println)//打印前10条数据

3.3.3 惰性求值

3.4 向spark传递函数

class SearchFunction(val query:String){
def isMatch(s:String):Boolean={
    s.contains(query)
    }
def getMatchesFunctionReference(rdd:RDD[String]):RDD[String]={
    rdd.map(isMatch)
    }
def getMatchesFieldReference(rdd:RDD[String]):RDD[String]={
    rdd.map(x => x.split(query))
    }
def getMatchesNoReference(rdd:RDD[String]):RDD[String]={
    val query_=this.query
    rdd.map(x => x.split(query_))
    }
}

//如果在scala中出现了NotSerializableExecption,通常问题就在于我们传递了一个不可序列化的类中的函数或字段。
//记住,传递局部可序列化的变量或顶级对象中的函数始终是安全的。

3.5 常见的转化和行动操作

3.5.1 基本RDD

1.针对各个元素的转化操作
  1. map函数
  1. filter函数
val input = sc.parallelize(List(1,2,3,4))
val result=input.map(x => x*x)
println(result.collect().mkString(","))//mkString函数将元素连接在一起
  1. flatMap函数
val lines=sc.parallelize(List("hello world","hi"))
val words=lines.flatMap(line => line.split(" "))
words.first()
2.伪集合操作

表3.2 对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作

函数名 目的 示例 结果
map() 将函数应用于RDD中的每个元素,将返回值构成新的RDD rdd.map(x => x+1) {2,3,4,4}
flatMap() 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来分切单词,执行扁平化操作。 rdd.flatMap(x => x.to(3)) {1,2,3,2,3,3,3}
filter() 返回一个由通过传给filter()的函数的元素组成的RDD rdd.filter(x => x!=1) {2,3,3}
distinct() 去重 rdd.distinct() {1,2,3}
sample(withReplacement,fraction,[seed]) 对RDD采样,以及是否替换,抽取比例等等 rdd.sample(false,0.5) {1,2,3} 非确定的
函数名 目的 示例 结果
union() 生成一个包含两个RDD中所有元素的RDD (不去重的并集) rdd.union(other) {1,2,3,3,4,5}
intersection() 求两个RDD共同的元素的RDD (交集) rdd.intersection(other) {3}
subtract() 求移除一个RDD中的内容 (差集) rdd.subtract(other) {1,2}
cartesian() 笛卡尔积 rdd.cartesian(other) {(1,3),(1,4),...,(3,5)}

表3.3:对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作

函数名 目的 示例 结果
union() 生成一个包含两个RDD中所有元素的RDD (不去重的并集) rdd.union(other) {1,2,3,3,4,5}
intersection() 求两个RDD共同的元素的RDD (交集) rdd.intersection(other) {3}
subtract() 求移除一个RDD中的内容 (差集) rdd.subtract(other) {1,2}
cartesian() 笛卡尔积 rdd.cartesian(other) {(1,3),(1,4),...,(3,5)}
3.行动操作
val sum = rdd.reduce((x,y) => x+y)
val result =input.aggregate((0,0))(
(acc,value) => (acc._1+value,acc._2+1),
(acc1,acc2) => (acc1._1 + acc2._1 ,acc1._2 +acc2._2)
)
val avg= result._1 /result._2.toDouble
//val input =sc.parallelize(List(1,2,3,4,5,6)),输入input值变量为一个列表类型,对其求平均
函数名 目的 示例 结果
collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,3}
count() RDD中元素的个数 rdd.count() 4
countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 从RDD中返回num个元素 rdd.take(2) {1,2}
top(num) 从RDD中返回最前面的num个元素 rdd.top(2) {3,3}
takeOrdered(num) 从RDD中按照提供的顺序返回最前面的num个元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeSampe(withReplacement,num,[seed]) 从RDD中返回任意一些元素 rdd.takeSample(false,1) 非确定的
reduce(func) 并行整合RDD中所有数据 rdd.reduce((x,y) => x+y) 9
fold(zero)(func) 和reduce类似,但是需要提供初始值 rdd.fold(0)((x,y) => x+y) 9
aggregate(zeroValue)(seqOp,comOp) 和reduce相似,但是通常返回不同类型的函数 rdd.aggregate((0,0)) ((x,y)=> (x._1+x,x._2+1) ,(x,y) => (x._1+y._1,x._2+y._2)) (9,4)
foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(func) 无 //和map进行对比,map也是对RDD中的每个元素进行操作,但是允许有返回值

3.5.2 在不同RDD类型间转换

3.6 持久化(缓存)

org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级别:如果有必要,可以通过在存储级别的末尾加上_2来把持久化数据存为两份

级别 使用的空间 CPU时间 是否在内存中 是否在磁盘上 备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 中等 部分 部分 如果数据在内存中放不下,则溢写到磁盘上
MEMORY_AND_DISK_SER 部分 部分 如果数据在内存中放不下,则溢写到磁盘上。在内存中放序列化后的数据
DISK_ONLY
val result =input.map(x => x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

第四章 键值对操作

4.2 创建pair RDD

//在scala中使用第一个单词为键创建出一个pair RDD 
val pairs =lines.map(x => (x.split(" ")(0),x))

4.3 Pair RDD的转化操作

表4-1,pair RDD的转化操作 (以键值对集合 {(1,2),(3,4),(3,6)}为例)

函数名 目的 示例 结果
reduceByKey(func) 合并具有相同键的值,使用函数func rdd.reduceByKey((x,y) => x+y) {(1,2),(3,10)}
groupByKey() 对具有相同键的值进行分组 rdd.groupByKey() {(1,[2]),(3,[4,6])}
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) 使用不同的返回类型合并具有相同键的值
mapValues(func) 对pair RDD中的每个值应用一个函数而不改变键 rdd.mapValues(x => x+1) {(1,3),(3,5),(3,7)}
flatMapValues(func) 对pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。通常用于符号化 rdd.flatMapValues(x => (x to 5)) {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}
keys 返回一个仅包含键的RDD rdd.keys {1,3,3}
values 返回一个仅包含值的RDD rdd.values {2,4,6}
sortByKey() 返回一个根据键排序的RDD rdd.sortByKey() {(1,2),(3,4),(3,6)}

表4-2,针对两个pair RDD 的转化操作 (rdd={(1,2),(3,4),(3,6)},other={(3,9)})

函数名 目的 示例 结果
subtractByKey 删掉RDD中键与otherRDD中的键相同的元素(差集) rdd.subtractByKey(other) {(1,2)}
join 对两个rdd进行内连接 rdd.join(other) {(3,(4,9)),(3,(6,9))}
rightOuterJoin 对两个RDD进行连接操作,以右边的为主,右外连接 rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))}
leftOuterJoin 对两个RDD进行左外连接,以左边的为主 rdd.leftOuterJoin(other) {(1,(2,None)),(3,(4,Some(9)),(3,(6,Some(9))}
cogroup 将两个RDD中拥有相同键的数据分组到一起 rdd.cogroup(other) {(1,([2],[])),(3,([4,6])),([4,6],[9])}
val input=sc.textFile("README.md")
val pairs=input.map(x => (x.split(" ")(0),x))
pairs.filter(value => value._2.length<20).collect().foreach(println)

4.3.1 聚合操作

在scala中使用reduceByKey和mapByKey计算每个键对应的平均值

rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2+y._2)) 
key value key value key value
panda 0 panda (0,1) panda (1,2)
pink 3 pink (3,1) pink (7,2)
pirate 3 pirate (3,1) pirate (3,1)
panda 1 panda (1,1)
pink 4 pink (4,1)

在scala中实现单词计数

val input=sc.textFile("README.md")
val words=input.flatMap(x => x.split(" "))
val result=words.map(x => (x,1)).reduceByKey((x,y) => x+y)

combineByKey函数

//scala中使用combineByKey求每个键对应的平均值
val result =input.combineByKey(
(v) => (v,1),
(acc:(Int,Int),v) => (acc._1+v,acc._2+1)
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1+acc2._1,acc1._2+acc2._2)
).map{case (key,value) => (key,value._1/value._2.toFloat)}
result.collectAsMap().map(println(_))
//最后两个函数可以考虑更换代码
//result.mapValues(x => x._1/x._2.toFloat).collect().foreach(println)
combineByKey函数,首先将键值扩展成元组形式,然后对每个分区进行元组统计,然后再对所有分区进行叠加
最后,如果使用map函数,则对RDD中所有元素(包括键)进行元素,这里涉及到一个判断;也可以使用mapValues函数直接对键值进行操作。

并行度调优

val data =sc.parallelize(Seq(("a",3),("b",4),("a",1)))
data.reduceByKey(_+_)
data.reduceByKey(_+_,10)

4.3.3 连接

4.3.2 数据排序

input.sortByKey(ascending=false).collect().foreach(println)

4.4 pair RDD的行动操作

函数 描述 示例 结果
countByKey() 对每个键对应的元素分别计数 rdd.countByKey() {(1,1),(3,2)}
collectAsMap() 将结果以映射的形式返回,以便查询 rdd.collectAsMap() Map{(1,2),(3,4),(3,6)}
lookup(key) 返回给定键对应的所有值 rdd.lookup(3) [4,6]

第五章 数据读取与保存

  1. 文件格式与文件系统(HDFS、NFS、Amazon S3)
  2. Spark SQL中的结构话 数据源
  3. 数据库与键值存储

5.2 文件格式

5.2.1 文本文件

  1. 读取文本文件
val input =sc.textFile("file:///home/shanjiajun/sparkfile/README.md")
//或 val input=sc.textFile("/home/shanjiajun/sparkfile/README.md")
  1. 保存文本文件
result.saveAsTextFile(outputFile)

5.2.3 逗号分隔值与制表分隔值

//在scala中使用textFile()读取CSV
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input =sc.textFile(inputFile)
val result=input.map{line => val reader=new CSVReader(new StringReader(line));
reader.readNext();
}

5.3.1 本地文件系统

val input =sc.textFile("file:///root/README.md")

5.3.3 HDFS中读取数据

5.4 Spark SQL 中的结构化数据读取

5.4.1 Apache Hive

import org.apache.spark.sql.hive.HiveContext

val hiveCtx =new org.apache.spark.sql.hive.HiveContext(sc)
val rows=hiveCtx.sql("select name,age from users")
val firstRow=rows.first()
println(firstRow.getString(0))

第六章 spark编程进阶

6.2 累加器

-累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。

val sc=new SparkContext(...)
val file=sc.textFile("file.txt")
val blankLines=sc.accumulator(0)
val callSigns =file.flatMap(line => {
if(line == ""){
blankLines +=1  //累加器加1
}
line.split(" ")
})

callSing.saveAsTextFile("output.txt")
println("Blank lines:"+blankLines.value)
//x.saveAsTextFile("file:///home/shanjiajun/sparkfile/datax")

6.6 数值RDD的操作

方法 含义
count() RDD中的元素个数
mean() 元素的平均值
sum() 总和
max() 最大值
min() 最小值
variance() 元素的方差
sampleVariance() 从采样中计算出的方差
stdev() 标准差
smapleStdev() 采样的标准差

表6-2 StatsCounter中可用的汇总统计数据

方法 含义
count() RDD中的元素个数
mean() 元素的平均值
sum() 总和
max() 最大值
min() 最小值
variance() 元素的方差
sampleVariance() 从采样中计算出的方差
stdev() 标准差
smapleStdev() 采样的标准差
//用scala移除异常值
val distanceDouble=distance.map(string => string.toDouble)
val stats =distinceDoubles.stats()
val stddev=stats.stdev
val mean=stats.mean
val reasonableDistances =distanceDoubles.filter(x => math.abs(x-mean) <3 * stddev)
println(reasonableDistance.collect().toList)

第七章 在集群上运行spark

7.2 Spark运行时构架

7.2.1 驱动器节点

7.2.2 执行器节点

7.2.5 小结

  1. 用户通过spark-submit脚本提交应用。
  2. spark-submit脚本启动驱动器程序,调用用户定义的main()方法。
  3. 驱动器进程执行用户 应用中的操作。根据程序中所定义的对RDD的转化操作和行动操作,驱动器节点把工作以任务的形式发送到执行器进程。
  4. 任务在执行器程序中进行计算并保存结果。
  5. 如果驱动器程序的main()方法退出,或者调用了SparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源。

7.3 使用spark-submit部署应用

初始化sparkSQL

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext


val sc=new SparkContext()
val hiveCtx=new SQLContext(sc)
上一篇 下一篇

猜你喜欢

热点阅读