kakfa连接sparkstreaming时候offset保存问
2019-03-28 本文已影响0人
大数据修行
官网的scala代码
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}