hadoop-spark 大数据处理技巧章节一(下)

2019-04-15  本文已影响0人  Kean_L_C

上一章节讲解了如何使用java以及hadoop进行二次排序,这一章节分别尝试java、scala与spark结合实现二次排序。

spark启动

为了启动spark方便一点这里写了简单的脚本文本

[root@master Templates]# cat stop_spark_yarn.sh 
# stop hadoop yarn spark
$SPARK_HOME/sbin/stop-all.sh
$HADOOP_HOME/sbin/stop-all.sh
[root@master Templates]# cat start_spark_yarn.sh 
# start hadoop yarn spark
$HADOOP_HOME/sbin/start-all.sh
$SPARK_HOME/sbin/start-all.sh
[root@master Templates]# chmod 777 ../Templates/*

启动后

image.png

maven依赖

spark使用scala语言的,这里为了能让java能在spark中跑起来,需要添加些maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kean.learn</groupId>
    <artifactId>hadoop_spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <!--<plugin>-->
                <!--<groupId>org.scala-tools</groupId>-->
                <!--<artifactId>maven-scala-plugin</artifactId>-->
                <!--<version>2.15.2</version>-->
                <!--<executions>-->
                    <!--<execution>-->
                        <!--<goals>-->
                            <!--<goal>compile</goal>-->
                            <!--<goal>testCompile</goal>-->
                        <!--</goals>-->
                    <!--</execution>-->
                <!--</executions>-->
            <!--</plugin>-->
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-math3 -->
        <!--<dependency>-->
            <!--<groupId>org.apache.commons</groupId>-->
            <!--<artifactId>commons-math3</artifactId>-->
            <!--<version>3.6.1</version>-->
        <!--</dependency>-->
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-math -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math</artifactId>
            <version>2.2</version>
        </dependency>



    </dependencies>


    <repositories>
        <!-- 代码库 -->
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>
</project>

运行代码直接使用大数据处理技巧书中的代码如下:

package org.dataalgorithms.chap01.sparkwithlambda;

// STEP-0: import required Java/Spark classes.

import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
//
import scala.Tuple2;
//
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
//
import org.dataalgorithms.util.SparkUtil;
import org.dataalgorithms.util.DataStructures;


/**
 * SecondarySortUsingCombineByKey class implements the secondary sort design pattern
 * by using combineByKey().
 * <p>
 * <p>
 * Input:
 * <p>
 * name, time, value
 * x,2,9
 * y,2,5
 * x,1,3
 * y,1,7
 * y,3,1
 * x,3,6
 * z,1,4
 * z,2,8
 * z,3,7
 * z,4,0
 * p,1,10
 * p,3,60
 * p,4,40
 * p,6,20
 * <p>
 * Output: generate a time-series looking like this:
 * <p>
 * t1   t2   t3   t4  t5     t6
 * x => [3,  9,   6]
 * y => [7,  5,   1]
 * z => [4,  8,   7,   0]
 * p => [10, null, 60, 40, null , 20]
 * <p>
 * x => [(1,3), (2,9), (3,6)]            where 1 < 2 < 3
 * y => [(1,7), (2,5), (3,1)]            where 1 < 2 < 3
 * z => [(1,4), (2,8), (3,7), (4,0)]     where 1 < 2 < 3 < 4
 * p => [(1,10), (3,60), (4,40), (6,20)] where 1 < 3 < 4 < 6
 *
 * @author Mahmoud Parsian
 */
public class SecondarySortUsingCombineByKey {

    public static void main(String[] args) throws Exception {

        // STEP-1: read input parameters and validate them
        if (args.length < 2) {
            System.err.println("Usage: SecondarySortUsingCombineByKey <input> <output>");
            System.exit(1);
        }
        String inputPath = args[0];
        System.out.println("inputPath=" + inputPath);
        String outputPath = args[1];
        System.out.println("outputPath=" + outputPath);

        // STEP-2: Connect to the Sark master by creating JavaSparkContext object
        final JavaSparkContext ctx = SparkUtil.createJavaSparkContext();

        // STEP-3: Use ctx to create JavaRDD<String>
        //  input record format: <name><,><time><,><value>
        JavaRDD<String> lines = ctx.textFile(inputPath, 1);

        // STEP-4: create (key, value) pairs from JavaRDD<String> where
        // key is the {name} and value is a pair of (time, value).
        // The resulting RDD will be JavaPairRDD<String, Tuple2<Integer, Integer>>.    
        // convert each record into Tuple2(name, time, value)
        // PairFunction<T, K, V>    T => Tuple2(K, V) where K=String and V=Tuple2<Integer, Integer>
        System.out.println("===  DEBUG STEP-4 ===");
        // 获取读取行数据利用Tuple2进行保存
        JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = lines.mapToPair((String s) -> {
            String[] tokens = s.split(","); // x,2,5
            // System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
            Tuple2<Integer, Integer> timevalue = new Tuple2<>(Integer.parseInt(tokens[1]), Integer.parseInt(tokens[2]));
            return new Tuple2<>(tokens[0], timevalue);
        });

        // STEP-5: validate STEP-4, we collect all values from JavaPairRDD<> and print it.    
        // List<Tuple2<String, Tuple2<Integer, Integer>>> output = pairs.collect();
        // for (Tuple2 t : output) {
        //     Tuple2<Integer, Integer> timevalue = (Tuple2<Integer, Integer>) t._2;
        //     System.out.println(t._1 + "," + timevalue._1 + "," + timevalue._2);
        // }

        // How to use combineByKey(): to use combineByKey(), you 
        // need to define 3 basic functions f1, f2, f3:
        // and then you invoke it as: combineByKey(f1, f2, f3)
        //    function 1: create a combiner data structure 
        //    function 2: merge a value into a combined data structure
        //    function 3: merge two combiner data structures


        // function 1: create a combiner data structure         
        // Here, the combiner data structure is a SortedMap<Integer,Integer>,
        // which keeps track of (time, value) for a given key
        // Tuple2<Integer, Integer> = Tuple2<time, value>
        // SortedMap<Integer, Integer> = SortedMap<time, value>  默认按键值升序排列
        Function<Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> createCombiner = (Tuple2<Integer, Integer> x)
                -> {
            Integer time = x._1;
            Integer value = x._2;
            SortedMap<Integer, Integer> map = new TreeMap<>();
            map.put(time, value);
            return map;
        };

        // function 2: merge a value into a combined data structure
        Function2<SortedMap<Integer, Integer>, Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> mergeValue
                = (SortedMap<Integer, Integer> map, Tuple2<Integer, Integer> x) -> {
            Integer time = x._1;
            Integer value = x._2;
            map.put(time, value);
            return map;
        };

        // function 3: merge two combiner data structures
        Function2<SortedMap<Integer, Integer>, SortedMap<Integer, Integer>, SortedMap<Integer, Integer>> mergeCombiners
                = (SortedMap<Integer, Integer> map1, SortedMap<Integer, Integer> map2) -> {
            if (map1.size() < map2.size()) {
                return DataStructures.merge(map1, map2);
            } else {
                return DataStructures.merge(map1, map2);
            }
        };

        // STEP-5: create sorted (time, value)
        JavaPairRDD<String, SortedMap<Integer, Integer>> combined = pairs.combineByKey(
                createCombiner,
                mergeValue,
                mergeCombiners);

        // STEP-7: validate STEP-6, we collect all values from JavaPairRDD<> and print it.    
        // System.out.println("===  DEBUG STEP-6 ===");
        // List<Tuple2<String, SortedMap<Integer, Integer>>> output2 = combined.collect();
        // for (Tuple2<String, SortedMap<Integer, Integer>> t : output2) {
        //     String name = t._1;
        //     SortedMap<Integer, Integer> map = t._2;
        //     System.out.println(name);
        //     System.out.println(map);
        // }

        // persist output
        combined.saveAsTextFile(outputPath);

        // done!
        ctx.close();

        // exit
        System.exit(0);
    }

}

spark集群提交任务

[root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/input2/timeseries.txt
 x,2,9
 y,2,5
 x,1,3
 y,1,7
 y,3,1
 x,3,6
 z,1,4
 z,2,8
 z,3,7
 z,4,0
 p,1,10
 p,3,60
 p,4,40
 p,6,20
[root@master bin]# ./spark-submit --master local[2]  --class org.dataalgorithms.chap01.sparkwithlambda.SecondarySortUsingCombineByKey /root/Data/data_algorithms/chapter1/hadoop_spark-1.0-SNAPSHOT.jar /data_algorithms/chapter1/input2 /data_algorithms/chapter1/output2
[root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/out*/p*
( p,{1=10, 3=60, 4=40, 6=20})
( x,{1=3, 2=9, 3=6})
( y,{1=7, 2=5, 3=1})
( z,{1=4, 2=8, 3=7, 4=0})

代码很好理解,如果懂一点scala的tuple集合就更好理解,用java写spark感觉怪怪的,有点像混合编程,以前看过一点scala知识,索性再次结合spark学习下scala,这次希望不是从入门到放弃。

scala

build.sbt

name := "hadoop_spark_scala"

version := "0.1"

scalaVersion := "2.11.8"

//resolvers ++= Seq( //额外仓库添加
//  "Admonitor Repository" at "http://maven.mzsvn.com/repository/admonitor",
//  "Local Maven Repository" at "local-maven:file://D:/java_workspace/repository"
//)

libraryDependencies ++= Seq( //依赖库
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.hadoop" % "hadoop-common" % "2.7.3",
  "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "2.7.3"
)
package org.dataalgorithms.chap01.scala

import org.apache.spark.Partitioner


/**
  * A custom partitioner
  *
  * org.apache.spark.Partitioner:
  * An abstract class that defines how the elements in a
  * key-value pair RDD are partitioned by key. Maps each
  * key to a partition ID, from 0 to numPartitions - 1.
  */
class CustomPartitioner(partitions: Int) extends Partitioner {

    require(partitions > 0, s"Number of partitions ($partitions) cannot be negative.")

    def numPartitions: Int = partitions

    def getPartition(key: Any): Int = key match {
        case (k: String, v: Int) => math.abs(k.hashCode % numPartitions)
        case null => 0
        case _ => math.abs(key.hashCode % numPartitions)
    }

    override def equals(other: Any): Boolean = other match {
        case h: CustomPartitioner => h.numPartitions == numPartitions
        case _ => false
    }

    override def hashCode: Int = numPartitions
}
package org.dataalgorithms.chap01.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
  * Spark/Scala solution to secondary sort
  *
  * @author Gaurav Bhardwaj (gauravbhardwajemail@gmail.com)
  * @editor Mahmoud Parsian (mahmoud.parsian@yahoo.com)
  *
  */
object SecondarySort {

    def main(args: Array[String]): Unit = {
        //
        if (args.length != 3) {
            println("Usage <number-of-partitions> <input-path> <output-path>")
            sys.exit(1)
        }

        val partitions = args(0).toInt
        val inputPath = args(1)
        val outputPath = args(2)

        val config = new SparkConf
        config.setAppName("SecondarySort")
        val sc = new SparkContext(config)

        val input = sc.textFile(inputPath)

        //------------------------------------------------
        // each input line/record has the following format:
        // <id><,><time><,><value>
        //-------------------------------------------------
        val valueToKey = input.map(x => {
            val line = x.split(",")
            ((line(0) + "-" + line(1), line(2).toInt), line(2).toInt)
        })

        implicit def tupleOrderingDesc = new Ordering[Tuple2[String, Int]] {
            override def compare(x: Tuple2[String, Int], y: Tuple2[String, Int]): Int = {
                if (y._1.compare(x._1) == 0) y._2.compare(x._2)
                else y._1.compare(x._1)
            }
        }

        val sorted = valueToKey.repartitionAndSortWithinPartitions(new CustomPartitioner(partitions))

        val result = sorted.map {
            case (k, v) => (k._1, v)
        }

        result.saveAsTextFile(outputPath)

        // done
        sc.stop()
    }
}

打包成jar提交集群

[root@master bin]# ./spark-submit --master local[2]  --class org.dataalgorithms.chap01.scala.SecondarySort /root/Data/data_algorithms/chapter1/hadoop_spark_scala.jar 3 /data_algorithms/chapter1/input2 /data_algorithms/chapter1/output2
19/04/18 00:07:42 INFO spark.SparkContext: Running Spark version 2.2.1
19/04/18 00:07:43 INFO spark.SparkContext: Submitted application: SecondarySort
19/04/18 00:07:43 INFO spark.SecurityManager: Changing view acls to: root
19/04/18 00:07:43 INFO spark.SecurityManager: Changing modify acls to: root
19/04/18 00:07:43 INFO spark.SecurityManager: Changing view acls groups to: 
19/04/18 00:07:43 INFO spark.SecurityManager: Changing modify acls groups to: 
19/04/18 00:07:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
19/04/18 00:07:43 INFO util.Utils: Successfully started service 'sparkDriver' on port 39070.
19/04/18 00:07:44 INFO spark.SparkEnv: Registering MapOutputTracker
19/04/18 00:07:44 INFO spark.SparkEnv: Registering BlockManagerMaster
19/04/18 00:07:44 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/04/18 00:07:44 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/04/18 00:07:44 INFO storage.DiskBlockManager: Created local directory at /opt/spark-2.2.1-bin-hadoop2.7/blockmgr-72b30be5-8b8a-4589-90da-20596018bfff
19/04/18 00:07:44 INFO memory.MemoryStore: MemoryStore started with capacity 93.3 MB
19/04/18 00:07:44 INFO spark.SparkEnv: Registering OutputCommitCoordinator
19/04/18 00:07:44 INFO util.log: Logging initialized @2833ms
19/04/18 00:07:44 INFO server.Server: jetty-9.3.z-SNAPSHOT
19/04/18 00:07:44 INFO server.Server: Started @2926ms
19/04/18 00:07:44 INFO server.AbstractConnector: Started ServerConnector@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/04/18 00:07:44 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@120f38e6{/jobs,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@fac80{/jobs/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@649f2009{/jobs/job,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@797501a{/jobs/job/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@57f791c6{/stages,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c4f9535{/stages/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@30c31dd7{/stages/stage,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@241a53ef{/stages/stage/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2db2cd5{/stages/pool,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@615f972{/stages/pool/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@73393584{/storage,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1827a871{/storage/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7249dadf{/storage/rdd,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@66238be2{/storage/rdd/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@200606de{/environment,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@f8908f6{/environment/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2ef8a8c3{/executors,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@63fd4873{/executors/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7544a1e4{/executors/threadDump,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7957dc72{/executors/threadDump/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3aacf32a{/static,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3d4d3fe7{/,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51684e4a{/api,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c451c9c{/jobs/job/kill,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@372b0d86{/stages/stage/kill,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.16.21.220:4040
19/04/18 00:07:44 INFO spark.SparkContext: Added JAR file:/root/Data/data_algorithms/chapter1/hadoop_spark_scala.jar at spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar with timestamp 1555517264889
19/04/18 00:07:45 INFO executor.Executor: Starting executor ID driver on host localhost
19/04/18 00:07:45 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43710.
19/04/18 00:07:45 INFO netty.NettyBlockTransferService: Server created on 172.16.21.220:43710
19/04/18 00:07:45 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/04/18 00:07:45 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.21.220, 43710, None)
19/04/18 00:07:45 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.16.21.220:43710 with 93.3 MB RAM, BlockManagerId(driver, 172.16.21.220, 43710, None)
19/04/18 00:07:45 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.21.220, 43710, None)
19/04/18 00:07:45 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.16.21.220, 43710, None)
19/04/18 00:07:45 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4d8539de{/metrics/json,null,AVAILABLE,@Spark}
19/04/18 00:07:46 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 240.0 KB, free 93.1 MB)
19/04/18 00:07:46 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.2 KB, free 93.0 MB)
19/04/18 00:07:46 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.16.21.220:43710 (size: 23.2 KB, free: 93.3 MB)
19/04/18 00:07:46 INFO spark.SparkContext: Created broadcast 0 from textFile at SecondarySort.scala:30
19/04/18 00:07:47 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/18 00:07:47 INFO spark.SparkContext: Starting job: saveAsTextFile at SecondarySort.scala:54
19/04/18 00:07:47 INFO mapred.FileInputFormat: Total input paths to process : 1
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Registering RDD 2 (map at SecondarySort.scala:36)
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at SecondarySort.scala:54) with 3 output partitions
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at SecondarySort.scala:54)
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0)
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at SecondarySort.scala:36), which has no missing parents
19/04/18 00:07:47 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 93.0 MB)
19/04/18 00:07:47 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 93.0 MB)
19/04/18 00:07:47 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.16.21.220:43710 (size: 2.4 KB, free: 93.3 MB)
19/04/18 00:07:47 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at SecondarySort.scala:36) (first 15 tasks are for partitions Vector(0, 1))
19/04/18 00:07:47 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
19/04/18 00:07:47 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 4873 bytes)
19/04/18 00:07:47 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 4873 bytes)
19/04/18 00:07:47 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
19/04/18 00:07:47 INFO executor.Executor: Fetching spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar with timestamp 1555517264889
19/04/18 00:07:47 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
19/04/18 00:07:47 INFO client.TransportClientFactory: Successfully created connection to /172.16.21.220:39070 after 29 ms (0 ms spent in bootstraps)
19/04/18 00:07:47 INFO util.Utils: Fetching spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar to /opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600/userFiles-d948e5f9-0504-4421-aa88-19f186e67f39/fetchFileTemp2679433532910837467.tmp
19/04/18 00:07:48 INFO executor.Executor: Adding file:/opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600/userFiles-d948e5f9-0504-4421-aa88-19f186e67f39/hadoop_spark_scala.jar to class loader
19/04/18 00:07:48 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/data_algorithms/chapter1/input2/timeseries.txt:0+51
19/04/18 00:07:48 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/data_algorithms/chapter1/input2/timeseries.txt:51+51
19/04/18 00:07:48 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1070 bytes result sent to driver
19/04/18 00:07:48 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1070 bytes result sent to driver
19/04/18 00:07:48 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 802 ms on localhost (executor driver) (1/2)
19/04/18 00:07:48 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 801 ms on localhost (executor driver) (2/2)
19/04/18 00:07:48 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (map at SecondarySort.scala:36) finished in 0.836 s
19/04/18 00:07:48 INFO scheduler.DAGScheduler: looking for newly runnable stages
19/04/18 00:07:48 INFO scheduler.DAGScheduler: running: Set()
19/04/18 00:07:48 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1)
19/04/18 00:07:48 INFO scheduler.DAGScheduler: failed: Set()
19/04/18 00:07:48 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SecondarySort.scala:54), which has no missing parents
19/04/18 00:07:48 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
19/04/18 00:07:48 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 71.9 KB, free 93.0 MB)
19/04/18 00:07:48 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 26.0 KB, free 92.9 MB)
19/04/18 00:07:48 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.16.21.220:43710 (size: 26.0 KB, free: 93.2 MB)
19/04/18 00:07:48 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
19/04/18 00:07:48 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SecondarySort.scala:54) (first 15 tasks are for partitions Vector(0, 1, 2))
19/04/18 00:07:48 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
19/04/18 00:07:48 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 4621 bytes)
19/04/18 00:07:48 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 4621 bytes)
19/04/18 00:07:48 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 2)
19/04/18 00:07:48 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3)
19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 12 ms
19/04/18 00:07:48 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/18 00:07:48 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000001_3' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000001
19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000001_3: Committed
19/04/18 00:07:49 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1224 bytes result sent to driver
19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000000_2' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000000
19/04/18 00:07:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, localhost, executor driver, partition 2, ANY, 4621 bytes)
19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000000_2: Committed
19/04/18 00:07:49 INFO executor.Executor: Running task 2.0 in stage 1.0 (TID 4)
19/04/18 00:07:49 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 1181 bytes result sent to driver
19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 849 ms on localhost (executor driver) (1/3)
19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 855 ms on localhost (executor driver) (2/3)
19/04/18 00:07:49 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
19/04/18 00:07:49 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
19/04/18 00:07:49 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000002_4' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000002
19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000002_4: Committed
19/04/18 00:07:49 INFO executor.Executor: Finished task 2.0 in stage 1.0 (TID 4). 1181 bytes result sent to driver
19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 4) in 514 ms on localhost (executor driver) (3/3)
19/04/18 00:07:49 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
19/04/18 00:07:49 INFO scheduler.DAGScheduler: ResultStage 1 (saveAsTextFile at SecondarySort.scala:54) finished in 1.359 s
19/04/18 00:07:49 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at SecondarySort.scala:54, took 2.695980 s
19/04/18 00:07:50 INFO server.AbstractConnector: Stopped Spark@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/04/18 00:07:50 INFO ui.SparkUI: Stopped Spark web UI at http://172.16.21.220:4040
19/04/18 00:07:50 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/04/18 00:07:50 INFO memory.MemoryStore: MemoryStore cleared
19/04/18 00:07:50 INFO storage.BlockManager: BlockManager stopped
19/04/18 00:07:50 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
19/04/18 00:07:50 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/04/18 00:07:50 INFO spark.SparkContext: Successfully stopped SparkContext
19/04/18 00:07:50 INFO util.ShutdownHookManager: Shutdown hook called
19/04/18 00:07:50 INFO util.ShutdownHookManager: Deleting directory /opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600
[root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/out*/p*
( z-2,8)
( y-3,1)
( x-1,3)
( p-6,20)
( p-3,60)
( z-3,7)
( y-1,7)
( x-2,9)
( p-4,40)
( p-1,10)
( z-4,0)
( z-1,4)
( y-2,5)
( x-3,6)

上一篇下一篇

猜你喜欢

热点阅读