foreach(func)

2019-08-06  yayooo

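Purpose: runs the function func on every element of the dataset. foreach returns nothing (Unit) and is typically used for its side effects, such as printing, updating an accumulator, or writing to an external storage system.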
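To show what "side effect" usually means in practice, here is a minimal sketch that uses foreach to update an accumulator. It assumes Spark 2.0 or later, where `SparkContext.longAccumulator` is available. The object name `ForeachAccumulatorDemo` and the helper `sumWithForeach` are hypothetical names chosen for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: sums an RDD using foreach plus an accumulator (Spark 2.0+ API assumed).
object ForeachAccumulatorDemo {

  // Hypothetical helper: returns the sum of `data`, computed through foreach side effects.
  def sumWithForeach(sc: SparkContext, data: Seq[Int]): Long = {
    val acc = sc.longAccumulator("foreachSum")
    // The function passed to foreach runs on the Executors;
    // only the accumulator updates are sent back to the Driver.
    sc.makeRDD(data).foreach(x => acc.add(x))
    // Read the merged result on the Driver after the action has finished.
    acc.sum
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ForeachAccumulatorDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(sumWithForeach(sc, List(1, 2, 3, 4, 5, 6))) // prints 21
    finally sc.stop()
  }
}
```

An accumulator is used here instead of a plain driver-side `var`. Because the closure is serialized and shipped to the Executors, changes those copies make to a `var` are not reliably visible on the Driver.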

package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark28_RDD_Action5 {

    def main(args: Array[String]): Unit = {

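        // Prepare the Spark configuration object
        val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

        // Create the Spark context
        val sc = new SparkContext(conf)

        // Action operator: foreach
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))

        // collect() first pulls all elements back to the Driver, which then prints them in order
        rdd.collect().foreach(println)
        println("********************")
        // Code outside the operator runs on the Driver
        rdd.foreach{
            // Code inside the operator runs on the Executors
            println
        }

        // Release resources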
        sc.stop()

    }
}

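Conclusion: code outside an operator runs on the Driver, while the code inside the operator (the function passed to it) runs on the Executors.
Calling foreach directly to print sends the numbers 1 to 6, split into partitions, to the Executors. Each partition is processed by a separate task and the tasks run in parallel, so the print order cannot be predicted. On a real cluster, the printed lines also go to each Executor's stdout rather than to the Driver's console.
Calling collect first gathers all elements back to the Driver as an array in partition order. The Driver then prints them from memory, so the output is in order.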
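The ordering difference can be reproduced without Spark. The sketch below is a plain-Scala simulation, not Spark itself. Each "partition" is processed by its own concurrent `Future`, which stands in for an Executor task, and the "collect" path flattens the partitions in order on a single thread. The slicing formula is assumed to roughly mirror how `makeRDD` splits a local `Seq` into partitions. All names (`ForeachOrderSimulation`, `slice`, `parallelForeach`, `collectThenForeach`) are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical simulation of rdd.foreach vs rdd.collect().foreach; no Spark involved.
object ForeachOrderSimulation {

  // Split data into numSlices contiguous chunks (assumed to approximate makeRDD's slicing).
  def slice(data: Seq[Int], numSlices: Int): Seq[Seq[Int]] =
    (0 until numSlices).map { i =>
      val start = i * data.length / numSlices
      val end = (i + 1) * data.length / numSlices
      data.slice(start, end)
    }

  // Like rdd.foreach(println): each partition runs in its own concurrent task,
  // so the order in which elements are recorded depends on thread scheduling.
  def parallelForeach(partitions: Seq[Seq[Int]]): List[Int] = {
    val printed = ArrayBuffer.empty[Int]
    val tasks = partitions.map { part =>
      Future {
        part.foreach { x =>
          printed.synchronized { printed += x; println(s"task: $x") }
        }
      }
    }
    Await.result(Future.sequence(tasks), 10.seconds)
    printed.synchronized(printed.toList)
  }

  // Like rdd.collect().foreach(println): partitions are concatenated in partition
  // order on the "Driver" and printed by a single thread.
  def collectThenForeach(partitions: Seq[Seq[Int]]): Seq[Int] = {
    val collected = partitions.flatten
    collected.foreach(x => println(s"driver: $x"))
    collected
  }

  def main(args: Array[String]): Unit = {
    val partitions = slice(List(1, 2, 3, 4, 5, 6), 4)
    println(collectThenForeach(partitions)) // always 1..6 in order
    println(parallelForeach(partitions))    // same elements, order may vary between runs
  }
}
```

With 6 elements and 4 slices, the partitions come out as [1], [2, 3], [4], [5, 6]. This is consistent with the 4 tasks in the log in the Output section, where 4 (partition 2) happens to be printed before 1 (partition 0).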

Output:

1
2
3
4
5
6
********
19/07/31 19:40:18 INFO SparkContext: Starting job: foreach at Action.scala:18
19/07/31 19:40:18 INFO DAGScheduler: Got job 1 (foreach at Action.scala:18) with 4 output partitions
19/07/31 19:40:18 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at Action.scala:18)
19/07/31 19:40:18 INFO DAGScheduler: Parents of final stage: List()
19/07/31 19:40:18 INFO DAGScheduler: Missing parents: List()
19/07/31 19:40:18 INFO DAGScheduler: Submitting ResultStage 1 (ParallelCollectionRDD[0] at makeRDD at Action.scala:12), which has no missing parents
19/07/31 19:40:18 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 1304.0 B, free 1444.8 MB)
19/07/31 19:40:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 938.0 B, free 1444.8 MB)
19/07/31 19:40:18 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.68.1:12401 (size: 938.0 B, free: 1444.8 MB)
19/07/31 19:40:18 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
19/07/31 19:40:18 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 1 (ParallelCollectionRDD[0] at makeRDD at Action.scala:12)
19/07/31 19:40:18 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
19/07/31 19:40:18 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4, localhost, executor driver, partition 0, PROCESS_LOCAL, 5885 bytes)
19/07/31 19:40:18 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5, localhost, executor driver, partition 1, PROCESS_LOCAL, 5889 bytes)
19/07/31 19:40:18 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6, localhost, executor driver, partition 2, PROCESS_LOCAL, 5885 bytes)
19/07/31 19:40:18 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7, localhost, executor driver, partition 3, PROCESS_LOCAL, 5889 bytes)
19/07/31 19:40:18 INFO Executor: Running task 0.0 in stage 1.0 (TID 4)
19/07/31 19:40:18 INFO Executor: Running task 1.0 in stage 1.0 (TID 5)
19/07/31 19:40:18 INFO Executor: Running task 2.0 in stage 1.0 (TID 6)
19/07/31 19:40:18 INFO Executor: Running task 3.0 in stage 1.0 (TID 7)
4
19/07/31 19:40:18 INFO Executor: Finished task 3.0 in stage 1.0 (TID 7). 829 bytes result sent to driver
19/07/31 19:40:18 INFO Executor: Finished task 1.0 in stage 1.0 (TID 5). 919 bytes result sent to driver
5
6
2
3
1