Spark Streaming stream.repartiti

2018-01-17  本文已影响0人  pcqlegend

问题描述:

streaming 消费多个topic,但是不同topic的每个分区的数据量差距很大,一个数量级以上。导致每个task消费的数据量不一样,造成严重的数据倾斜。所以需要进行一次repartition使得处理起来比较均匀。

解决办法

但是就有了两种方式。两者使用的都是Direct方式而非Reciver方式。这两种方式有什么区别呢。看下伪代码

方法一

伪代码如下:
方法一

val stream = KafkaUtils.createStream// 三个topic 每个120个partition ,总共360partition
val streamToHandle = stream.repartition(128)
streamToHandle.foreachRDD(rdd =>{
  rdd.foreachPartition(partition =>{
       partition.foreach( item =>{ 
           //do some thing
        })
     })
    })
})

方法二

val stream = KafkaUtils.createStream// 三个topic 每个120个partition ,总共360partition
streamToHandle.foreachRDD(rdd =>{
 val rddToHandle = rdd.repartition(128)
  rdd.foreachPartition(partition =>{
       partition.foreach( item =>{ 
           //do some thing
        })
     })
    })
})

假设

在执行过程中stream.repartition 中执行的时候会接收到kafka的消息后直接进行repartition,
但是rdd.repartition其实是通过创建了RDD之后,如果你的job包含多个stage,并且不是在第一个stage中进行repartition,那么相当于进行了两次shuffle。但是前者却只在接收数据的时候直接进行shuffle,所以性能会高很多。
看下两者的DAG截图


image.png
image.png

看起来两者只有一次shuffle,和假设不一样,只不过后者的repartition是在foreachRDD 内部。

上一篇下一篇

猜你喜欢

热点阅读