Spark学习笔记

Spark中的多任务处理

2019-03-28  本文已影响0人  LestatZ

Spark中的多任务处理

Spark的一个非常常见的用例是并行运行许多作业。 构建作业DAG后,Spark将这些任务分配到多个Executor上并行处理。
但这并不能帮助我们在同一个Spark应用程序中同时运行两个完全独立的作业,例如同时从多个数据源读取数据并将它们写到对应的存储,或同时处理多个文件等。

每个spark应用程序都需要一个SparkSession(Context)来配置和执行操作。 SparkSession对象是线程安全的,可以根据需要传递给你的Spark应用程序。

一个顺序作业的例子

假设我们有一个spark 2.x应用程序,负责将几个数据写入到HDFS中。

import org.apache.spark.sql.SparkSession

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    val df = spark.sparkContext.parallelize(1 to 100).toDF
    doFancyDistinct(df, "hdfs:///dis.parquet")
    doFancySum(df, "hdfs:///sum.parquet")
  }

  def doFancyDistinct(df: DataFrame, outPath: String) = df.distinct.write.parquet(outPath)
  
  def doFancySum(df: DataFrame, outPath: String) = df.agg(sum("value")).write.parquet(outPath)

}

这个程序看起来没有什么问题,Spark将按顺序执行两个动作。但这两个动作是独立, 我们可以同时执行它们。

一个有缺陷的并发作业的例子

如果你快速的在网上搜索一下 “scala异步编程”,你就会被引到Scala Future这个解决方案中。
例如以下为一个并行处理RDD的例子:


import scala.concurrent._
import ExecutionContext.Implicits.global

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}
val n: Int = 2 
val files: Array[String] = ['/tmp/test1.csv','/tmp/test2.csv']

val rdds = files.map(f => pipeline(f, n))

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

我们只要根据搜索到的文档中提供的例子修改一下,就会得到以下类似内容:

import org.apache.spark.sql.SparkSession
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    val df = spark.sparkContext.parallelize(1 to 100).toDF
    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")
    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
  }

  def doFancyDistinct(df: DataFrame, outPath: String) = Future { df.distinct.write.parquet(outPath) }

  def doFancySum(df: DataFrame, outPath: String) = Future { df.agg(sum("value")).write.parquet(outPath) }
}

ExecutionContext是用于==管理并行操作的Context==。 实际的线程模型可以由开发者明确提供,也可以使用全局默认值(这是一个 ForkJoinPool ),就像我们在上面的代码中使用的一样:

import scala.concurrent.ExecutionContext.Implicits.global

使用Global execution context 的问题在于它并不知道我们是在群集上启动Spark作业。 默认情况下,Global execution context 提供==与运行代码的系统中的处理器相同数量的线程==。 在我们的Spark应用程序中,它将与Driver上的处理器相同数量的线程。

一个优化过的并发作业的例子

我们需要控制我们的线程策略,更一般化地编写我们的程序,以便可以在不同的线程模型中重用它们。

例如以下是我们从重写的函数,它将允许我们精确控制execution context 来管理调用函数时提供的线程数。 例子中添加的隐式参数将允许调用的代码指定运行函数时使用哪个ExecutionContext。

def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
  df.distinct.write.parquet(outPath)
}

现在让我们提出一个比默认的Global execution context更好的策略。我们希望能够指定我们想要的并行度。

import org.apache.spark.sql.SparkSession
import import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    // Set number of threads via a configuration property
    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)
    val df = spark.sparkContext.parallelize(1 to 100).toDF
    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")
    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
  }

  def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.distinct.write.parquet(outPath)
  }

  def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.agg(sum("value")).write.parquet(outPath) 
  }
}

在这个例子中,我们定义了Execution context变量xc,含有五个线程。

参考资料

Spark Parallel Job Execution
How to run concurrent jobs(actions) in Apache Spark using single spark context
Processing multiple files as independent RDD's in parallel

上一篇 下一篇

猜你喜欢

热点阅读