(5) transform, Using Join, DStream and RDD
2018-11-16
白面葫芦娃92
Data set 1: log records
domain.time.traffic
ruozedata.com
baidu.com
ruozedata.com
Data set 2: an existing file, the blacklist
domain
baidu.com
Requirement: treat data set 2 as a blacklist, that is, remove the baidu.com records from data set 1 and keep only the ruozedata.com records.
1. Implementation with Spark Core (in IDEA)
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object LeftJoinApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LeftJoinApp")
    val sc = new SparkContext(conf)

    // Data set 1: (domain, traffic) log records
    val input1 = new ListBuffer[(String, Long)]
    input1.append(("www.ruozedata.com", 8888L))
    input1.append(("www.ruozedata.com", 8889L))
    input1.append(("www.ruozedata.com", 8887L))
    input1.append(("www.baidu.com", 8886L))
    val data1 = sc.parallelize(input1)

    // Data set 2: the blacklist, as (domain, true)
    val input2 = new ListBuffer[(String, Boolean)]
    input2.append(("www.baidu.com", true))
    val data2 = sc.parallelize(input2)

    // Left outer join: every record of data1 is kept; a blacklisted domain
    // is paired with Some(true), any other domain with None
    val res1 = data1.leftOuterJoin(data2) //.collect().foreach(println)
    // Keep only the records that have no blacklist match
    val res2 = res1.filter(!_._2._2.getOrElse(false)) //.collect().foreach(println)
    // Drop the join flag, leaving (domain, traffic)
    val res3 = res2.map(x => (x._1, x._2._1)) //.collect().foreach(println)

    sc.stop()
  }
}
Output (printed by uncommenting the matching .collect().foreach(println) call):
res1:
(www.ruozedata.com,(8888,None))
(www.ruozedata.com,(8889,None))
(www.ruozedata.com,(8887,None))
(www.baidu.com,(8886,Some(true)))
----------------------------------------------------------------
res2:
(www.ruozedata.com,(8888,None))
(www.ruozedata.com,(8889,None))
(www.ruozedata.com,(8887,None))
----------------------------------------------------------------
res3:
(www.ruozedata.com,8888)
(www.ruozedata.com,8889)
(www.ruozedata.com,8887)
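The three steps above (join, filter on the Option, strip the flag) can be sketched on plain Scala collections without Spark. This is only an illustration of left-outer-join semantics, not Spark's implementation; the object name LeftJoinSketch and the methods leftOuterJoin and keepAllowed are hypothetical names introduced here.

```scala
// A sketch of left-outer-join semantics on plain Scala collections (no Spark).
// LeftJoinSketch, leftOuterJoin and keepAllowed are hypothetical names.
object LeftJoinSketch {

  // For every left record, emit Some(w) for each matching right record,
  // or a single None when the key has no match on the right.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    left.flatMap { case (k, v) =>
      val matches: Seq[Option[W]] = rightByKey.get(k) match {
        case Some(ws) => ws.map(w => Some(w))
        case None     => Seq(None)
      }
      matches.map(m => (k, (v, m)))
    }
  }

  // res1 -> res2 -> res3 from the listing above, as one pipeline.
  def keepAllowed(logs: Seq[(String, Long)], blacklist: Seq[(String, Boolean)]): Seq[(String, Long)] =
    leftOuterJoin(logs, blacklist)
      .filter { case (_, (_, flag)) => !flag.getOrElse(false) }   // drop blacklisted domains
      .map { case (domain, (traffic, _)) => (domain, traffic) }   // strip the join flag

  def main(args: Array[String]): Unit = {
    val data1 = Seq(("www.ruozedata.com", 8888L), ("www.baidu.com", 8886L))
    val data2 = Seq(("www.baidu.com", true))
    leftOuterJoin(data1, data2).foreach(println)
    keepAllowed(data1, data2).foreach(println)
  }
}
```

The filter relies on getOrElse(false): a domain with no blacklist entry arrives as None, which is treated as "not blacklisted".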
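2. Implementation with Spark Streaming. The key piece is the transform operator: to operate on a DStream together with an RDD (here, joining each batch with the static blacklist RDD), use transform.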
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer

object LeftJoinStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LeftJoinStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Data set 2: the blacklist as a static RDD
    val input2 = new ListBuffer[(String, Boolean)]
    input2.append(("www.baidu.com", true))
    val data2 = ssc.sparkContext.parallelize(input2)

    // Data set 1: lines read from nc
    val lines = ssc.socketTextStream("hadoop000", 9999)
    lines
      .map(x => (x.split(",")(0), x))              // key each line by its domain
      .transform(rdd => rdd.leftOuterJoin(data2))  // join every batch RDD with the blacklist RDD
      .filter(!_._2._2.getOrElse(false))           // drop blacklisted domains
      .map(_._2._1)                                // keep the original line
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Data set 1, entered through nc:
[hadoop@hadoop000 ~]$ nc -lk 9999
www.ruozedata.com,9999
www.baidu.com,4444
www.ruozedata.com,8888
www.baidu.com,3333
www.ruozedata.com,9999
www.ruozedata.com,7777
Output in IDEA:
-------------------------------------------
Time: 1538102140000 ms
-------------------------------------------
www.ruozedata.com,9999
www.ruozedata.com,8888
www.ruozedata.com,9999
www.ruozedata.com,7777
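One way to picture what transform does is to model a DStream as a sequence of batches and apply the same batch-level function to each one. The sketch below is a toy model on plain Scala collections, not Spark's API or implementation; TransformSketch, the type aliases Batch and Batches, and dropBlacklisted are all hypothetical names, and the grouping of lines into two batches is an assumption made for illustration.

```scala
// A toy model of DStream.transform on plain Scala collections (no Spark).
// All names here are hypothetical; a real DStream produces one RDD per batch interval.
object TransformSketch {

  type Batch[A]   = Seq[A]        // stands in for one batch RDD
  type Batches[A] = Seq[Batch[A]] // stands in for the DStream

  // Model of transform: apply the same batch-level function to every batch.
  def transform[A, B](stream: Batches[A])(f: Batch[A] => Batch[B]): Batches[B] =
    stream.map(f)

  // Batch-level logic: key each line by its domain, drop blacklisted domains,
  // then return the original lines.
  def dropBlacklisted(blacklist: Map[String, Boolean], batch: Batch[String]): Batch[String] =
    batch
      .map(line => (line.split(",")(0), line))
      .filter { case (domain, _) => !blacklist.getOrElse(domain, false) }
      .map { case (_, line) => line }

  def main(args: Array[String]): Unit = {
    // Static data: built once, reused for every batch.
    val blacklist = Map("www.baidu.com" -> true)
    val stream: Batches[String] = Seq(
      Seq("www.ruozedata.com,9999", "www.baidu.com,4444"), // batch 1
      Seq("www.ruozedata.com,8888", "www.baidu.com,3333")  // batch 2
    )
    transform(stream)(batch => dropBlacklisted(blacklist, batch)).foreach(println)
  }
}
```

The point of the model is that the blacklist is defined once outside the stream, while the function given to transform runs again for every batch.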
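The same result can be achieved with a broadcast variable. Once the blacklist has been broadcast, each task looks domains up in its local copy, so the match needs no shuffle and performs better. This suits a blacklist small enough to be held in memory.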
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer

object BroadcastStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Data set 2: collect the blacklist to the driver as a Map, then broadcast it
    val input2 = new ListBuffer[(String, Boolean)]
    input2.append(("www.baidu.com", true))
    val data2 = ssc.sparkContext.parallelize(input2).collectAsMap()
    val broaddata2 = ssc.sparkContext.broadcast(data2)

    // Data set 1: lines read from nc
    val lines = ssc.socketTextStream("hadoop000", 9999)
    lines
      .map(x => (x.split(",")(0), x))  // key each line by its domain
      .transform(rdd => {
        rdd.mapPartitions(partition => {
          // Read the broadcast value once per partition
          val map = broaddata2.value
          // Keep only the lines whose domain is not in the blacklist
          for ((key, value) <- partition if !map.contains(key))
            yield value
        })
      })
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
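Why the broadcast version needs no shuffle can be seen in a small model where each partition is filtered on its own against a local, read-only copy of the blacklist. This is a sketch on plain Scala collections, not Spark code; BroadcastFilterSketch, filterPartition and the two-partition layout are hypothetical. It also uses a Set of domains instead of a Map, which is an assumption of this sketch: it is sufficient because the listing above only checks whether a key is present.

```scala
// A toy model of the broadcast approach on plain Scala collections (no Spark).
// BroadcastFilterSketch, filterPartition and the partition layout are hypothetical.
object BroadcastFilterSketch {

  // Stand-in for the function passed to mapPartitions: it sees one partition
  // as an Iterator and consults only its local copy of the blacklist.
  def filterPartition(partition: Iterator[(String, String)],
                      blacklist: Set[String]): Iterator[String] =
    for ((domain, line) <- partition if !blacklist.contains(domain)) yield line

  def main(args: Array[String]): Unit = {
    // Only membership is checked, so a Set of domains is enough here.
    val blacklist = Set("www.baidu.com")

    // Two "partitions" of (domain, line) records.
    val partitions = Seq(
      Seq(("www.ruozedata.com", "www.ruozedata.com,9999"), ("www.baidu.com", "www.baidu.com,4444")),
      Seq(("www.ruozedata.com", "www.ruozedata.com,8888"))
    )

    // Each partition is filtered independently; no records move between partitions.
    val kept = partitions.map(p => filterPartition(p.iterator, blacklist).toList)
    kept.foreach(println)
  }
}
```

In the join version, records with the same key have to be brought together before they can be matched; here every record is decided where it already sits.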