Spark: The Definitive Guide Reading Notes (3): Low-Level APIs

2020-11-29  kaiker

Chapter 12: Resilient Distributed Datasets

RDD

RDD Types

- Generic RDDs
- Key-value RDDs, which support special operations and custom data partitioning by key

Each RDD is characterized by five main internal properties:

- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs
- Optionally, a list of preferred locations on which to compute each split (e.g., block locations for an HDFS file)

Using RDDs

Creating RDDs
spark.range(500).rdd // from a Dataset, via .rdd
spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0)) // a DataFrame yields Row objects that must be unwrapped
spark.sparkContext.textFile("/some/path/withTextFiles") // from a text data source, one record per line
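
The examples that follow operate on a words RDD built from a local collection, as in the book's running example:

val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"
  .split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2) // distributed across two partitions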
Transformations
def startsWithS(individual: String) = {
  individual.startsWith("S")
}
words.filter(word => startsWithS(word)).collect() // Array(Spark, Simple)

val words2 = words.map(word => (word, word(0), word.startsWith("S"))) // tuple of (word, first char, starts-with-S flag)

words.flatMap(word => word.toSeq).take(5) // expand each word into its characters: S, p, a, r, k

val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5)) // randomly split into an Array of two RDDs

Actions
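
Actions trigger evaluation of the transformation chain; two standard ones from the book:

spark.sparkContext.parallelize(1 to 20).reduce(_ + _) // 210
words.count() // 10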
Caching

By default, cache and persist deal with data in memory only (an RDD's default storage level is MEMORY_ONLY).

words.cache()
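
To persist beyond the in-memory default, call persist with an explicit storage level before the RDD is first cached; a minimal sketch using the standard StorageLevel API:

import org.apache.spark.storage.StorageLevel
words.persist(StorageLevel.MEMORY_AND_DISK) // partitions that don't fit in memory spill to disk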

Piping RDDs to System Commands

pipe hands each partition of the RDD to a specified external process and builds the result RDD from its output: all elements of an input partition are written to the process's standard input as lines separated by newlines.
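
For example, piping each partition through wc -l yields one line count per partition (with the book's ten words in two partitions, 5 and 5):

words.pipe("wc -l").collect() // one count per partition: 5 and 5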

mapPartitions likewise processes one partition's iterator at a time; emitting a 1 per partition and summing the results counts the partitions:

words.mapPartitions(part => Iterator[Int](1)).sum() // 2

foreachPartition is the action counterpart: it iterates over each partition without returning anything, here writing every partition to a randomly named temp file:

words.foreachPartition { iter =>
  import java.io._
  import scala.util.Random
  val randomFileName = new Random().nextInt()
  val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
  while (iter.hasNext) {
    pw.write(iter.next())
  }
  pw.close()
}
glom turns every partition into an array, which is useful for inspecting how data is laid out (but can crash the driver if partitions are large):

spark.sparkContext.parallelize(Seq("Hello", "World"), 2).glom().collect()
// Array(Array(Hello), Array(World))

Chapter 13: Advanced RDDs

Key-Value RDD

words.map(word => (word.toLowerCase, 1)) // the simplest way: map to (key, value) tuples

A key-value RDD can also be produced with keyBy, which derives a key from each record:

val keyword = words.keyBy(word => word.toLowerCase.toSeq(0).toString)
// result shape: [('s', 'Spark'), ('t', 'The'), ...]

lookup returns all values associated with a key:

keyword.lookup("s") // WrappedArray(Spark, Simple)

Aggregations
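
The aggregation examples run on a KVcharacters RDD of (character, 1) pairs plus an addFunc, defined in the book as:

val chars = words.flatMap(word => word.toLowerCase.toSeq)
val KVcharacters = chars.map(letter => (letter, 1))
def addFunc(left: Int, right: Int) = left + right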

KVcharacters.countByKey() // record count per key, returned to the driver as a Map

KVcharacters.groupByKey().map(row => (row._1, row._2.reduce(addFunc))).collect()

groupByKey pulls every value for a key onto one executor before computing, which can cause skew and out-of-memory errors on hot keys. reduceByKey is usually preferable because it reduces within each partition before shuffling:

KVcharacters.reduceByKey(addFunc).collect()

// createCombiner: turn the first value seen for a key into a combiner
val valToCombiner = (value: Int) => List(value)
// mergeValue: fold a further value into a combiner, within one partition
val mergeValuesFunc = (vals: List[Int], valToAppend: Int) => valToAppend :: vals
// mergeCombiners: merge two combiners produced by different partitions
val mergeCombinerFunc = (vals1: List[Int], vals2: List[Int]) => vals1 ::: vals2
val outputPartitions = 6
KVcharacters
  .combineByKey(
    valToCombiner,
    mergeValuesFunc,
    mergeCombinerFunc,
    outputPartitions)
  .collect()

KVcharacters.foldByKey(0)(addFunc).collect() // zero value: 0 for addition, 1 would be for multiplication

Joins

cogroup

cogroup groups up to three key-value RDDs together by key (three in Scala, two in Python): each output record has the key on one side and all of the values from every RDD on the other.

import scala.util.Random
val distinctChars = words.flatMap(word => word.toLowerCase.toSeq).distinct
val charRDD = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD2 = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD3 = distinctChars.map(c => (c, new Random().nextDouble()))
charRDD.cogroup(charRDD2, charRDD3).take(5)

zip

zip pairs up the elements of two RDDs into an RDD of tuples; both RDDs must have the same number of elements and the same number of partitions.
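
The book zips words with a numeric range of matching size and partitioning:

val numRange = spark.sparkContext.parallelize(0 to 9, 2)
words.zip(numRange).take(5) // Array((Spark,0), (The,1), (Definitive,2), (Guide,3), (:,4))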

Controlling Partitioning

With RDDs you can control exactly how data is physically distributed across the cluster.
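
The basic tools are coalesce and repartition, as in the book:

words.coalesce(1).getNumPartitions // collapse to one partition, avoiding a shuffle where possible
words.repartition(10) // move to ten partitions at the cost of a full shuffle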

Custom Partitioning
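
The keyedRDD used below comes from the book's retail example: rows of a sales dataset keyed by the customer ID in column 6 (the path follows the book's sample-data layout):

val df = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("/data/retail-data/all/")
val keyedRDD = df.coalesce(10).rdd.keyBy(row => row(6).asInstanceOf[Int].toDouble)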
import org.apache.spark.Partitioner
class DomainPartitioner extends Partitioner {
  def numPartitions = 3
  def getPartition(key: Any): Int = {
    val customerId = key.asInstanceOf[Double].toInt
    if (customerId == 17850 || customerId == 12583) {
      0 // isolate these two unusually high-volume customers in their own partition
    } else {
      new java.util.Random().nextInt(2) + 1 // scatter all other keys across the remaining two
    }
  }
}

keyedRDD
  .partitionBy(new DomainPartitioner).map(_._1).glom().map(_.toSet.toSeq.length)
  .take(5) // number of distinct keys in each partition; partition 0 holds only the two isolated IDs

Chapter 14: Distributed Shared Variables

Broadcast Variables

A broadcast variable ships an immutable value to every node once and caches it there, rather than serializing it into every task closure.

val supplementalData = Map("Spark" -> 1000, "Definitive" -> 200,
                           "Big" -> -300, "Simple" -> 100)
val suppBroadcast = spark.sparkContext.broadcast(supplementalData)
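
Executors read the broadcast value through .value; the book uses the map to score each word:

words.map(word => (word, suppBroadcast.value.getOrElse(word, 0)))
  .sortBy(wordPair => wordPair._2)
  .collect()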

Accumulators

An accumulator is effectively write-only for the workers: tasks can only add to it, and only the driver can read its value. Updates made inside actions are applied exactly once per task; inside transformations they may be applied more than once if a task is re-executed.

case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flights = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet")
  .as[Flight]

import org.apache.spark.util.LongAccumulator
val accUnnamed = new LongAccumulator
spark.sparkContext.register(accUnnamed) // register returns Unit; keep using accUnnamed itself

val accChina = new LongAccumulator
val accChina2 = spark.sparkContext.longAccumulator("China") // create and register in one call
spark.sparkContext.register(accChina, "China") // or register an existing instance under a name (visible in the Spark UI)

def accChinaFunc(flight_row: Flight) = {
  val destination = flight_row.DEST_COUNTRY_NAME
  val origin = flight_row.ORIGIN_COUNTRY_NAME
  if (destination == "China") {
    accChina.add(flight_row.count.toLong)
  }
  if (origin == "China") {
    accChina.add(flight_row.count.toLong)
  }
}

flights.foreach(flight_row => accChinaFunc(flight_row))

accChina.value // 953