数客联盟

Spark Streaming Kafka Offset 一致性

2019-03-21  本文已影响9人  raindaywhu

在我们使用Spark Streaming 处理消息队列中的数据时,一般都需要关注数据消费的一致性,即当流处理程序重启后,从消息队列中数据消费的位置应当也应当是上次消费的位置。
Spark Streaming官方文档中告诉我们支持exactly once语义,但依赖于上下游的持久化存储。关于SparkStreaming支持的一致性模型与容错可以参见:https://www.jianshu.com/p/cacb1e922c38
如果消息队列中如果选择了Kafka,很不幸,SparkStreaming-Kafka 目前只能支持at-least-once 语义。下文中简述了SparkStreamingKafka中保存Offset的问题。
从kafka 0.9 开始Kafka 的offset不在保存到zookeeper中,而是kafka的一个特殊的topic __consumer_offsets中。

1 kafka auto.commit

kafka在从这个版本也提供了自动commit的功能,默认enable.auto.commit开启,针对于不同的业务语义,不能容忍数据丢失的场景,应当关闭auto commit, 它在提交offset的时机是周期性的,由auto.commit.interval.ms控制,存在业务操作还未完成,offset就被commit的可能,即存在数据丢失的风险。在Kafka consumer的官方API中有相关说明:

For use cases where message processing time varies unpredictably, neither of these options may be sufficient. The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue calling poll while the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.

2 SparkStreaming Kafka中提交offset API:

kafkaStream.foreachRDD(rdd => {
  val offsetArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetArray, CommitCallback)
}

注意kafkaStream应当是createDirectStream返回的DirectKafkaInputDStream,其他类型的Stream并不支持Offset的提交。

跟进commitAsync的实现我们可以发现,DirectKafkaInputDStream内部维护了一个offset的提交队列,commitAsync会将offset提交到队列中,在compute方法中将队列中的offset进行commit。也就是说只有当下个批次时offsets才会被提交到kafka中。
相关代码位于org.apache.spark.streaming.kafka010中DirectKafkaInputDStream.scala文件
DirectKafkaInputDStream进行commit offset的过程如下:

 /**
   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
   * @param callback Only the most recently provided callback will be used at commit.
   */
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
    commitCallback.set(callback)
    commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
  }

  protected def commitAll(): Unit = {
    val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
    var osr = commitQueue.poll()
    while (null != osr) {
      val tp = osr.topicPartition
      val x = m.get(tp)
      val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
      m.put(tp, new OffsetAndMetadata(offset))
      osr = commitQueue.poll()
    }
    if (!m.isEmpty) {
      consumer.commitAsync(m, commitCallback.get)
    }
  }

DirectKafkaInputDStream进行compute过程,先计算RDD,最后调用commitAll

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    val untilOffsets = clamp(latestOffsets())
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
      true)
    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
      getPreferredHosts, useConsumerCache)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets
    commitAll()
    Some(rdd)
  }

3 SparkStreamingKafka 拉取kafka数据的过程

到这里,我们可能会对SparkStreaming 中提交Kafka Offset的机制很不满,即使我们在自己的业务逻辑之后commit offsets, 实际offset的提交也是在下个批次完成。这里我们能否自己创建consumer完成offset的提交呢?我们需要看下SparkStreamingKafka中是如何完成每个批次中kafka数据的消费与KafkaRDD创建的。
SparkStreaming Kafka 在实现KafkaRDD过程如下:
1 driver端会一直持有一个创建好的consumer,在每个批次拉取数据时,调用Kafka的API poll操作。
2 将此时该consumer得到的(partition,offset)消息进行划分spark partition,每个kafka partition都会作为一个spark partition。
3 最终每个task独立去消费一个partition。executor中创建consumer,consumer通过kafka api 的assign方法拉取数据。

DirectKafkaInputDStream 中读取topic offsets的相关代码:

private def paranoidPoll(c: Consumer[K, V]): Unit = {
    val msgs = c.poll(0)
    if (!msgs.isEmpty) {
      // position should be minimum offset per topicpartition
      msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
        val tp = new TopicPartition(m.topic, m.partition)
        val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
        acc + (tp -> off)
      }.foreach { case (tp, off) =>
          logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
          c.seek(tp, off)
      }
    }
  }
/**
   * Returns the latest (highest) available offsets, taking new partitions into account.
   */
  protected def latestOffsets(): Map[TopicPartition, Long] = {
    val c = consumer
    paranoidPoll(c)
    val parts = c.assignment().asScala

    // make sure new partitions are reflected in currentOffsets
    val newPartitions = parts.diff(currentOffsets.keySet)
    // position for new partitions determined by auto.offset.reset if no commit
    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
    // don't want to consume messages, so pause
    c.pause(newPartitions.asJava)
    // find latest available offsets
    c.seekToEnd(currentOffsets.keySet.asJava)
    parts.map(tp => tp -> c.position(tp)).toMap
  }

kafkaRDD中每个task读取topic数据的过程:

/**
   * An iterator that fetches messages directly from Kafka for the offsets in partition.
   * Uses a cached consumer where possible to take advantage of prefetching
   */
  private class KafkaRDDIterator(
      part: KafkaRDDPartition,
      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {

    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

    context.addTaskCompletionListener{ context => closeIfNeeded() }

    val consumer = if (useConsumerCache) {
      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
      if (context.attemptNumber >= 1) {
        // just in case the prior attempt failures were cache related
        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
      }
      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    } else {
      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    }

    var requestOffset = part.fromOffset

    def closeIfNeeded(): Unit = {
      if (!useConsumerCache && consumer != null) {
        consumer.close
      }
    }

    override def hasNext(): Boolean = requestOffset < part.untilOffset

    override def next(): ConsumerRecord[K, V] = {
      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
      val r = consumer.get(requestOffset, pollTimeout)
      requestOffset += 1
      r
    }
  }

消费Kafka 数据时,Kafka有一个consumer group的控制,即对于一个topic,不同consumer设定了相同的group.id, 这些consumer如果调用Kafka api中subscribe的方式消费topic数据,则会在同一个group中,每个consumer会拿到该topic不同的partition的数据。

每当consumer group中的加入或者退出了consumer,亦或者topic的分区发生了变化,会触发rebalancing,即重新划分consumer group中每个consumer消费partition的范围。

而kafka api assign不会引发rebalancing,不会使得该consumer加入consumer group。poll 是kafka api中使consumer加入consumer group的方式,每个consumer 都需要定时的poll, 否则就会脱离consumer group中。

从这个过程中,我们可以看到offset的提交只能由driver端的consumer完成。
如果我们创建另外一个consumer,以subscribe的方式订阅topic,那么该consumer group会有两个consumer, SparkStreamingKafka中的consumer拿到的数据就不是全部。
而如果我们的consumer以assign的方式创建,进行commit offset的话,会得到错误.即有相同的groupID有一个活跃的消费组。

CommitFailedException -    
 if the commit failed and cannot be retried. This can only occur if you are using automatic group management with subscribe(Collection), or if there is an active group with the same groupId which is using group management.

更多关于kafka 消费、rebalancing、offset commit 参考Kafka API 中的说明。
https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

上一篇 下一篇

猜你喜欢

热点阅读