Spark Basics Overview

2019-08-06  _羊羽_

Overview

Official website: http://spark.apache.org/
Official documentation: http://spark.apache.org/docs/latest/


Download

Install the Java environment
OpenJDK official install guide: http://openjdk.java.net/install/

# yum search openjdk-dev

Download page: http://spark.apache.org/downloads.html


Configure environment variables

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export HADOOP_HOME=$HOME/app/hadoop
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
export SPARK_HOME=$HOME/app/spark
export PATH=$PATH:$SPARK_HOME/bin

Directory structure

Directory    Description
bin          Executable scripts for the Spark commands
conf         Spark configuration files
data         Data used by the bundled example programs
examples     Bundled example programs
jars         Spark's JAR dependencies
sbin         Scripts for starting and stopping Spark's built-in standalone cluster

Commands in the Spark bin directory:

Command        Description
spark-shell    Starts the interactive Spark shell
spark-submit   Submits a Spark application
run-example    Runs one of the bundled example programs
spark-sql      Starts the Spark SQL command line

Start the cluster

# cd /root/app/spark/sbin
# ./start-all.sh

Submit a job

spark-submit --class org.apache.spark.examples.SparkPi --master spark://aliyun:7077 /root/app/spark/examples/jars/spark-examples_2.11-2.3.3.jar 100

Result

2019-08-16 20:17:34 INFO  DAGScheduler:54 - Job 0 finished: reduce at SparkPi.scala:38, took 8.394727 s
Pi is roughly 3.1414743141474313

Command breakdown:
spark-submit: the submission command for Spark applications, located in the bin directory of the Spark installation
--class org.apache.spark.examples.SparkPi: the main class of the application
--master spark://aliyun:7077: the master to run against
/root/app/spark/examples/jars/spark-examples_2.11-2.3.3.jar: path to the application JAR
100: the application's own argument; for SparkPi it is the number of slices (partitions) used in the estimate (a simplified sketch of this computation follows below)
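
For reference, here is a minimal sketch of the kind of Monte Carlo estimate the bundled SparkPi example performs. This is illustrative code only, not the exact source shipped with Spark; the object name PiSketch is made up:

import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

// Simplified Monte Carlo estimate of Pi: the command-line argument (the "100" above)
// is the number of slices (partitions) used to spread the random sampling.
object PiSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PiSketch")
    val sc = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000L * slices
    val inside = sc.parallelize(1L to n, slices).map { _ =>
      val x = random * 2 - 1                  // random point in the square [-1, 1] x [-1, 1]
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0        // 1 if the point falls inside the unit circle
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * inside / n}")
    sc.stop()
  }
}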

spark-shell

Local mode

Local mode runs Spark on a single machine.
local: all computation runs in a single thread
local[K]: run the computation with K worker threads
local[*]: use as many worker threads as the machine has CPU cores
The same master strings can also be set in code, as in the sketch below.
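
A minimal sketch of setting the master programmatically when building a SparkContext (the object name LocalModeDemo is made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal local-mode sketch: setMaster takes the same strings described above.
object LocalModeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("LocalModeDemo")
      .setMaster("local[*]")                      // or "local" / "local[4]"
    val sc = new SparkContext(conf)
    println(s"defaultParallelism = ${sc.defaultParallelism}")  // reflects the thread count
    sc.stop()
  }
}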

Standalone mode

$ spark-shell --master spark://aliyun:7077

Launched without a --master option, spark-shell defaults to local mode (as in the log below):
# spark-shell
2019-08-05 19:31:54 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://aliyun:4040
Spark context available as 'sc' (master = local[*], app id = local-1565004727280).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.3
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
Type in expressions to have them evaluated.
Type :help for more information.

Without a --master option, spark-shell runs in local mode; local[*] means it uses as many threads as the machine has CPU cores:

Spark context available as 'sc' (master = local[*], app id = local-1565004727280).
Spark session available as 'spark'.
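
As a quick illustration (not part of the original log), the sc handle provided by the shell can be used directly, for example to sum the numbers 1 to 100 with an RDD:

scala> sc.parallelize(1 to 100).reduce(_ + _)
res0: Int = 5050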

Create test files under the Spark installation directory:

# cd $SPARK_HOME
# mkdir input
# vim word.txt
hello world
hello spark
hello hadoop

Start spark-shell and read the data in the local input folder:

# spark-shell
scala> sc.textFile("input")
res0: org.apache.spark.rdd.RDD[String] = input MapPartitionsRDD[1] at textFile at <console>:25

Flatten: split each line on the space delimiter, mapping each line to individual words:

scala> sc.textFile("input").flatMap(_.split(" "))
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:25

Operate on each element, mapping every word to a (word, 1) tuple:

scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1))

Aggregate the values by key, summing them to get the final counts:

scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res4: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))
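
As a hedged extension of the session above (not in the original shell log), the counts can also be sorted in descending order before collecting; the relative order of words with equal counts may vary:

scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2, ascending = false).collect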

WordCount with Maven

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.bx</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.3</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>WordCount</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.1.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Create a test file: in the top-level project directory, create an input directory containing word.txt:

hello spark
hello scala
hello hive
hello hadoop

Create WordCount.scala under src/main/scala:

package cn.bx.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {

    // Run locally, using as many threads as there are CPU cores
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("input")               // read every file under the input directory
    val words = lines.flatMap(_.split(" "))        // split each line into words
    val wordToOne = words.map((_, 1))              // map each word to a (word, 1) tuple
    val wordToSum = wordToOne.reduceByKey(_ + _)   // sum the counts per word
    wordToSum.collect().foreach(println)           // bring the result to the driver and print it

  }
}

Output:

(scala,1)
(hive,1)
(hello,4)
(spark,1)
(hadoop,1)

WordCount on Hadoop (HDFS)

Set up Maven by modifying pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.bx</groupId>
    <artifactId>SparkNote</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.12</scala.version>
        <spark.version>2.2.3</spark.version>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- spark-core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- hadoop-client dependency -->
<!--        <dependency>-->
<!--            <groupId>org.apache.hadoop</groupId>-->
<!--            <artifactId>hadoop-client</artifactId>-->
<!--            <version>${hadoop.version}</version>-->
<!--        </dependency>-->
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Create SparkWordCount.scala under src/main/scala; it takes the input path and the output path as command-line arguments:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // No setMaster here: the master is supplied via spark-submit --master
    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc = new SparkContext(conf)
    val lines: RDD[String] = sc.textFile(args(0))           // args(0): input path (e.g. on HDFS)
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val tuples: RDD[(String, Int)] = words.map((_, 1))
    val sum: RDD[(String, Int)] = tuples.reduceByKey(_ + _)
    val sorted: RDD[(String, Int)] = sum.sortBy(_._2, ascending = false)
    sorted.saveAsTextFile(args(1))                           // args(1): output directory
    sc.stop()
  }
}

Submit the JAR

vim word.txt

hello world
hello world
hello spark
hello java
hello golang
hello hadoop

Put word.txt into HDFS:

# hadoop fs -mkdir -p /wc/input
# hadoop fs -put word.txt /wc/input/
# hadoop fs -cat /wc/input/word.txt
hello world
hello world
hello spark
hello java
hello golang
hello hadoop

Build the JAR with Maven, then submit it:

# spark-submit --class SparkWordCount --master spark://aliyun:7077 SparkNote-1.0-SNAPSHOT.jar hdfs://localhost:9000/wc/input/word.txt hdfs://localhost:9000/wc/output

Execution result:

hadoop fs -ls /wc/output/
19/08/07 01:02:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   3 root supergroup          0 2019-08-07 01:01 /wc/output/_SUCCESS
-rw-r--r--   3 root supergroup         20 2019-08-07 01:01 /wc/output/part-00000
-rw-r--r--   3 root supergroup         41 2019-08-07 01:01 /wc/output/part-00001

View the result files (Spark wrote one part file per output partition):

# hadoop fs -cat /wc/output/part-00000
(hello,6)
(world,2)
# hadoop fs -cat /wc/output/part-00001
(golang,1)
(java,1)
(spark,1)
(hadoop,1)
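
If a single output file is preferred, the result can be coalesced to one partition before saving. Below is a hedged variation of the SparkWordCount program above (at the cost of write parallelism; the object name is made up):

import org.apache.spark.{SparkConf, SparkContext}

// Variation on SparkWordCount: coalesce to one partition so the output
// directory contains a single part-00000 file instead of one per partition.
object SparkWordCountSingleFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkWordCountSingleFile"))
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .coalesce(1)                      // single partition -> single output file
      .saveAsTextFile(args(1))
    sc.stop()
  }
}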