SparkRDD算子&action

2019-12-10  本文已影响0人  Youngmon

Spark RDD

一、基本RDD运算

 val intRDD=sc.parallelize(List(3,1,2))
 val sr=sc.parallelize(List("a","b"))
 val sr2=sc.parallelize(List("a c","b"))

1.collect RDD类型的数据转化为数组

2.map

intRDD.map(x =>x+1).collect()
intRDD.map(_+1).collect()
sr.map((_,1))

3.flatMap

sr2.flatMap(_.split(" ")).collect()

4.filter

intRDD.filter(x =>x<3).collect()
intRDD.filter(_<3).collect()
sr.filter(x =>x.contains("a")).collect()

4.randomSplit

val intsR=intRDD.randomSplit(Array(0.5,0.5))
insR(0).collect()     

5.group

 val gRDD=intRDD.groupBy(x =>  if(x>2) "d" else "x"  ).collect()
gRDD(0) gRDD(1)

二、多个RDD运算

2.1 union

 val intRDD1=sc.parallelize(List(3,1,2))
 val intRDD2=sc.parallelize(List(4,5))
 val intRDD3=sc.parallelize(List(7,8))
 val intRDD4=sc.parallelize(List(2,8))
 (intRDD1 ++ intRDD2 ++ intRDD3).collect()
  intRDD1.union(intRDD2).collect()

2.2 intersection 交集

intRDD1.intersection(intRDD4).collect()

2.3 subtract 差集

intRDD1.subtract(intRDD4).collect()

2.4 cartesian 卡迪乘积

intRDD1.cartesian(intRDD4).collect()

三、动作运算

intRDD.first
intRDD.take(3)

读取前3条小-->大
intRDD.takeOrdered(3)
读取前3条大-->小
intRDD.takeOrdered(3)(Ordering[Int].reverse)
intRDD.count
intRDD.max
intRDD.mean
intRDD.sum
intRDD.min

四、RDD k-v 基本'转换'运算

val kvRDD1 =sc.parallelize(List((1,2),(1,3),(2,3),(2,2)))
kvRDD1.keys.collect()
kvRDD1.values.collect()

4.1 filter

kvRDD1.filter{ case (k,v) => k<2}.collect()

4.2 mapValues

kvRDD1.mapValues(_ * 2).collect()
kvRDD1.mapValues(x => x * 2).collect()

4.3 sortByKey

kvRDD1.sortByKey().collect()
kvRDD1.sortByKey(true).collect()
kvRDD1.sortByKey(false).collect()

4.4 reduceByKey

kvRDD1.reduceByKey((x,y) =>{x+y } ).collect()
kvRDD1.reduceByKey(_+_).collect()

五、多个RDD k-v'转换'运算

val kvRDD2 =sc.parallelize(List( (1,6),(2,5),(2,7)))
val kvRDD3 =sc.parallelize(List((1,2),(1,3)))

5.1 join

kvRDD2.join(kvRDD3).collect()
kvRDD2.join(kvRDD3).foreach(println)

5.2 leftOuterJoin

kvRDD2.leftOuterJoin(kvRDD3).collect()

5.3 rightOuterJoin

kvRDD2.rightOuterJoin(kvRDD3).collect()

5.4 subtract kvRDD1删除存在kvRDD3的key

kvRDD1.subtract(kvRDD3).collect()

六、k-v 动作 运算

kvRDD1.first
kvRDD1.first._1
kvRDD1.first._2

kvRDD1.countByKey()
kvRDD1.take
kvRDD1.max
kvRDD1.min

var kv =kvRDD1.collectAsMap
kv(1)

根据key查值

kvRDD1.lookup(2)

七、广播变量

7.1 使用广播变量 Broadcast

val kvf =sc.parallelize(List((1,"apple"),(2,"orange")))
val kvmap =kvf.collectAsMap()
val fids = sc.parallelize(List(2,1))
val fnames =fids.map( x => kvmap(x)).collect()

==>

val bcmap = sc.broadcast(kvmap)                       //广播变量
val bcfids = sc.parallelize(List(2,1))
val bcnames = fids.map( x => bcmap.value(x)  ).collect()  //

7.2 累加器 accumulator

val total =sc.accumulator(0.0)
 intRDD.foreach(i => { total +=i   })

八、单词统计

sc.textFile("file:/usr/local/src/draw.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect()
上一篇下一篇

猜你喜欢

热点阅读