Spark的Transformation和Action算子的原理
RDD Operations
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map
is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce
is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey
that returns a distributed dataset).
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map
will be used in a reduce
and return only the result of the reduce
to the driver, rather than the larger mapped dataset.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist
(or cache
) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
Transformations
The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.
Transformation | Meaning |
---|---|
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
filter(func) | Return a new dataset formed by selecting those elements of the source on which funcreturns true. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
distinct([numPartitions])) | Return a new dataset that contains the distinct elements of the source dataset. |
groupByKey([numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks. |
reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey , the number of reduce tasks is configurable through an optional second argument. |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey , the number of reduce tasks is configurable through an optional second argument. |
sortByKey([ascending], [numPartitions]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
join(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin , rightOuterJoin , and fullOuterJoin . |
cogroup(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith . |
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. |
coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. |
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. |
repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. |
Actions
The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R)
and pair RDD functions doc (Scala, Java) for details.
Action | Meaning |
---|---|
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
count() | Return the number of elements in the dataset. |
first() | Return the first element of the dataset (similar to take(1)). |
take(n) | Return an array with the first n elements of the dataset. |
takeSample(withReplacement, num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. |
takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator. |
saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. |
saveAsSequenceFile(path) | |
(Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). |
saveAsObjectFile(path) | |
(Java and Scala) | Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile() . |
countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. |
foreach(func) | Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. |
The Spark RDD API also exposes asynchronous versions of some actions, like foreachAsync
for foreach
, which immediately return a FutureAction
to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.
Shuffle operations
Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
Background
To understand what happens during the shuffle we can consider the example of the reduceByKey
operation. The reduceByKey
operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.
In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey
reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.
Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:
-
mapPartitions
to sort each partition using, for example,.sorted
-
repartitionAndSortWithinPartitions
to efficiently sort partitions while simultaneously repartitioning -
sortBy
to make a globally ordered RDD
Operations which can cause a shuffle include repartition operations like repartition
and coalesce
, ‘ByKey operations (except for counting) like groupByKey
and reduceByKey
, and join operations like cogroup
and join
.
Performance Impact
The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map
and reduce
operations.
Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey
and aggregateByKey
create these structures on the map side, and 'ByKey
operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.
Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don’t need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir
configuration parameter when configuring the Spark context.
Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.
Transformation 和 Action原理图.png下表列出了一些 Spark 常用的 transformations(转换)。详情请参考 RDD API 文档(Scala,Java,Python,R)和 pair RDD 函数文档(Scala,Java)。
Transformation(转换) | Meaning(含义) |
---|---|
map(func) | 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。 |
filter(func) | 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成。 |
flatMap(func) | 与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item) |
mapPartitions(func) | 与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型。 |
mapPartitionsWithIndex(func) | 与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型。 |
sample(withReplacement, fraction, seed) | 样本数据,设置是否放回(withReplacement)、采样的百分比(fraction)、使用指定的随机数生成器的种子(seed)。 |
union(otherDataset) | 返回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集。 |
intersection(otherDataset) | 返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集。 |
distinct([numTasks])) | 返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素。 |
groupByKey([numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset。注意 : 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好。注意 : 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数。 |
reduceByKey(func, [numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset,它的值会针对每一个 key 使用指定的 reduce 函数 func 来聚合,它必须为 (V,V) => V 类型。像 groupByKey一样,可通过第二个可选参数来配置 reduce 任务的数量。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset,它的值会针对每一个 key 使用指定的 combine 函数和一个中间的 “zero” 值来聚合,它必须为 (V,V) => V 类型。为了避免不必要的配置,可以使用一个不同与 input value 类型的 aggregated value 类型。 |
sortByKey([ascending], [numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,其中的K 实现了 Ordered,返回一个按 keys 升序或降序的(K, V) pairs 的 dataset。 |
join(otherDataset, [numTasks]) | 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin, rightOuterJoin 和fullOuterJoin 来实现。 |
cogroup(otherDataset, [numTasks]) | 在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset。这个操作也调用了 groupWith。 |
cartesian(otherDataset) | 在一个 T 和 U 类型的 dataset 上调用时,返回一个(T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。 |
pipe(command, [envVars]) | 通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回。 |
coalesce(numPartitions) | Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。 |
repartition(numPartitions) | Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行。 |
下面列出了一些 Spark 常用的 actions 操作。详细请参考 RDD API 文档 (Scala, Java, Python, R) 和 pair RDD 函数文档 (Scala, Java)。
Action | Meaning |
---|---|
reduce(func) | 使用函数 func 聚合数据集(dataset)中的元素,这个函数** func** 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )和关联(associative)的,这样才能保证它可以被并行地正确计算。 |
collect() | 在驱动程序中,以一个数组的形式返回数据集的所有元素。这在返回足够小(sufficiently small)的数据子集的过滤器(filter)或其他操作(other operation)之后通常是有用的。 |
count() | 返回数据集中元素的个数。 |
first() | 返回数据集中的第一个元素(类似于 take(1))。 |
take(n) | 将数据集中的前 n 个元素作为一个数组返回。 |
takeSample(withReplacement, num, [seed]) | 对一个数据集随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。 |
takeOrdered(n, [ordering]) | 返回RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前n 个元素。 |
saveAsTextFile(path) | 将数据集中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。 |
saveAsSequenceFile(path) (Java and Scala) | 将数据集中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它Hadoop 支持的文件系统指定的路径中。该操作可以在实现了Hadoop 的Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如Int、Double、String 等等)。 |
saveAsObjectFile(path) (Java and Scala) | 使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。 |
countByKey() | 仅适用于(K,V)类型的 RDD 。返回具有每个 key 的计数的 (K , Int)对 的 hashmap。 |
foreach(func) | 对数据集中每个元素运行函数 func 。这通常用于副作用(side effects),例如更新一个累加器或与外部存储系统(external storage systems)进行交互。注意:修改除foreach() 之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。 |