A bug when Spark Streaming pulls from Kafka 0.10
2018-12-14
码仙丶
- Many companies are currently on Kafka 0.10, and when consuming it from Spark Streaming, a lot of people run into this error:
Caused by: java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-abcd1 test1 8 1 after polling for 512
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
- Troubleshooting steps
- From the stack trace line
kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
- we can see where the error is thrown. Open the
CachedKafkaConsumer.scala
class and go to line 74:
if (!buffer.hasNext()) { poll(timeout) }
assert(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
- Step into the poll(timeout) method
# If nothing is fetched within timeout, poll returns an empty record set; this is by design in the Kafka consumer API
private def poll(timeout: Long): Unit = {
val p = consumer.poll(timeout)
val r = p.records(topicPartition)
logDebug(s"Polled ${p.partitions()} ${r.size}")
buffer = r.iterator
}
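The empty-poll behavior can be sketched in plain Scala, without a broker. Note that PollSketch and its members are illustrative stand-ins for the buffer/poll logic above, not Spark's actual private API:

```scala
// Stand-in for CachedKafkaConsumer's buffer/poll pair (names are hypothetical).
object PollSketch {
  // buffer starts out as an empty iterator, just like in CachedKafkaConsumer
  var buffer: Iterator[String] = Iterator.empty

  // pretend poll: copies whatever the "broker" had ready within the timeout
  // into buffer; an empty fetch leaves buffer with no elements
  def poll(available: Seq[String]): Unit = {
    buffer = available.iterator
  }
}
```

If the simulated fetch is empty, buffer.hasNext stays false afterward, which is exactly the state the assertion at line 74 trips over.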
- So poll fetches records from Kafka and puts them into buffer
# This is the declaration of buffer
protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
- Now go back to the code at line 74
# Check whether buffer has data; if not, call poll. But poll itself may come back with nothing
if (!buffer.hasNext()) { poll(timeout) }
# Here Spark asserts that buffer must have data; if it does not, the error above is thrown
assert(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
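Putting the two pieces together, the failing path can be reproduced in a few lines of plain Scala. GetSketch and fakePoll are hypothetical names mimicking Spark's logic, not Spark code:

```scala
// Minimal sketch of CachedKafkaConsumer.get: if the poll comes back empty,
// the assert throws java.lang.AssertionError, just like the stack trace above.
object GetSketch {
  var buffer: Iterator[String] = Iterator.empty

  // simulated poll: whatever is "available" within the timeout ends up in buffer
  def fakePoll(available: Seq[String]): Unit = { buffer = available.iterator }

  def get(available: Seq[String]): String = {
    if (!buffer.hasNext) fakePoll(available)
    // mirrors Spark's assert: an empty buffer after polling is treated as an error
    assert(buffer.hasNext, "Failed to get records after polling")
    buffer.next()
  }
}
```

When the simulated broker has a record, get returns it; when it has nothing, the assert fires.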
- The fix is actually simple. In
KafkaRDD.scala
# Spark's default poll timeout is 512 ms; if no records are fetched within 512 ms, the exception above is thrown
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
- Simply set this parameter to a value larger than 512 in the SparkConf; how much larger depends on your workload, for example:
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "10000")
(Note that SparkConf.set takes the value as a String, so the timeout must be quoted.)
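A fuller sketch of where the setting goes, assuming a Spark dependency on the classpath. The app name and the 10000 ms value are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Sketch only: raise the poll timeout before building the StreamingContext.
val sparkConf = new SparkConf()
  .setAppName("kafka010-demo") // hypothetical app name
  // SparkConf.set takes a String value, so quote the number
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")
```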
In the Kafka consumer API, poll(timeout) simply returns an empty record set when nothing is fetched,
but Spark chooses to throw an exception here instead.