大数据技术分享

Spark系列 —— 各类算子详解(一)

2019-07-29  本文已影响1人  code_solve

前言

本文主要是一篇总结性文章,将列举绝大部分的 Spark 算子以及其常用场景

Transformation 算子

该类算子属于 Spark 转换类算子,
不会立即执行,
其需要 Action 算子 来触发,
才能正在执行。

  1. map(func)
    Return a new distributed dataset formed by passing each element of the source through a function func.
    将 RDD 的所有元素通过 func 转换,返回一个新的 RDD。
    补充:英文中的 dataset 意为 数据集,这里就是我们的 RDD

  2. filter(func)
    Return a new dataset formed by selecting those elements of the source on which funcreturns true.
    将 RDD 的所有元素通过 func 进行过滤,返回一个新的 RDD。
    补充:func函数返回 false 则过滤,返回true则保留

  3. flatMap(func)
    Similar to map, but each input item can be mapped to 0 or more output items
    (so func should return a Seq rather than a single item).
    和 Map 算子类似,但是需要返回一个 集合,
    并且集合的每一个元素会作为新RDD的一行,或者说一个元素

  1. mapPartitions(func)
    Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
    类似于 Map 算子,但是不是基于每一条数据,而是基于一个 partition 来计算的,func 将接受一个迭代器,可以从迭代器中获取每一条数据进行操作,返回一个迭代器。形成一个新的 RDD。
    补充:
    • 该算子一般用于优化 Map 算子,如下面这个例子:
    sc.parallelize(Seq(1, 2, 3, 4, 5),1)
    .mapPartitions(iter => {
        var res = List[Int]()
        //创建 mysql 客户端
        println("连接数据库")
        while (iter.hasNext) {
            val next = iter.next()
            println("向数据库写入数据:" + next)
            res = res :+ next
        }
        res.toIterator
      })
      .foreach(println)
    
    输出如下,我们可以发现,通过一次连接我们就将一个 partition的数据都写入了数据库,
    如果使用的是 Map 算子,那么每写入一条数据都需要一次数据库连接,很明显是不划算的:
    连接数据库
    向数据库写入数据:1
    向数据库写入数据:2
    向数据库写入数据:3
    向数据库写入数据:4
    向数据库写入数据:5
    1
    2
    3
    4
    5
    
    • 上面的写法并非最优写法,我们可以这样写:
      sc.parallelize(Seq(1, 2, 3, 4, 5),1)
      .mapPartitions(iter => {
        var res = List[Int]()
        println("连接数据库")
        iter.map(next=>{
          println("向数据库写入数据:" + next)
          next
        })
      })
      .foreach(println)
    
    输出如下,其中的差异你可以细细体会,
    不但代码更简单, 而且可以防止partition数据过大导致的 OOM 等问题:
    连接数据库
    向数据库写入数据:1
    1
    向数据库写入数据:2
    2
    向数据库写入数据:3
    3
    向数据库写入数据:4
    4
    向数据库写入数据:5
    5
    
  1. mapPartitionsWithIndex(func)
    Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
    这个算子就是 mapPartitions 的加强版了,可以方便的获取到 partition 的 index。
    补充:如果你有这个算子的应用场景 ,欢迎补充!!本人目前没用到什么合适的地方。

  2. sample(withReplacement, fraction, seed)
    Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
    可以从 RDD 中抽取一部分数据重新组合成一个新的 RDD,
    withReplacement:否有放回
    fraction: 抽样比例
    seed: 种子(如果种子一样,那么抽取到的数据是一样的)
    补充: 该算子使用场景还是比较多的,至于具体的场景,这个就不赘述了。

  3. union(otherDataset)
    Return a new dataset that contains the union of the elements in the source dataset and the argument.
    将两个 RDD 合并成一个新的 RDD。
    补充:该算子纯粹就是一个逻辑上的概念,将两个 RDD 看成一个 RDD 处理,所以不会发生Shuffle。

  1. intersection(otherDataset)
    Return a new RDD that contains the intersection of elements in the source dataset and the argument.
    返回两个 RDD 交集作为一个新的 RDD。

  2. distinct([numPartitions]))
    Return a new dataset that contains the distinct elements of the source dataset. |
    对一个 RDD 的数据去重后作为一个新 RDD

  3. groupByKey([numPartitions])
    When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
    Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key,
    using reduceByKey or aggregateByKey will yield much better performance.
    Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
    KV 格式的 RDD才能使用,对 Key 作分组后形成一个 新的 RDD,
    这里不建议使用该算子,尽量用 reduceByKey 或者 aggregateByKey 来代替,
    这里主要是考虑到数据量的问题,
    reduceByKey 或者 aggregateByKey 是会在shuffle的聚合的时候进行一个预聚合,
    可以减少数据量,加快运行速度。
    不过实际生产中还是会使用的到,这样主要是要注意一下这个问题。

  1. reduceByKey(func, [numPartitions])
    When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
    KV 格式的 RDD才能使用,根据 key 进行分组聚合,形成一个 新的RDD。
    某个 key组内 累加 聚合 逻辑大致如下:

    • 比如一个key分组后的数据是 (1,2,3,4,5)
    • 那么聚合逻辑大概就是 :
      1. 先聚合 1,2 的得到 ( 1+2=3 ,3,4,5)
      2. 再在 第1步的 基础上聚合 3 的得到 ( 3 + 3 = 6 ,4,5)
      3. 以此类推,最后该组的聚合结果就是 15

    代码如下:

     sc.parallelize(Seq("a" -> 1, "a" -> 2, "a" -> 3, "a" -> 4, "a" -> 5))
        .reduceByKey(_+_)
        .foreach(print)
    
  2. aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
    When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
    KV 格式的 RDD才能使用,类似于 reduceBykey,但是比 reduceBykey 具有更复杂的操作,其不同主要在于以下几点:

    • 可以指定一个初始化值,即第一次聚合的时候会先将第一个数值 和 该初始值进行聚合
    • seqOp 作用在 partition 上的聚合逻辑,可以理解为 MR 中的 combiner
    • combOp 作用在 reduce 端的 聚合逻辑,即MR 中 reduce 的逻辑
      补充:如果我们将 seqOp 和 combOp 是相同逻辑的话,就相当于 reduceByKey ,
      该函数一般还是用来做优化吧,
      比如求 topN,我们只需要在每个 partition 上求 topN 在聚合,
      而不需要全局聚合后再去求 topN。
  1. sortByKey([ascending], [numPartitions])
    When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
    KV 格式的 RDD才能使用, 根据 Key 进行排序,形成一个新的 RDD
    ascending:是否是升序
  1. join(otherDataset, [numPartitions])
    When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
    KV 格式的 RDD才能使用,
    类似于我们 sql 里面的 内连接,将两个 RDD 的 key 值进行关联,
    返回在两个 RDD 中都存在的 Key 的数据,形成一个 新的RDD。
    补充:关于 leftOuterJoin, rightOuterJoin, and fullOuterJoin ,和 sql 都差不多,这里就不赘述了。
    如果你还不太了解,可以查看这里

  2. cogroup(otherDataset, [numPartitions])
    When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
    KV 格式的 RDD才能使用, 和 groupWith 是一样的
    将两个 RDD聚合,并按照 Key 分组,形成一个新的RDD。

    val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
    val r2 = sc.parallelize(Seq("b" -> 1, "c" -> 2, "d" -> 3))
    r1.groupWith(r2).foreach(println)
    

    打印如下,其中 CompactBuffer 可以理解为一个优化后的数组:

    (d,(CompactBuffer(),CompactBuffer(3)))
    (a,(CompactBuffer(1),CompactBuffer()))
    (b,(CompactBuffer(2),CompactBuffer(1)))
    (c,(CompactBuffer(3),CompactBuffer(2)))    
    
  3. cartesian(otherDataset)
    When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
    笛卡尔乘积,一般很少使用,
    不过在机器学习方面会有一定的使用

  4. pipe(command, [envVars])
    Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
    Spark支持通过shell 的指令进行数据转换,
    从 标准输入 通过 管道 到 标准输出。

    sc.parallelize(Seq("hello\tspark","hello\tpython","hello\tscala"))
      .pipe("cut -f1,2")
      .foreach(println)
    

    输出如下,这里我们使用的是shell的 cut 指令,:

    hello   spark
    hello   python
    hello   scala
    

    实际上你也可以传入任何你shell 脚本的路径,
    额外提一句的话,记得在 Linux 上面执行,否则可能会出错,
    这里的话就一个参数 envVars,可以配置一些你的执行环境的参数。
    当然实际上除非你 shell 非常 6,否则还是很少用的到的。

  5. coalesce(numPartitions)
    Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
    重新分区,一般我们为了降低分区数的时候会用到该算子,
    什么时候会降低分区数呢?比如:使用 filter 算子过滤大部分数据 等方面
    补充:警惕数据倾斜,我们一般减少分区的时候是不希望多进行一次 shuffle 的,
    所以才使用的该算子,但是不进行 shuffle 的话,
    会使得多个分区直接看成一个分区,
    如果这多个分区刚好是数据比较大的时候,会导致比较严重的数据倾斜。

  6. repartition(numPartitions)
    Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
    重新分区,其实就是 coalesce 的另外一种表现。
    一般我们还是喜欢使用该算子进行分区

  1. repartitionAndSortWithinPartitions(partitioner)
    Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
    重新分区,并排序。
    相比于repartition ,再进行 排序 ,会有比较好的性能。
    因为可以将排序下推到 shuffle 阶段

以上这些就是在官网找到的所有 transformation 算子了,还有一些官网中没有提到的...或者说我没在官网中找到的...那么下面我们再加一点点。

  1. zip
    如果两个 RDD 具有相同条数的数据,可以使用该算子将其压缩成一个 RDD,
    第一个RDD的第一条数据 和 第二个 RDD 的第一条数据压缩成新RDD的第一条数据,
    第二条 和 第二条 压缩成 新的第二条,以此类推....
    val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3), 3)
    val r2 = sc.parallelize(Seq("a1" -> 1, "b1" -> 3, "c1" -> 2), 3)
    r1.zip(r2).foreach(println)
    
    输出如下:
    ((a,1),(a1,1))
    ((b,2),(b1,3))
    ((c,3),(c1,2))
    
  2. zipWithIndex
    会给每条数据打上一下 index 的下标,
    第一个 partition 的第一条数据 index=0,
    以此类推,
    最后一个 partition 的最后一条数据就是最大的 index。
    val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 4, "b" -> 5, "c" -> 6), 3)
    r1.zipWithIndex().foreach(println)
    

输出如下:

((a,1),0)
((b,2),1)
((c,3),2)
((a,4),3)
((b,5),4)
((c,6),5)

补充:需要注意的是,这个index 会因为你使用一些shuffle算子而改变,
所以要注意使用,一般也是用在机器学习算法里面

  1. zipWithUniqueId
    zipWithIndex类似,不过其 id 会带上分区的信息,
    每个分区的第一条数据 id 即是该分区的分区号,第二条数据的 id = 第一条数据的id + 总分区数

    val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 4, "b" -> 5, "c" -> 6), 3)
    //d打印分区信息
    r1.mapPartitionsWithIndex((x, y) => {
      println("分区:"+x)
      val res = y.map(z => {
        println(z)
        z
      })
      res
    }, false)
      .count()
    println("===============")
    r1.zipWithUniqueId().foreach(println)
    

    输出如下,可以观察下其 id 的规则是否如上面所说:

    分区:0
    (a,1)
    (b,2)
    分区:1
    (c,3)
    (a,4)
    分区:2
    (b,5)
    (c,6)
    ===============
    ((a,1),0)
    ((b,2),1)
    ((c,3),2)
    ((a,4),3)
    ((b,5),4)
    ((c,6),5)
    
  2. zipPartitions( rdd )( func )
    将两个具有相同数量分区的 RDD 进行zip,不要求数据数量一致。
    func: 将两个RDD 聚合的逻辑

     val r2 = sc.parallelize(Seq("a1" -> 1, "b1" -> 3, "c1" -> 2), 3)
     val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 4, "b" -> 5, "c" -> 6), 3)
     r1.zipPartitions(r2)((x, y) => {
          x.++(y)
        })
       .foreach(println)
    

    输出如下,这里的操作是将两个 RDD 进行了聚合并且有排序,并且可以看到,他们的分区数都是3:

    (a,1)
    (b,2)
    (a1,1)
    (c,3)
    (a,4)
    (b1,3)
    (b,5)
    (c,6)
    (c1,2)
    

    补充:这里排序是我们观察到的结果,
    因为本人使用不多,所以不敢保证这个结论的正确性,
    后续有时间可以验证下,
    其次 zipPartitions 还有一些重载,
    大致使用方法都是一样的,这里就不赘述了.

  3. subtractByKey
    将左边的 RDD 根据 key 值 减去 右边的 RDD

    val r2 = sc.parallelize(Seq("a" -> 1, "b" -> 3 ), 3)
    val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 3, "c" -> 4, "a" -> 5, "b" ->6), 3)
    r1.subtractByKey(r2).foreach(println)
    

    输出如下:

    (c,4)
    
  4. subtract
    将左边的 RDD 减去 右边的 RDD

    val r2 = sc.parallelize(Seq("a" -> 1, "b" -> 3 ), 3)
    val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 3, "c" -> 4, "a" -> 5, "b" ->6), 3)
    r1.subtract(r2).foreach(println)
    

    输出如下:

    (c,4)
    (b,6)
    (a,5)
    

写着写着就这么多了,这篇主要讲的都是 Transformation 算子,
本来打算将 Action 算子一起写完的,不过太长的话不太合适,
这里权当总结一下,也算是给新人朋友的一点福利吧!
如果有什么没说清楚的地方,欢迎留言。
如果有补充的,欢迎留言。

你的点赞是对作者最大的支持!!!谢谢!!!

上一篇下一篇

猜你喜欢

热点阅读