

2020-10-10  本文已影响0人  xiaogp

摘要: SimpleConsumer、Zookeeper、offset 管理、Spark Streaming


SimpleConsumer 是一个接近底层、用于管理 kafka topic、partition、offset 元数据的 API, 它对 kafka 数据的读取有更细粒度的控制。SimpleConsumer 的基本使用步骤是: 先找到分区的 leader broker, 再向该 leader broker 发送请求 (见下面的代码)。

代码案例:使用SimpleConsumer从broker中获取最新offset最早offset,先找到leader副本,再用发送请求到leader broker获取offset,OffsetRequest.EarliestTimeOffsetRequest.LatestTime分别指定最小offset和最大offset。

import kafka.api.{OffsetRequest, PartitionMetadata, PartitionOffsetRequestInfo, TopicMetadata, TopicMetadataRequest, TopicMetadataResponse}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import org.apache.kafka.common.TopicPartition

import scala.collection.mutable
import scala.util.control.NonFatal

/**
 * Queries, for every partition of `topic`, the earliest or the latest offset currently
 * available on that partition's leader broker.
 *
 * @param topic     Kafka topic name
 * @param bootstrap comma-separated broker list such as "host1:9092,host2:9092";
 *                  an entry without a port defaults to 9092
 * @param where     "earliest" (uses OffsetRequest.EarliestTime) or "latest" (uses OffsetRequest.LatestTime)
 * @return partition -> offset; partitions with no known leader, or whose request failed, are omitted
 * @throws IllegalArgumentException if `where` is neither "earliest" nor "latest"
 */
def getTopicOffsets(topic: String, bootstrap: String, where: String): java.util.Map[TopicPartition, java.lang.Long] = {
  val clientId = ""
  // Fail fast on a bad `where` instead of sending a null request and swallowing the NPE.
  val time: Long = where match {
    case "earliest" => OffsetRequest.EarliestTime
    case "latest"   => OffsetRequest.LatestTime
    case other      => throw new IllegalArgumentException(s"where must be 'earliest' or 'latest', got '$other'")
  }

  // Parse "host[:port]" entries. String.split never returns null; blank entries are skipped
  // and a missing port falls back to 9092 instead of indexing past the array end.
  val brokers: List[(String, Int)] = bootstrap.split(",").toList.map(_.trim).flatMap { entry =>
    entry.split(":") match {
      case Array(host) if host.nonEmpty                        => List((host, 9092))
      case Array(host, port) if host.nonEmpty && port.nonEmpty => List((host, port.toInt))
      case _                                                   => Nil
    }
  }

  // Ask the seed brokers for partition metadata, which includes each partition's leader.
  val metas = findLeader(brokers, topic)
  val ret = new java.util.HashMap[TopicPartition, java.lang.Long]()
  metas.foreach { case (partition, meta) =>
    meta.leader match {
      case Some(leader) =>
        var consumer: SimpleConsumer = null
        try {
          // Offset requests must go to the partition's leader.
          consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
          val topicAndPartition = TopicAndPartition(topic, partition)
          val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, 1)))
          val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
          // An error response carries no offsets; skip the partition rather than calling head on it.
          offsets.headOption.foreach { offset =>
            ret.put(new TopicPartition(topic, partition), java.lang.Long.valueOf(offset))
          }
        } catch {
          case NonFatal(ex) =>
            System.err.println(s"${Thread.currentThread().getName} [Error fetching $where offset for $topic-$partition: $ex]")
        } finally {
          if (consumer != null) consumer.close()
        }
      case None =>
        System.err.println(s"${Thread.currentThread().getName} [Error: partition $partition has no leader]")
    }
  }
  ret // e.g. {test_gp-0=0, test_gp-1=0}
}

findLeader: 向种子 broker 请求主题元数据, 找到每个分区的 leader broker

/**
 * Asks every seed broker for the metadata of `topic` and collects each partition's
 * metadata (which includes its current leader), keyed by partition id.
 *
 * Brokers that fail are logged and skipped. When several brokers answer, later answers
 * overwrite earlier ones for the same partition.
 *
 * @param seedBrokers (host, port) pairs to query
 * @param topic       topic whose partition metadata is requested
 * @return partition id -> PartitionMetadata; empty if no broker answered
 */
def findLeader(seedBrokers: List[(String, Int)], topic: String): mutable.Map[Int, PartitionMetadata] = {
  val map = mutable.Map[Int, PartitionMetadata]()
  for ((host, port) <- seedBrokers) {
    var consumer: SimpleConsumer = null
    try {
      consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "leaderLookup" + System.currentTimeMillis())
      val req = new TopicMetadataRequest(Seq(topic), 0)
      val resp: TopicMetadataResponse = consumer.send(req)
      for {
        item <- resp.topicsMetadata
        part <- item.partitionsMetadata
      } map += (part.partitionId -> part)
    } catch {
      case NonFatal(ex) =>
        System.err.println(s"${Thread.currentThread().getName} [Error communicating with Broker [$host:$port] to find Leader for [$topic] Reason: $ex]")
    } finally {
      // Always release the broker connection, whether or not the request succeeded.
      if (consumer != null) consumer.close()
    }
  }
  map
}


  /** Demo: prints the earliest and then the latest offset of every partition of topic "test_gp". */
  def main(args: Array[String]): Unit = {
    val bootstrap = "cloudera01:9092,cloudera02:9092,cloudera03:9092"
    val res = getTopicOffsets("test_gp", bootstrap, "earliest")
    // Explicit tuple: same output as the former auto-tupled println(a, b), without the deprecation warning.
    println(("最早offset", res))  // prints (最早offset,{test_gp-0=0, test_gp-1=0})
    val res2 = getTopicOffsets("test_gp", bootstrap, "latest")
    println(("最新offset", res2))  // prints (最新offset,{test_gp-0=30, test_gp-1=30})
  }




手动维护 offset: 消费者将 offset 写入 Zookeeper, 程序重启时直接从 Zookeeper 读取需要消费的最新 offset。具体做法是先读取 Zookeeper 节点值, 再对每个分区的 offset 节点值, 用 SimpleConsumer 向 leader broker 请求该分区的最早 offset 和最新 offset, 进行二次校验同步。如果 Zookeeper 中有越界的非法记录则进行修正: 比如 broker 端的 offset 已经过期而 zk 没有及时同步, 则修正为当前 broker 的最早 offset, 把这一批全部补起来; 如果 Zookeeper 中的值超过了 broker 端的最新 offset, 则修正为最新 offset。如果有漏记录、暂未写入的节点, 或者有新分区加入, 则用 broker 端的最早 offset 进行补充。流程如下:

(图: 读取 Zookeeper offset 流程)
import kafka.api.{OffsetRequest, PartitionMetadata, PartitionOffsetRequestInfo, TopicMetadata, TopicMetadataRequest, TopicMetadataResponse}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

import scala.collection.mutable

object GetTopicPartitionOffsets {

  import scala.util.control.NonFatal

  /** Broker list used when the caller does not pass one; matches the demo cluster. */
  private val DefaultBootstrap = "cloudera01:9092,cloudera02:9092,cloudera03:9092"

  /**
   * Holder for a ZooKeeper client. It registers itself as the default watcher but
   * ignores every event, because this program only does one-shot reads without watches.
   *
   * @param connectString    ZooKeeper connect string
   * @param sessionTimeoutMs session timeout in milliseconds
   */
  class MyZookeeper(connectString: String = "cloudera01,cloudera02,cloudera03",
                    sessionTimeoutMs: Int = 20000) extends Watcher {
    // `this` is handed to ZooKeeper during construction; that is safe only because
    // process() below touches no fields.
    val zk = new ZooKeeper(connectString, sessionTimeoutMs, this)

    def process(event: WatchedEvent): Unit = {
      // no watching
    }
  }

  /**
   * Asks every seed broker for the metadata of `topic` and collects each partition's
   * metadata (which includes its current leader), keyed by partition id.
   *
   * Brokers that fail are logged and skipped. When several brokers answer, later answers
   * overwrite earlier ones for the same partition.
   *
   * @param seedBrokers (host, port) pairs to query
   * @param topic       topic whose partition metadata is requested
   * @return partition id -> PartitionMetadata; empty if no broker answered
   */
  def findLeader(seedBrokers: List[(String, Int)], topic: String): mutable.Map[Int, PartitionMetadata] = {
    val map = mutable.Map[Int, PartitionMetadata]()
    for ((host, port) <- seedBrokers) {
      var consumer: SimpleConsumer = null
      try {
        consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "leaderLookup" + System.currentTimeMillis())
        val req = new TopicMetadataRequest(Seq(topic), 0)
        val resp: TopicMetadataResponse = consumer.send(req)
        for {
          item <- resp.topicsMetadata
          part <- item.partitionsMetadata
        } map += (part.partitionId -> part)
      } catch {
        case NonFatal(ex) =>
          System.err.println(s"${Thread.currentThread().getName} [Error communicating with Broker [$host:$port] to find Leader for [$topic] Reason: $ex]")
      } finally {
        // Always release the broker connection, whether or not the request succeeded.
        if (consumer != null) consumer.close()
      }
    }
    map
  }

  /**
   * Queries, for every partition of `topic`, the earliest or the latest offset currently
   * available on that partition's leader broker.
   *
   * @param topic     Kafka topic name
   * @param bootstrap comma-separated broker list such as "host1:9092,host2:9092";
   *                  an entry without a port defaults to 9092
   * @param where     "earliest" (uses OffsetRequest.EarliestTime) or "latest" (uses OffsetRequest.LatestTime)
   * @return partition -> offset; partitions with no known leader, or whose request failed, are omitted
   * @throws IllegalArgumentException if `where` is neither "earliest" nor "latest"
   */
  def getTopicOffsets(topic: String, bootstrap: String, where: String): java.util.Map[TopicPartition, java.lang.Long] = {
    val clientId = ""
    // Fail fast on a bad `where` instead of sending a null request and swallowing the NPE.
    val time: Long = where match {
      case "earliest" => OffsetRequest.EarliestTime
      case "latest"   => OffsetRequest.LatestTime
      case other      => throw new IllegalArgumentException(s"where must be 'earliest' or 'latest', got '$other'")
    }

    // Parse "host[:port]" entries. String.split never returns null; blank entries are skipped
    // and a missing port falls back to 9092 instead of indexing past the array end.
    val brokers: List[(String, Int)] = bootstrap.split(",").toList.map(_.trim).flatMap { entry =>
      entry.split(":") match {
        case Array(host) if host.nonEmpty                        => List((host, 9092))
        case Array(host, port) if host.nonEmpty && port.nonEmpty => List((host, port.toInt))
        case _                                                   => Nil
      }
    }

    // Ask the seed brokers for partition metadata, which includes each partition's leader.
    val metas = findLeader(brokers, topic)
    val ret = new java.util.HashMap[TopicPartition, java.lang.Long]()
    metas.foreach { case (partition, meta) =>
      meta.leader match {
        case Some(leader) =>
          var consumer: SimpleConsumer = null
          try {
            // Offset requests must go to the partition's leader.
            consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
            val topicAndPartition = TopicAndPartition(topic, partition)
            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, 1)))
            val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
            // An error response carries no offsets; skip the partition rather than calling head on it.
            offsets.headOption.foreach { offset =>
              ret.put(new TopicPartition(topic, partition), java.lang.Long.valueOf(offset))
            }
          } catch {
            case NonFatal(ex) =>
              System.err.println(s"${Thread.currentThread().getName} [Error fetching $where offset for $topic-$partition: $ex]")
          } finally {
            if (consumer != null) consumer.close()
          }
        case None =>
          System.err.println(s"${Thread.currentThread().getName} [Error: partition $partition has no leader]")
      }
    }
    ret // e.g. {test_gp-0=0, test_gp-1=0}
  }

  /**
   * Computes the offset to resume consuming from, for every partition of the topic whose
   * consumer offsets are stored under `path` in ZooKeeper.
   *
   * Expected layout (as used by `main`): the topic is the 4th path segment, e.g.
   * /test/monitor/test_gp/kafka_operation_group. Each child node is named after a partition id
   * and holds the last consumed offset as a decimal string.
   *
   * For each recorded partition, the next offset (stored value + 1) is clamped to the
   * [earliest, latest] range reported by the brokers. Partitions known to the brokers but
   * lacking a usable ZooKeeper value (new partitions, or nodes without data) start from the
   * broker's earliest offset.
   *
   * @param zk        connected ZooKeeper client
   * @param path      ZooKeeper path of the consumer group's offset nodes
   * @param bootstrap Kafka broker list used to look up earliest/latest offsets
   * @return partition -> offset to start from; empty if `path` does not exist or has no children
   */
  def getPartitionOffset(zk: ZooKeeper, path: String,
                         bootstrap: String = DefaultBootstrap): java.util.Map[TopicPartition, java.lang.Long] = {
    val ret = new java.util.HashMap[TopicPartition, java.lang.Long]()
    val children: java.util.List[String] =
      if (zk.exists(path, false) == null) null else zk.getChildren(path, false)

    if (children != null && !children.isEmpty) {
      val topic = path.split("/")(3) // "/test/monitor/test_gp/group" -> "test_gp"
      val earliestOffsets = getTopicOffsets(topic, bootstrap, "earliest")
      // LatestTime yields the log-end offset, i.e. already "last written + 1"; a stored ZK value
      // + 1 should therefore never exceed it.
      val latestOffsets = getTopicOffsets(topic, bootstrap, "latest")

      for (i <- 0 until children.size) {
        val child = children.get(i)
        val keyObj = new TopicPartition(topic, child.toInt)
        val stored = getData(zk, path + "/" + child).trim
        if (stored.nonEmpty) {
          // ZK keeps the last consumed offset; resume from the one after it.
          var offset: Long = stored.toLong + 1
          val earliest = earliestOffsets.get(keyObj)
          val latest = latestOffsets.get(keyObj)
          // Clamp to what the broker still has. Missing broker data (null) leaves the ZK value as is.
          if (earliest != null && offset < earliest.longValue()) {
            offset = earliest.longValue()
          } else if (latest != null && offset > latest.longValue()) {
            offset = latest.longValue()
          }
          ret.put(keyObj, java.lang.Long.valueOf(offset))
        }
      }

      // Partitions the brokers know about but ZooKeeper has no usable value for
      // start from the broker's earliest offset.
      val it = earliestOffsets.entrySet().iterator()
      while (it.hasNext) {
        val entry = it.next()
        if (!ret.containsKey(entry.getKey)) ret.put(entry.getKey, entry.getValue)
      }
    }
    ret
  }

  /**
   * Reads the data of ZooKeeper node `path` as a UTF-8 string.
   *
   * @return the node's data, or "" if the node does not exist or holds no data
   */
  def getData(zk: ZooKeeper, path: String): String = {
    try {
      // Read directly rather than exists()+getData(), so a node deleted in between is handled.
      val bytes = zk.getData(path, false, null)
      if (bytes == null) "" else new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
    } catch {
      case _: org.apache.zookeeper.KeeperException.NoNodeException => ""
    }
  }

  /** Demo: prints the resume offsets of group kafka_operation_group on topic test_gp. */
  def main(args: Array[String]): Unit = {
    val client = new MyZookeeper()
    try {
      val res3 = getPartitionOffset(client.zk, "/test/monitor/test_gp/kafka_operation_group")
      println(res3)  // {test_gp-0=17, test_gp-1=18}
    } finally {
      // Close the session so the JVM does not keep ZooKeeper threads and the session alive.
      client.zk.close()
    }
  }
}

程序输出当前需要消费的 offset 为 {test_gp-0=17, test_gp-1=18}。查看 Zookeeper 中对应节点的值可以验证: 程序输出的结果等于 Zookeeper 节点值 + 1。

[zk: localhost:2181(CONNECTED) 36] get /test/monitor/test_gp/kafka_operation_group/0
16
cZxid = 0x3d000cade3
ctime = Fri Oct 09 15:11:54 CST 2020
mZxid = 0x3d000cbd37
mtime = Fri Oct 09 17:44:58 CST 2020
pZxid = 0x3d000cade3
cversion = 0
dataVersion = 14
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: localhost:2181(CONNECTED) 37] get /test/monitor/test_gp/kafka_operation_group/1
17
cZxid = 0x3d000caddc
ctime = Fri Oct 09 15:11:54 CST 2020
mZxid = 0x3d000cbd31
mtime = Fri Oct 09 17:44:58 CST 2020
pZxid = 0x3d000caddc
cversion = 0
dataVersion = 12
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
上一篇 下一篇

