A bug when Spark Streaming pulls from Kafka 0.10

2018-12-14  码仙丶
Caused by: java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-abcd1 test1 8 1 after polling for 512
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
1. According to the error message, the assertion fails at
kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
if (!buffer.hasNext()) { poll(timeout) }
assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
#If no records arrive within the timeout, poll returns an empty record set; this is by design in the Kafka consumer API
private def poll(timeout: Long): Unit = {
    val p = consumer.poll(timeout)
    val r = p.records(topicPartition)
    logDebug(s"Polled ${p.partitions()}  ${r.size}")
    buffer = r.iterator
  }
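
To see that this is normal Kafka behaviour rather than an error, here is a minimal sketch against the plain Kafka 0.10 consumer API (the broker address and group id below are placeholders; the topic "test1" is taken from the error message):

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker
props.put("group.id", "poll-demo")               // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("test1"))

// Same 512 ms timeout Spark uses by default; an empty result is a normal outcome here
val records = consumer.poll(512)
if (records.isEmpty) {
  println("poll returned no records within the timeout - not an error")
}
consumer.close()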
#This is the declaration of buffer (ju is an alias for java.util in the Spark source)
protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
#Here Spark checks whether buffer still has records, and polls if it does not; but the poll itself may also come back empty
if (!buffer.hasNext()) { poll(timeout) }
#Spark then asserts that buffer must contain records; if it does not, the error shown above is thrown
assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
#Spark's default poll timeout is 512 ms; if no records arrive within those 512 ms, the exception is thrown
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
#The fix: raise the poll timeout so the consumer has enough time to fetch records (note that SparkConf.set takes a String value)
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "10000")
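
For context, here is a hedged sketch of where that setting goes when building the streaming job; the broker address, app name, and batch interval are placeholders, while the group id "abcd1" and topic "test1" are taken from the error message above:

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val sparkConf = new SparkConf().setAppName("poll-timeout-demo")
// Give each executor-side poll up to 10 s before Spark's assertion can fire
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "10000")

val ssc = new StreamingContext(sparkConf, Seconds(5)) // placeholder batch interval

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092", // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "abcd1",
  "auto.offset.reset" -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("test1"), kafkaParams)
)
stream.map(_.value).print()

ssc.start()
ssc.awaitTermination()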

In the Kafka consumer API, a poll(timeout) that fetches nothing simply returns an empty record set; Spark, however, turns that case into an assertion error here.
