Spark学习(二)——RDD和WordCount程序

2020-06-14  本文已影响0人  大数据阶梯之路

Spark SQL主要用作离线海量数据分析
Spark Streaming主要用作对数据实时处理
Spark的工作原理与MapReduce是如出一辙的,区别在于MapReduce是在HDFS上做计算,而Spark是在内存中做计算,这就形成了Spark这一计算引擎的优势——效率速度快。

一、RDD简介

RDD,(Resilient Distributed Datasets),即 弹性分布式数据集,是一个容错、并行的数据结构,也就是分布式的元素集合,在代码中RDD是一个抽象类,是不可变、可分区、里面的元素可并行计算的数据集合。每个RDD被分为多个分区,每个分区运行在集群的不同节点中。通过RDD的依赖关系形成Spark的调度顺序,其实所谓的Spark程序,就是一组RDD的操作,相当于在内存中跑MapReduce。
在Spark中对数据的所有操作不外乎创建RDD、转化已有RDD或者调用RDD操作进行计算,Spark会自动将RDD中的数据分发到集群上并将操作并行化执行。

二、RDD操作

RDD操作也称为算子,分为转换算子行动算子。算子就是一系列的方法操作,把问题转换到可解决的状态,这就称为spark中的算子。只要是转换算子,必然就会产生新的RDD,转换算子是封装这计算逻辑,只有用到了行动算子的时候才会去读取数据。即:Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

三、RDD闭包检测

就是算子以外的代码都是在Driver端执行,而算子以内的代码都是在Executor端执行的,算子内的代码用到算子外的代码就会形成了闭包效果,如果算子外的数据无法序列化,就无法传值给Executor端执行从而报错,所以runJob前会先检测闭包内的对象是否可进行序列化,这就称为闭包检测。

四、RDD依赖

从一个RDD转换产生一个新的RDD,这相邻的2个RDD之间就会建立依赖关系,这个依赖关系是会保留下来的,因为Spark的RDD不会保留中间数据,所以为了容错性通过这种将依赖关系保留下来的操作来避免执行RDD算子失败后的重新读取计算。相邻2个RDD的直接关系则为依赖,间接产生依赖的RDD则称为血缘
宽依赖:每个父RDD的分区可以被多个子RDD的分区使用称为宽依赖
窄依赖:每个父RDD的分区最多只能被一个子RDD的一个分区使用称为窄依赖

rdd.toDebugString      //查看血缘关系

五、RDD任务划分

RDD任务切分为:Application、Job、Stage、Task
Application:初始化一个Spark上下文环境对象SparkContext即生成一个Application
Job:一个行动算子就会生成一个Job
Stage:Stage个数等于宽依赖(ShuffleDependency)个数+1
Task:一个Stage阶段,最后一个RDD的分区个数就是Task的个数
即 1 Application -> n Job -> n Stage -> n Task

六、RDD持久化

RDD是不存储数据的,而当一个RDD要重复使用时,那么则需要从头开始获取数据,也就相当于RDD对象是可以重用,而RDD数据则不可重用。所以可以想办法把一个RDD的数据给到另一个RDD前先把数据放到缓存中,然后后面的多个RDD再从缓存取即可实现持久化。 image.png
rdd.cache()     //cache默认持久化操作是把数据持久化到内存中,如果保存到磁盘则使用persist方法选择存储级别
rdd.persist(存储级别)    //其实cache方法底层也是调用的persist方法

持久化操作是在行动算子执行时完成的,别忘记了,只有当行动算子执行了才有数据。这种持久化场景不一定说是要用在对RDD的重用上,当然还可以用在一些比较耗时的和数据重要的场景,先把这些RDD数据给持久化下来,方便后续使用和数据安全。

sc.setCheckpointDir("hdfs路径")   //创建checkpoint保存路径
rdd.checkpoint()   //落盘数据到文件,当作业执行完毕后,不会被删除

cache()、persist()、checkpoint区别
cache:是将数据临时保存在内存中进行重用,但这样数据可能不安全,比如内存溢出和丢失
persist:是将数据临时保存到磁盘中进行重用,数据安全,但涉及磁盘IO,效率较低
checkpoint:是将数据长久落盘到文件中进行重用,数据安全,但会独立执行作业,则效率更低,checkpoint等同于改变了数据源了,会切断血缘关系
为了提高checkpoint效率,可以搭配cache一起使用

rdd.cache()
rdd.checkpoint()

附、WordCount程序例子实战

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.meizu.xiaojiang.WordCount</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
image.png 经上面程序编写,且在项目里的input文件夹里新建好test.txt文件, 之后程序运行是会出现一个报错的:java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.,但这是不影响spark程序运行,因为本地是没有安装hadoop或者spark的部署环境,所以程序运行才会报错。可参考这篇文章解决:https://www.cnblogs.com/mdlcw/p/11106218.html 程序结果.png

不过一般做法是window系统只负责写程序,然后程序是打包到linux集群上去运行的,集群上面才有环境,而且毕竟大数据的存储也是在集群中,如果需要调试程序的话也直接去集群上调试即可。

YARN_CONF_DIR=/usr/hadoop-2.6.4/etc/hadoop

之后把上面的统计单词数的程序修改下,把读取文件那一行的路径修改为

//读取HDFS中根目录下的test.txt文件,将文件一行一行读取出来
val lines: RDD[String] = sc.textFile("hdfs://master:9000/test.txt")

把WordCount程序打包成jar文件,然后上传到spark安装目录下,再把test.txt文本文件存放到HDFS中的根目录下。
最后切换到spark安装目录下,然后执行spark-sumbit命令,如下就完事了。
\代表的是换行,--代表的是附带参数,--name指的是程序名,--class指的是类名,用全称。

./bin/spark-submit \
--master yarn-client \
--name WordCount \
--class com.meizu.xiaojiang.WordCount \
--executor-memory 1G \
--total-executor-cores 2 \
/usr/spark-2.4.3/WordCount.jar
image.png image.png
上一篇下一篇

猜你喜欢

热点阅读