kakfa连接sparkstreaming时候offset保存问

2019-03-28  本文已影响0人  大数据修行

官网的scala代码

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
上一篇 下一篇

猜你喜欢

热点阅读