Spark 3.0.0 官方文档翻译

01 Quick Start

2020-06-20  本文已影响0人  Whaatfor

转载请注明出处,谢谢合作~

快速上手

该教程可以让你快速上手 Spark 应用。首先介绍通过 Spark 的交互式 shell(Python 或者 Scala)模块展示 Spark 的 API,然后展示如果通过 Java,Scala,Python 语言编写应用程序。

为了接下来的练习,请首先在下载页面(Spark website)下载 Spark,由于不会用到 HDFS,你可以下载集成任意 Hadoop 版本的 Spark。

需要注意的是在 Spark 2.0 版本之前,主要的编程入口是弹性分布式数据集(Resilient Distributed Dataset),简称 RDD;在 Spark 2.0 版本之后,RDDs 被 Dataset 取代,Dataset 和 RDD 一样是强类型的,但是背后提供了更多的优化。RDD 编程接口依旧是可用的,详情参见 RDD programming guide。然而我们强烈建议你切换到 Dataset,Dataset 拥有更好的性能表现,详情参见 SQL programming guide

安全模式

Spark 中的安全机制默认是关闭的,这意味着在默认情况下你的 Spark 系统很容易收到攻击。请在下载和使用之前参考 Spark Security

通过 Spark Shell 进行交互式分析

基础概念

Spark shell 提供了一种简单的方式来学习 API,同时它也是一个强大的交互式分析工具。可以通过 Scala(运行在 JVM 之上,依赖 Java 类库) 或者 Python 启动 Spark shell,在 Spark 根目录启动如下命令:

Scala

./bin/spark-shell

Spark 抽象出来的主要概念叫做 Dataset 的分布式数据集,Datasets 可以通过 Hadoop InputFormats(例如 HDFS 文件)创建,或者从其他的 Datasets 转化而来。让我们通过 Spark 源码目录下的 README 文件中的文本创建一个新的 Dataset :

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

你可以通过调用 action 算子直接从 Dataset 中获取数据,或者通过 transform 算子获得一个新的 Dataset,详情参见 API doc

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

现在通过 transform 算子在这个 Dataset 的基础上得到一个新的 Dataset,调用 filter 方法来返回一个包含了 README 文件中部分文本的 Dataset。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

可以把 transform 算子和 action 算子连接起来使用:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

Python

./bin/pyspark

如果 PySpark 已经通过 pip 安装:

pyspark

Spark 抽象出来的主要概念叫做 Dataset 的分布式数据集,Datasets 可以通过 Hadoop InputFormats(例如 HDFS 文件)创建,或者从其他的 Datasets 转化而来。通过 Spark 源码目录下的 README 文件中的文本创建一个新的 Dataset :

>>> textFile = spark.read.text("README.md")

可以通过调用 action 算子直接从 Dataset 中获取数据,或者通过 transform 算子获得一个新的 Dataset,详情参见 API doc

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

现在通过 transform 算子在这个 Dataset 的基础上得到一个新的 Dataset,我们调用 filter 方法来返回一个包含了 README 文件中部分文本的 Dataset。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

可以把 transform 算子和 action 算子连接起来使用:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15

更多的 Dataset 操作方式

Dataset 的 transform 算子和 action 算子可以被应用的更复杂的计算中,比如说找出单词最多的那个文本行的单词数量:

Scala

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

这个操作首先将每一行映射成一个整数,得到一个新的 Dataset,调用新的 Dataset 的 reduce 方法来找出一行的最大单词数。mapreduce 算子的参数是 Scala 函数字面量(闭包),可以使用任意语言特性和类库。例如,可以轻松的调用在别处定义的函数,在此使用 Math.max() 函数让逻辑更清晰:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

由 Hadoop 提出的数据流处理模型叫做 MapReduce,Spark 可以轻松实现 MapReduce 的思路:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

这里,调用方法把文本行的 Dataset 转换为单词的 Dataset,然后通过 groupByKeycount 算子计算文件中每个单词的计数,保存为 (String, Long) 元组的 Dataset。可以通过调用 collect 方法汇聚 word count 的计算结果:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

Python

>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]

这个操作首先将每一行映射成一个整数,并将这些整数当作一列命名为「numWords」,由此创建了一个新的 DataFrame。调用该 DataFrame 的 agg 方法来找到最大的那个数字。selectagg方法的参数都是 Column 类型,可以通过 df.colName 来从一个 DataFrame 中获取一列。还可以导入 import pyspark.sql.functions,这个依赖库中提供了大量便捷的函数,这些函数能够从一个 Column 生成另一个 Column。

由 Hadoop 提出的数据流处理模型叫做 MapReduce,Spark 可以轻松实现 MapReduce 的思路:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

这里,在 select 方法中使用explode 方法来把一个文本行的 Dataset 转换成一个单词的 Dataset,然后通过 groupByKeycount 算子计算文件中每个单词的计数,保存为一个包含「word」和「count」两列的 DataFrame。可以通过调用 collect 方法汇聚 word count 的计算结果:

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

Caching

Spark 还支持将数据集缓存到集群中(分布式缓存),在数据被重复使用的场景下缓存很有帮助,比如说高频查询小批量的数据或者执行像 PageRank 一样的迭代算法的时候。举例来说,把 Dataset 标记为需要被缓存:

Scala

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

用 Spark 缓存 100 行的文本文件看上去很 SB,其实重要的是缓存可以应用在很大体量的数据集上,即使它们分布在数千个计算节点上。还可以通过 bin/spark-shell 连接到一个集群上做缓存操作,详情参见 RDD programming guide

Python

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

用 Spark 缓存 100 行的文本文件看上去很 SB,其实重要的是缓存可以应用在很大体量的数据集上,即使它们分布在数千个计算节点上。还可以通过 bin/pyspark 连接到一个集群上做缓存操作,详情参见 RDD programming guide

独立应用程序

要想用 Spark API 写一个独立应用程序,需要通过一个简单的应用程序感受一下。

Scala

通过 Scala 编写一个非常简单的 Spark 应用程序,文件命名为 SimpleApp.scala

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

注意应用程序需要定义一个 main() 方法而不是继承自 scala.Appscala.App 的子类实例可能无法正常工作。

该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。不同于前面的示例(Spark shell 会初始化自己的 SparkSession 实例),这里把 SparkSession 的初始化当作应用程序的一部分。

调用 SparkSession.builder 方法来构建一个 SparkSession 实例,配置应用程序名称之后调用 getOrCreate 方法获取一个 SparkSession 实例。

应用程序依赖于 Spark API,所以还需要包括一个 sbt 配置文件 build.sbt,该文件声明了 Spark 依赖,该文件还添加了 Spark 所依赖的仓库:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"

为了让 sbt 正常工作,需要把 SimpleApp.scalabuild.sbt 文件整理为经典目录结构。一旦操作完成,就可以创建一个包含应用程序的 JAR 文件,然后通过 spark-submit 脚本运行我们的程序。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23

Java

在该示例中会通过 Maven 来编译应用程序 JAR 文件,也可以通过其他类似的构建系统打包。

创建一个非常简单的 Spark 应用程序 SimpleApp.java

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。不同于前面的示例(Spark shell 会初始化自己的 SparkSession 实例),这里把 SparkSession 的初始化当作应用程序的一部分。

构建该程序片段还需要编写一个 Maven 的 pom.xml 文件,其中包含 Spark 依赖。注意 Spark 依赖的 artifacts 中带有 Scala 版本标记。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

需要把这些文件整理为经典 Maven 目录结构:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,通过 Maven 打包程序然后通过脚本 ./bin/spark-submit 执行程序。

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Python

现在展示如何用 Python API (PySpark)编写 Spark 应用程序。

如果你构建一个 PySpark 应用程序hove类库你可以把它添加到 setup.py 文件中:

    install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]

例如,创建一个简单的 Spark 应用程序 SimpleApp.py

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。与 Scala 和 Java 版本类似,通过一个 SparkSession 实例来创建 Datasets。对于依赖自定义类库或者第三方类库的应用程序,可以将它们压缩成 .zip 文件,之后通过spark-submit 脚本的 --py-files 参数添加依赖(详情参见 spark-submit --help )。SimpleApp 过于简单,不需要指定其他依赖。

通过 bin/spark-submit 脚本运行该应用程序:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

如果通过 pip 安装了 PySpark(e.g., pip install pyspark),还可以通过 Python 解释器运行你的应用程序,或者想用 spark-submit 也行,你开心就好。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

接下来干点啥呢

运行你的第一个 Spark 应用程序!

上一篇 下一篇

猜你喜欢

热点阅读