Spark学习笔记
内容来自《Spark快速大数据分析》一本很好的书
分布式和集群
集群:多台机器处理相同工作。
分布式:多台机器处理同一大工作的不同流程。
大数据核心问题
https://www.zhihu.com/question/27974418
大数据具有“4V”特性,这4V即数据量大、类型多、价值密度低、速度快时效高这样四个特点 需要处理四个核心问题。
存储,海量的数据怎样有效的存储?主要包括hdfs、Kafka;
计算,海量的数据怎样快速计算?主要包括MapReduce、Spark、Flink等;
查询,海量数据怎样快速查询?主要为Nosql和Olap,Nosql主要包括Hbase、 Cassandra 等,其中olap包括kylin、impla等,其中Nosql主要解决随机查询,Olap技术主要解决关联查询;
挖掘,海量数据怎样挖掘出隐藏的知识?也就是当前火热的机器学习和深度学习等技术,包括TensorFlow、caffe、mahout等
spark
从上层来看,每个 Spark 应用都由一个驱动器程序(driver program)来发起集群上的各种并行操作。驱动器程序包含应用的 main 函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。在前面的例子里,实际的驱动器程序就是 Spark shell 本身,你只需要输入想要运行的操作就可以了。
驱动器程序通过一个 SparkContext 对象来访问 Spark。这个对象代表对计算集群的一个连接。shell 启动时已经自动创建了一个 SparkContext 对象,是一个叫作 sc 的变量。我们可以通过例 2-3 中的方法尝试输出 sc 来查看它的类型。
一旦有了 SparkContext,你就可以用它来创建 RDD。在例 2-1 和例 2-2 中,我们调用了sc.textFile() 来创建一个代表文件中各行文本的 RDD。我们可以在这些行上进行各种操作,比如 count() 。
spark-RDD
本章介绍 Spark 对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称 RDD)。RDD 其实就是分布式的元素集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有 RDD 以及调用 RDD 操作进行求值。
创建出来后,RDD 支持两种类型的操作:转化操作(transformation)和行动操作(action)。
转化操作会由一个 RDD 生成一个新的 RDD。转化操作会由一个 RDD 生成一个新的 RDD。例如,根据谓词匹配情况筛选数据就是一个常见的转化操作。在我们的文本文件示例中,我们可以用筛选来生成一个只存储包含单词 Python 的字符串的新的 RDD。
比如 map() 和 filter()
行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。 first() 就是我们之前调用的一个行动操作,它会返回 RDD 的第一个元素。【向驱动器程序返回结果或
把结果写入外部系统的操作】
比如count() 和 first()
转化操作和行动操作的区别在于 Spark 计算 RDD 的方式不同。虽然你可以在任何时候定义新的 RDD,但 Spark 只会惰性计算这些 RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。
最后,默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。我们可以让 Spark 把数据持久化到许多不同的地方,可用的选项会在表 3-6 中列出。在第一次对持久化的 RDD 计算之后,Spark 会把 RDD 的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作中,就可以重用这些数据了。我们也可以把 RDD 缓存到磁盘上而不是内存中。
总的来说,每个 Spark 程序或 shell 会话都按如下方式工作。
(1) 从外部数据创建出输入 RDD。
(2) 使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。
(3) 告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。
(4) 使用行动操作(例如 count() 和 first() 等)来触发一次并行计算,Spark 会对计算进行优化后再执行。
创建RDD
Spark 提供了两种创建 RDD 的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。
创建 RDD 最简单的方式就是把程序中一个已有的集合传给 SparkContext 的 parallelize()方法,学习时常用,可以快速生成自己的RDD,但是在开发是用得不多,因为需要把数据集放在机器内存里。
例 3-8:Python 中的 textFile() 方法
lines = sc.textFile("/path/to/README.md")
例 3-9:Scala 中的 textFile() 方法
val lines = sc.textFile("/path/to/README.md")
例 3-10:Java 中的 textFile() 方法
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
RDD操作
如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。
转化操作
许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作 RDD 中的一个元素。不过并不是所有的转化操作都是这样的。
举个例子,假定我们有一个日志文件 log.txt,内含有若干消息,希望选出其中的错误消息。我们可以使用前面说过的转化操作 filter() 。
注意, filter() 操作不会改变已有的 inputRDD 中的数据。实际上,该操作会返回一个全新的 RDD。 inputRDD 在后面的程序中还可以继续使用,比如我们还可以从中搜索别的单词。事实上,要再从 inputRDD 中找出所有包含单词 warning 的行。接下来,我们使用另一个转化操作 union() 来打印出包含 error 或 warning 的行数。
union起合并的作用。
union() 与 filter() 的不同点在于它操作两个 RDD 而不是一个。转化操作可以操作任意数量的输入 RDD。
行动操作
行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的 RDD 的转化操作。
RDD take,获取少量数据。还有一个 collect() 函数,可以用来获取整个 RDD 中的数据。
记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用 collect() ,因此, collect() 不能用在大规模数据集上。
在大多数情况下,RDD 不能通过 collect() 收集到驱动器进程中,因为它们一般都很大。此时,我们通常要把数据写到诸如 HDFS 或 Amazon S3 这样的分布式的存储系统中。你可以使用 saveAsTextFile() 、saveAsSequenceFile() ,或者任意的其他行动操作来把 RDD 的数据内容以各种自带的格式保存起来。
中间结果可以持久化。
惰性求值
RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前 Spark 不会开始计算。
RDD使用
传递函数
Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。
转化操作
map :用于RDD中每个元素。
filter:将RDD中满足条件的元素放入新RDD中。
有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap() 。和 map() 类似,我们提供给 flatMap() 的函数被分别应用到了输入 RDD 的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器(使用迭代器进行访问得到想要的结果)。
flatMap() 的一个简单用途是把输入的字符串切分为单词。
尽管 RDD 本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作。
![](https://img.haomeiwen.com/i8581547/bcbb4379e1d38769.png)
行动操作
最常见的行动操作 reduce() 。它接收一个函数作为参数,这个函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就
是函数 + ,可以用它来对我们的 RDD 进行累加。
![](https://img.haomeiwen.com/i8581547/7fb51ff4718e2218.png)
键值对操作
![](https://img.haomeiwen.com/i8581547/53d88a535c33a6d4.png)
![](https://img.haomeiwen.com/i8581547/2dd49d736d6e9bc8.png)
![](https://img.haomeiwen.com/i8581547/2a0be3952fde7b5f.png)
Pair RDD 也还是 RDD(元素为 Java 或 Scala 中的 Tuple2 对象或 Python 中的元组),因此同样支持 RDD 所支持的函数。
pairRDD转化操作
聚合操作
reduceByKey() 与 reduce() 相当类似;它们都接收一个函数,并使用该函数对值进行合并。
reduceByKey() 会为数据集中的每个键进行并行的归约操作,每个归约操作会将键相同的值合并起来。因为数据集中可能有大量的键,所reduceByKey() 没有被实现为向用户程序返回一个值的行动操作。实际上,它会返回一个由各键和对应键归约出来的结果值组成的新的 RDD。
foldByKey() 则与 fold() 相当类似;它们都使用一个与 RDD 和合并函数中的数据类型相同的零值作为初始值。与 fold() 一样, foldByKey() 操作所使用的合并函数对零值与另一个元素进行合并,结果仍为该元素。
并行度调优
每个 RDD 都有固定数目的分区,分区数决定了在 RDD 上执行操作时的并行度。本章讨论的大多数操作符都能接收第二个参数,这个参数用来指定分组结果或聚合结果的RDD 的分区数。
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # 自定义并行度
对于这样的情况,Spark 提供了 repartition() 函数。它会把数据通过网络进行混洗,并创建出新的分区集合。切记,对数据进行重新分区是代价相对比较大的操作。Spark 中也有一个优化版的 repartition() ,叫作 coalesce() 。你可以使用 Java 或 Scala 中的 rdd.partitions.size() 以及 Python 中的 rdd.getNumPartitions 查看 RDD 的分区数,并确保调用 coalesce() 时将 RDD 合并到比现在的分区数更少的分区中。
分组&连接&排序
Pair RDD的行动操作
和转化操作一样,所有基础 RDD 支持的传统行动操作也都在 pair RDD 上可用。Pair RDD提供了一些额外的行动操作,可以让我们充分利用数据的键值对特性。
![](https://img.haomeiwen.com/i8581547/86be0a08fa8b27cd.png)
数据分区
本章要讨论的最后一个 Spark 特性是对数据集在节点间的分区进行控制。Spark 程序可以通过控制RDD 分区方式来减少通信开销。
举例:
举个简单的例子,我们分析这样一个应用,它在内存中保存着一张很大的用户信息表——也就是一个由 (UserID, UserInfo) 对组成的 RDD,其中 UserInfo 包含一个该用户所订阅的主题的列表。该应用会周期性地将这张表与一个小文件进行组合,这个小文件中存着过去五分钟内发生的事件——其实就是一个由 (UserID, LinkInfo) 对组成的表,存放着过去五分钟内某网站各用户的访问情况。例如,我们可能需要对用户访问其未订阅主题的页面
的情况进行统计。我们可以使用 Spark 的 join() 操作来实现这个组合操作,其中需要把UserInfo 和 LinkInfo 的有序对根据 UserID 进行分组。
// 初始化代码;从HDFS商的一个Hadoop SequenceFile中读取用户信息
// userData中的元素会根据它们被读取时的来源,即HDFS块所在的节点来分布
// Spark此时无法获知某个特定的UserID对应的记录位于哪个节点上
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// 周期性调用函数来处理过去五分钟产生的事件日志
// 假设这是一个包含(UserID, LinkInfo)对的SequenceFile
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
!userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
代码练习
collect,显示rdd变量的内容
理解:parallelize,一堆需要处理的数据集和,默认可进行分区处理,也可主动分区处理。
rdd.partitions可以看到被分到几个分区,分区的概念之后细说。
转化操作-》1.对value RDD进行转化。2.对pair RDD进行转化。-》
对一个RDD进行变换,对两个RDD进行变换。
有很多函数,可参考JavaRDD网站
去重运算-》没有改变分区数目-》但是这其中难道不应有一个合并对比操作??或者类似操作。
单元素value-map
对于数值型rdd,很像一个列表。
filter函数,以某种方法过滤出列表出需要的元素。
flatmap函数,可以把一维列表转化为二维列表。[1,2,3]->[[1,1,1],[2,2,2],[3,3,3]]
pipe 对分区中的rdd数值进行操作。
sample函数:抽样生成rdd
sortBy函数 排序并可确定升序和降序。
双元素value-map
双元素-》两个列表
可以求笛卡尔积,交集,并集,补集
可以联结两个列表 zip [1,2,3],[2,2,2]->[(1,2),(2,2),(3,2)]
单元素pair-map
用map可以轻易生成键值对 map ->(x=>(x(0),x))
keyby(_) key的名,以什么生成键
combinebykey 依据key聚合元素。(可考虑最后汇总)不去重。
同一个键的组成一个pair对。
groupbykey 同一个键的键值对组合在一起,一个集合。
key 提取key成为新rdd
mapvalues value进行变换
flatmapvalue 进行flatmap
处理嵌套json
https://blog.csdn.net/qq_21439395/article/details/80710180
http://www.lubinsu.com/index.php/archives/493
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import scala.util.parsing.json.JSON
object jsonex1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("JSONApp");
//通过conf来创建sparkcontext
val sc = new SparkContext(conf);
val inputFile = "tmp.json"//读取json文件
val jsonStr = sc.textFile(inputFile);
val result = jsonStr.map(s => JSON.parseFull(s))
val ss = SparkSession.builder()
.config(conf)
.getOrCreate()
ss.read.json("tmp.json").createOrReplaceTempView("user")
val rs = ss.sql("select user.hits from user")//返回dataframe
rs.printSchema()
rs.show()
}
}
UDF使用
可以使用自定义方法,在SQL语句和列。
写的比较好的博客:
https://www.cnblogs.com/cc11001100/p/9463909.html
https://www.jianshu.com/p/b1e9d5cc6193
Scala匿名函数
var inc =(x:Int)=>x+1
def add2 = new Function1[Int,Int]{
def apply(x:Int):Int = x+1
}
HDFS
读文件 分布式文件存储系统。
MapReduce 框架的核心步骤主要分两部分:Map 和 Reduce。当你向 MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个 Map 任务,然后分配到不同的节点上去执行,每一个 Map 任务处理输入数据中的一部分,当 Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为 Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个 Map 的输出汇总到一起并输出。
下载气象数据集部分数据,写一个 Map-Reduce 作业,求每年的最低温度。
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MinTemperature {
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("Usage: MinTemperature<input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MinTemperature.class);
job.setJobName("Min temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MinTemperatureMapper.class);
job.setReducerClass(MinTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int minValue = Integer.MAX_VALUE;
for(IntWritable value : values) {
minValue = Math.min(minValue, value.get());
}
context.write(key, new IntWritable(minValue));
}
}