学习通过python看世界

Spark概述

2021-03-16  本文已影响0人  奋斗的蛐蛐

前言

内容:

Spark Core -- 离线处理
Spark SQL -- 离线处理、交互
Spark Streaming -- 实时
Spark Graphx -- 图处理
Spark原理

MapReduce、Spark、Flink(实时)=> 3代计算引擎,昨天、今天、未来

MapReduce、Spark:类MR引擎,底层原理非常相似,数据分区、MapTask、ReduceTask、Shuffle

Spark Core

Spark概述

什么是Spark

官网

Spark是一个快速、通用的计算引擎,特点:

Spark与Hadoop

从狭义的角度上看:Hadoop是分布式框架。有存储、计算、资源调度三个部分组成

Spark是一个分布式计算引擎,由Scala语言编写的计算框架,基于内存的快速、通用、可扩展的大数据分析引擎

从广义上来看,Spark是Hadoop生态不可或缺的一部分

MapReduce的不足:

Spark与Hadoop2.png

Spark在借鉴的MR的优点的同时,很好的解决了MR面临的问题

MR Spark
数据存储结构:磁盘HDFS文件系统的Split 使用内存构建弹性分布式数据集RDD对数据进行运算和cache
编程范式:Map+Reduce仅提供两个操作,表达力欠缺 提供了丰富的操作,是数据处理逻辑的代码非常简短
计算中间结果需要落到磁盘,IO及序列化、反序列化代价大 计算中间结果在内存中,维护存取速度比磁盘高几个数量级
Task以进程的方式维护,需要数秒时间才能启动 Task以线程的方式维护,对于小数据集读取能够达到亚秒级的延迟

备注:Spark计算模式也属于MR,Spark是对MR框架的优化

在实际应用中,大数据应用有以下三种类型

当同时存在以上三种情况的时候,Hadoop框架需要同时部署多种不同的软件:MR/Hive或Impala/Storm。这样做难免会带来一些问题:

Spark所提供的生态系统足以解决上述三种场景,即同时支持批处理、交互式查询和流数据处理

Spark为什么比MR快

  1. Spark积极使用内存。MR框架中的一个Job只能拥有一个MapTask任务和一个ReduceTask任务,如果业务复杂,一个Job就表达不出来,需要多个Job组合起来,然后前一个Job需要写到HDFS中,才能交给后面的Job,在MR中,这样的运算涉及到多次写入和读取操作,Spark框架则可以把多个MapTask和ReduceTask组合在一起连续执行,中间的计算结果不需要落地

    复杂的MR任务:mr + mr + mr+mr ....

    复杂的Spark任务: mr -> mr -> mr ....

  2. 多进程模型(MR)vs多线程模型(Spark),MR框架中的MapTask和ReduceTask是进程级别的,都是JVM进程,每次启动都需要重新申请资源消耗了不必要的时间,而SparkTask是基于线程模型,通过复用线程池的线程来减少启动、关闭Task所需要的系统开销

系统架构

Spark 运行架构包括:

Spark集群部署模式

Spark支持3种不是模式:Standalone、Yarn、Mesos

Standalone模式
Spark on Yarn 模式
Spark in Mesos模式
总结

粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Driver和多个Executor组成,每个Executor占用若干资源,内部可运行多个Task。程序正式运行之前,需要将所有的资源申请好,且运行的工程中一直占有,即使不使用,也是等到任务结束在回收资源

细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,核心思想就是按需分配。

三种集群部署模式如何选择:

相关术语

官方文档

Spark安装配置

Spark安装

官网地址

文档地址

下载地址

下载Spark安装包

安装步骤

1、下载后解压缩

cd /opt/lagou/software/
tar zxvf spark-2.4.5-bin-without-hadoop-scala-2.12.tgz
mv spark-2.4.5-bin-without-hadoop-scala-2.12/ ../servers/spark- 2.4.5/

2、设置环境变量,并使其生效

vi /etc/profile
export SPARK_HOME=/opt/lagou/servers/spark-2.4.5
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
source /etc/profile

3、修改配置

文件位置:$SPARK_HOME/conf,赋值需要的文件

cp slaves.template slaves
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.sh
cp log4j.properties.template log4j.properties

修改slaves

linux121
linux122
linux123

vim spark-defaults.conf

#Spark的master节点的地址和端口号
spark.master                     spark://linux121:7077
#是否做日志的聚合
spark.eventLog.enabled           true
#聚合日志在HDFS的目录,如果放在HDFS需要启动HDFS,并提前创建好目录spark-eventlog
spark.eventLog.dir               hdfs://linux121:9000/spark-eventlog
# 序列化方式
spark.serializer                 org.apache.spark.serializer.KryoSerializer
# Driver内存,默认是1G
spark.driver.memory              512m

修改spark-env.sh

export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_231
export HADOOP_HOME=/opt/lagou/servers/hadoop-2.9.2
export HADOOP_CONF_DIR=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop
#这里使用的是 spark-2.4.5-bin-without-hadoop,所以要将 Hadoop 相关 jars 的位置告诉Spark
export SPARK_DIST_CLASSPATH=$(/opt/lagou/servers/hadoop-2.9.2/bin/hadoop classpath)
export SPARK_MASTER_HOST=linux121
export SPARK_MASTER_PORT=7077

4、将Spark分发到其他两个机器,并修改环境变量,让环境变量生效

rsync-script spark-2.4.5

5、启动集群

cd $SPARK_HOME/sbin
./start-all.sh

分别在linux121、linux122、linux123上执行 jps,可以发现:

linux121:Master、Worker

linux122:Worker

linux123:Worker

此时 Spark 运行在 Standalone 模式下。

在浏览器中输入:http://linux121:8080/

Spark搭建.png

备注:在$HADOOP_HOME/sbin$SPARK_HOME/sbin 下都有 start-all.sh 和 stop-all.sh 文件

whereis start-all.sh.png

此时直接执行start-all或stop-all的时候,回直接先执行hadoop的start-all,有冲突

解决方案:

6、集群测试

# 计算派:3.14……
run-example SparkPi 10 
# 启动shell脚本
spark-shell

#读取HDFS 文件,并进行单词统计
val lines = sc.textFile("/azkaban-wc/wc.txt")
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)

Spark不是一定要依赖HDFS的,只有用到了HDFS,才需要HDFS

Apache Spark支持多种部署模式。最简单的就是单机本地模式(Spark所有进程都运 行在一台机器的JVM中)、伪分布式模式(在一台机器中模拟集群运行,相关的进程 在同一台机器上)。分布式模式包括:Standalone、Yarn、Mesos。

Apache Spark支持多种部署模式:

本地模式

部署在单机,主要用于测试或实验,最简单的运行模式,所有的进程都运行在一个JVM中。本地模式用单机的多个线程来模拟Spark分布式计算,通常用来验证开发出来的应用程序逻辑上又饿密友问题,这种模式非常简单,只需要包Spark安装包解压,修改一些常用配置即可,不用启动Master、Worker守护进程,也不用启动HDFS,除非用到。

前面几种模式没有指定M,默认是为1

测试:

1、启动 Spark 本地运行模式,此时JPS后只有一个SparkSubmit进程

spark-shell --master local

2、 测试

#读取本地文件,懒加载,如果文件不存在,此时不会报错,等到真正使用的时候才会报错
val lines = sc.textFile("file:///root/mess.txt")
# 计算行数
lines.count

备注:如果不需要使用HDFS,可以将 spark-defaults.conf相关的日志聚合部分注释

伪分布式模式

伪分布式模式:在一台机器中模拟集群运行,相关的进程在同一台机器上;

备注:不用启动集群资源管理服务;

备注:参数之间没有空格,memory不能加单位

1、启动 Spark 伪分布式模式

spark-shell --master local-cluster[4,2,1024]

2、使用 jps 检查,发现1个 SparkSubmit 进程和4个 CoarseGrainedExecutorBackend 进程SparkSubmit依然充当全能角色,又是Client进程,又是Driver程序,还有资源管理 的作用。4个CoarseGrainedExecutorBackend,用来并发执行程序的进程。

3、执行简单的测试程序

park-submit --master local-cluster[4,2,1024] --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.11-2.4.5.jar 10

备注:

集群模式-Standalone模式

参考文档

Standalone配置

备注:./sbin/start-slave.sh [options];启动节点上的worker进程,调试中较为常用

在 spark-env.sh 中定义:


Standalone配置.png

SPARK_WORKER_CORES:Total number of cores to allow Spark applications to use on the machine (default: all available cores).

SPARK_WORKER_MEMORY:Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m , 2g (default: total memory minus 1 GiB); note that each application's individual memory is configured using its spark.executor.memory property.

可以在spark-eh.sh中修改改参数

export SPARK_WORKER_CORES=10
export SPARK_WORKER_MEMORY=20g
运行模式

最大的区别:Driver运行在哪里。

测试1(Client模式)

spark-submit --class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 5000

再次使用jps检查集群中的进程:

测试2(Cluster模式)

spark-submit --class org.apache.spark.examples.SparkPi \
--deploy-mode cluster \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 5000

在启动 DriverWrapper 的节点上,进入 $SPARK_HOME/work/,可以看见类似 driver-20200810233021-0000 的目录,这个就是 driver 运行时的日志文件,进入 该目录,会发现:

History Server

修改spark-defaults.conf

spark.eventLog.compress     true

修改 spark-env.sh

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://linux121:9000/spark-eventlog"

spark.history.retainedApplications。设置缓存Cache中保存的应用程序历史记录的 个数(默认50),如果超过这个值,旧的将被删除;缓存文件数不表示实际显示的文件总数,只是表示不再缓存中的文件可能需要从硬盘中读取,速度有差别

前提条件:启动hdfs(日志需要写入到HDFS)

启动historyserver,使用 jps 检查,可以看见 HistoryServer 进程。如果看见该进程,请检查对应的日志。

$SPARK_HOME/sbin/start-history-server.sh

web端地址:http://hhb:18080/

高可用配置

Spark Standalone集群是 Master-Slaves架构的集群模式,和大部分的Master- Slaves结构集群一样,存着Master单点故障的问题。如何解决这个问题,Spark提供 了两种方案:

1、基于ZK的Standby Master,适用于生产模式。将 Spark 集群连接到 Zookeeper,利用 Zookeeper 提供的选举和状态保存的功能,一个 Master 处于 Active 状态,其他 Master 处于Standby状态;保证在ZK中的元数据主要是集群的信息,包括:Worker,Driver和Application以及 Executors的信息;如果Active的Master挂掉了,通过选举产生新的 Active 的 Master,然后执行状态恢 复,整个恢复过程可能需要1~2分钟;

2、基于文件系统的单点恢复(Single-Node Rcovery with Local File System), 主要用于开发或者测试环境。将 Spark Application 和 Worker 的注册信息保存在文 件中,一旦Master发生故障,就可以重新启动Master进程,将系统恢复到之前的状态

基于ZK的Standby Master.png

配置步骤

1、启动ZK

2、修改 spark-env.sh 文件,并分发到集群中

# 注释以下两行!!!
# export SPARK_MASTER_HOST=linux120
# export SPARK_MASTER_PORT=7077

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=linux121,linux122,linux123 -Dspark.deploy.zookeeper.dir=/spark"

备注:

3、启动Spark集群(Linux120)

$SPARK_HOME/sbin/start-all.sh

浏览器输入:http://hhb:8080/,刚开始 Master 的状态是STANDBY,稍等一会 变为:RECOVERING,最终是:ALIVE

4、在 linux121上启动master

$SPARK_HOME/sbin/start-master.sh

进入浏览器输入:http://linux122:8080/,此时 Master 的状态为:STANDBY

5、杀到linux120上 Master 进程,再观察 linux121 上 Master 状态,由 STANDBY => RECOVERING => ALIVE

小结:

集群模式-Yarn模式

参考官方文档

需要启动的服务:hdfs服务、yarn服务,需要关闭的Standalone对应的服务(即集群中Master、Worker进程)一山不容二虎

在Yarn模式中,Spark应用程序有两种运行模式

二者的主要区别:Driver在哪里

1、关闭 Standalon 模式下对应的服务;开启 hdfs、yarn、historyserver 服务

2、修改 yarn-site.xml 配置。在 $HADOOP_HOME/etc/hadoop/yarn-site.xml 中增加,分发到集群,重启 yarn服务

<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

备注:

3、修改配置,分发到集群

# spark-default.conf(以下是优化)
# 与 hadoop historyserver集成 
spark.yarn.historyServer.address linux120:18080
# 添加(以下是优化)
spark.yarn.jars hdfs:///spark-yarn/jars/*.jar

上传jar包

# 将 $SPARK_HOME/jars 下的jar包上传到hdfs 
hdfs dfs -mkdir -p /spark-yarn/jars/ 
cd $SPARK_HOME/jars
hdfs dfs -put * /spark-yarn/jars/

4、测试

# client
spark-submit --master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 2000

在提取App节点上可以看见:SparkSubmit、CoarseGrainedExecutorBackend

在集群的其他节点上可以看见:CoarseGrainedExecutorBackend

在提取App节点上可以看见:程序计算的结果(即可以看见计算返回的结果)

# cluster
spark-submit --master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 2000

在提取App节点上可以看见:SparkSubmit

在集群的其他节点上可以见:CoarseGrainedExecutorBackend、ApplicationMaster(Driver运行在此)

在提取App节点上看不见最终的结果

整合HistoryServer服务

前提:Hadoop的 HDFS、Yarn、HistoryServer 正常;Spark historyserver服务正常;

Hadoop:JobHistoryServer

Spark:HistoryServer

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 20
整合HistoryServer服务1.png 整合HistoryServer服务2.png

1、修改 spark-defaults.conf,并分发到集群

spark.yarn.historyServer.address linux121:18080
spark.history.ui.port 18080

2、重启/启动 spark 历史服务

stop-history-server.sh
start-history-server.sh

3、查看日志

打开日志http://linux123:8088/cluster

备注:

1、在课程学习的过程中,大多数情况使用Standalone模式 或者 local模式
2、建议不使用HA;更多关注的Spark开发

开发环境搭建IDEA

前提:安装scala插件;能读写HDFS文件

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>com.hhb.bigdata</artifactId>
        <groupId>com.hhb.bigdata</groupId>
        <version>1.0</version>
        <relativePath>../bigdata/pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark</artifactId>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.9.2</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- hive 文件-->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.7</version>
        </dependency>

    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

代码:

package com.hhb.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-10-13 22:21
 **/
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)
    //    val lines: RDD[String] = sc.textFile("hdfs://linux121:9000/azkaban-wc/wc.txt")
    //    val lines: RDD[String] = sc.textFile("/wcinput/wc.txt")
    val lines: RDD[String] = sc.textFile("data/wc.dat")
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }
}

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
    </property>
</configuration>
上一篇 下一篇

猜你喜欢

热点阅读