
Spark Programming

2018-05-02  溯水心生

一、Spark Shell on Client

scala> var rdd =sc.parallelize(1 to 100 ,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.count
res0: Long = 100  
scala> val rdd2=rdd.map(_ + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd2.take(3)
res1: Array[Int] = Array(2, 3, 4)
scala> val rdd1=sc.textFile("file://home/hadoop/apps/sparkwc")
rdd1: org.apache.spark.rdd.RDD[String] = file://home/hadoop/apps/sparkwc MapPartitionsRDD[3] at textFile at <console>:24

Note: a local path needs the file:/// scheme with three slashes; the first attempt above is missing one, so the path is re-entered correctly:

scala> val rdd1=sc.textFile("file:///home/hadoop/apps/sparkwc")
rdd1: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/apps/sparkwc MapPartitionsRDD[9] at textFile at <console>:24

scala> val rdd2=rdd1.flatMap(_.split("\t"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26

scala> val rdd3=rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:28

scala> val rdd4=rdd3.reduceByKey(_ + _)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:30

scala> rdd4.collect
res2: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))

scala> rdd4.saveAsTextFile("file:///home/hadoop/apps/out1")
[hadoop@hadoop01 apps]$ cd out1/
[hadoop@hadoop01 out1]$ ls
part-00000  _SUCCESS
[hadoop@hadoop01 out1]$ cat part-00000 
(spark,1)
(hadoop,1)
(hello,3)
(world,1)
[hadoop@hadoop01 out1]$ pwd
/home/hadoop/apps/out1

Web UI address: http://192.168.43.20:4040/jobs/

二、Spark Shuffle

三、Shuffle Write(hash-based)

四、Shuffle Write(hash-based, optimized)

五、Shuffle Write(sort-based)

六、Shuffle Read
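
The shuffle phases above can be seen in an ordinary key-based job. A minimal sketch, assuming the spark-shell session from section 一 (so sc is already defined); the data and partition counts are illustrative only:

// Three map-side partitions write shuffle files grouped by reducer (Shuffle Write);
// the two reduce-side partitions then fetch their blocks over the network (Shuffle Read).
val pairs  = sc.parallelize(Seq(("hello", 1), ("spark", 1), ("hello", 1)), 3)
val counts = pairs.reduceByKey(_ + _, 2)   // shuffle into 2 reduce-side partitions
counts.collect                             // the action triggers the shuffle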

七、Spark History Server Configuration

Add the YARN log server address to yarn-site.xml:

<property>
  <name>yarn.log.server.url</name>
  <value>http://node02:19888/jobhistory/logs</value>
  <description>YARN JobHistoryServer access address</description>
</property>

Then add the following to spark-defaults.conf:

spark.yarn.historyServer.address=192.168.183.100:18080
spark.history.ui.port=18080
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///spark_logs
spark.history.fs.logDirectory=hdfs:///spark_logs

1. Starting the Spark History Server

sbin/start-history-server.sh
http://192.168.183.100:18080

八、Spark Runtime Environment Optimization

Upload spark-libs.jar (typically the contents of $SPARK_HOME/jars packaged into a single archive) to HDFS, so that applications do not have to upload the Spark libraries on every submission:

hadoop fs -put spark-libs.jar /system/spark

Then edit the spark-defaults.conf file under the conf directory of the Spark installation and add the HDFS path of spark-libs.jar:

spark.yarn.archive=hdfs:///system/spark/spark-libs.jar

九、Spark Programming Model

1. Submitting a Spark Application to YARN

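As a minimal sketch (the class name, jar name and paths are placeholder assumptions), the word count from section 一 can be written as a standalone application, built into a jar and submitted to YARN:

import org.apache.spark.{SparkConf, SparkContext}

// Self-contained word count that can be packaged into a jar and run on YARN.
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc   = new SparkContext(conf)

    // args(0): input path, args(1): output path (local or HDFS)
    sc.textFile(args(0))
      .flatMap(_.split("\t"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))

    sc.stop()
  }
}

It would then be submitted with something like: spark-submit --master yarn --deploy-mode cluster --class SparkWordCount sparkwc.jar <input> <output>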

2. Spark RDD Operator Categories
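
RDD operators fall into two categories: transformations, which are lazy and only describe a new RDD, and actions, which trigger a job and return a result to the driver (or write to storage). A small sketch of the difference, assuming sc from the shell session above:

val nums    = sc.parallelize(1 to 10)
val doubled = nums.map(_ * 2)        // transformation: nothing is computed yet
val total   = doubled.reduce(_ + _)  // action: the job actually runs, total = 110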

3. Creating RDDs
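
An RDD can be created from an in-memory collection, from a file in external storage, or by transforming an existing RDD. A brief sketch, assuming sc is available (the file path reuses the one from section 一):

val fromCollection = sc.parallelize(1 to 100, 3)                     // from a Scala collection, 3 partitions
val fromFile       = sc.textFile("file:///home/hadoop/apps/sparkwc") // from a local or HDFS file
val derived        = fromCollection.map(_ + 1)                       // from another RDD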

4. Transformations on Value-type RDDs
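
Common transformations on value-type RDDs include map, filter, flatMap, distinct and union. A minimal sketch, assuming sc is available; the sample data is made up:

val lines  = sc.parallelize(Seq("hello world", "hello spark"))
val words  = lines.flatMap(_.split(" "))   // one line -> many words
val hellos = words.filter(_ == "hello")    // keep only matching elements
val unique = words.distinct()              // drop duplicates
val merged = words.union(hellos)           // concatenate two RDDs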

5. RDD Actions
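
Typical actions include count, take, reduce, collect and saveAsTextFile; each one submits a job. A small sketch (the output path is a placeholder):

val rdd = sc.parallelize(1 to 100, 3)
rdd.count            // 100
rdd.take(3)          // Array(1, 2, 3)
rdd.reduce(_ + _)    // 5050
rdd.saveAsTextFile("file:///home/hadoop/apps/out2")   // one part file per partition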

6. Spark Optimization: Using Cache

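When an RDD is used by more than one action, caching it avoids recomputing its whole lineage every time. A minimal sketch, assuming sc is available and reusing the input file from section 一:

val logLines = sc.textFile("file:///home/hadoop/apps/sparkwc")
val tokens   = logLines.flatMap(_.split("\t")).cache()   // mark for caching (lazy)
tokens.count                                             // first action computes and caches the partitions
tokens.filter(_ == "hello").count                        // later actions read from the cache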

7. Accumulator Counters

// Named accumulator: tasks on the executors add to it; the driver and the Web UI read the total.
// (On Spark 2.x, sc.longAccumulator("total_counter") is the newer equivalent.)
val total_counter = sc.accumulator(0L, "total_counter")
val resultRdd = rowRdd.flatMap(_.split("\t")).map(x => {
  total_counter += 1   // count every word that passes through the map
  (x, 1)
}).reduceByKey(_ + _)

The accumulator value can be viewed in the Spark Web UI.
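
Besides the Web UI, the driver can also read the final value after an action has run. A short sketch continuing the snippet above (rowRdd is assumed to be an existing RDD of tab-separated lines):

resultRdd.collect()            // run an action so the tasks actually update the accumulator
println(total_counter.value)   // read the accumulated total on the driver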

