kafka.consumer.SimpleConsumer 使用案例
摘要:SimpleConsumer, Zookeeper, offset 管理, Spark Streaming
SimpleConsumer
SimpleConsumer 是一个接近底层的、用于管理 Kafka topic、partition、offset 元数据的 API,可以拥有更大的权限读取 Kafka 数据。SimpleConsumer 的使用步骤:
- 遍历broker节点找到leader
- 使用leader的host,port发送请求获得offset
- 指定offset获取数据
代码案例:使用 SimpleConsumer 从 broker 中获取最新 offset 和最早 offset。先找到 leader 副本,再发送请求到 leader broker 获取 offset,OffsetRequest.EarliestTime 和 OffsetRequest.LatestTime 分别指定最小 offset 和最大 offset。
import kafka.api.{OffsetRequest, PartitionMetadata, PartitionOffsetRequestInfo, TopicMetadata, TopicMetadataRequest, TopicMetadataResponse}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import org.apache.kafka.common.TopicPartition
/**
 * Asks each partition's leader broker for the earliest or latest offset of `topic`.
 *
 * Steps: parse the bootstrap list, look up partition metadata (and thus each partition's
 * leader) via `findLeader`, then send one OffsetRequest per partition to its leader.
 *
 * @param topic     topic to inspect
 * @param bootstrap comma-separated "host[:port]" seed brokers; entries without a port use 9092
 * @param where     "earliest" (OffsetRequest.EarliestTime) or "latest" (OffsetRequest.LatestTime)
 * @return partition -> offset for every partition whose leader answered; partitions without a
 *         leader, or whose request failed or returned no offset, are reported on stderr and omitted
 * @throws IllegalArgumentException if `where` is neither "earliest" nor "latest"
 */
def getTopicOffsets(topic: String, bootstrap: String, where: String): java.util.Map[TopicPartition, java.lang.Long] = {
  import scala.util.control.NonFatal

  // Validate up front: an unknown value used to leave the request null and fail every partition.
  val requestTime: Long = where match {
    case "earliest" => OffsetRequest.EarliestTime
    case "latest"   => OffsetRequest.LatestTime
    case other =>
      throw new IllegalArgumentException(s"""where must be "earliest" or "latest", got: $other""")
  }
  val clientId = ""

  // Parse the bootstrap string into (host, port) pairs. A missing port falls back to 9092 instead
  // of failing on a missing array element; entries with an empty host are skipped.
  val brokers: List[(String, Int)] =
    bootstrap.split(",").toList.map(_.trim).filter(_.nonEmpty).flatMap { entry =>
      entry.split(":") match {
        case Array(host)                                        => List((host, 9092))
        case Array(host, port) if host.nonEmpty && port.nonEmpty => List((host, port.trim.toInt))
        case _                                                  => Nil
      }
    }

  // Find the leader broker: ask the seed brokers for the metadata of every partition.
  val metas = findLeader(brokers, topic) // partition id -> partition metadata
  val ret = new java.util.HashMap[TopicPartition, java.lang.Long]()

  for ((partition, meta) <- metas) {
    meta.leader match { // e.g. Some(BrokerEndPoint(79,cloudera03,9092))
      case Some(leader) =>
        var consumer: SimpleConsumer = null
        try {
          // Offsets must be requested from the partition's leader.
          consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
          val topicAndPartition = TopicAndPartition(topic, partition)
          val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(requestTime, 1)))
          val response = consumer.getOffsetsBefore(request)
          // An error response carries no offsets; report it instead of failing on an empty list.
          response.partitionErrorAndOffsets.get(topicAndPartition).flatMap(_.offsets.headOption) match {
            case Some(offset) =>
              ret.put(new TopicPartition(topic, partition), java.lang.Long.valueOf(offset))
            case None =>
              System.err.println(s"${Thread.currentThread().getName}[Error: no $where offset returned for partition $partition: $response")
          }
        } catch {
          case NonFatal(ex) => ex.printStackTrace()
        } finally {
          // The constructor itself may have thrown, leaving consumer null.
          if (consumer != null) consumer.close()
        }
      case None =>
        System.err.println(Thread.currentThread().getName + "[" + "Error: partition %d does not exist".format(partition))
    }
  }
  ret // e.g. {test_gp-0=0, test_gp-1=0}
}
找到leader broker
/**
 * Asks every seed broker for the metadata of `topic` and collects it per partition.
 *
 * Each broker that answers contributes the metadata of all partitions it reports, including the
 * partition's leader. A later answer overwrites an earlier one for the same partition. Brokers
 * that cannot be queried are reported on stderr and skipped.
 *
 * The return type is spelled fully qualified because this snippet's imports do not include
 * scala.collection.mutable; it is the same type as `mutable.Map`.
 *
 * @param seedBrokers (host, port) pairs to query
 * @param topic       topic whose partition metadata is wanted
 * @return partition id -> partition metadata; empty when no broker answered
 */
def findLeader(seedBrokers: List[(String, Int)], topic: String): scala.collection.mutable.Map[Int, PartitionMetadata] = {
  import scala.util.control.NonFatal

  val partitions = scala.collection.mutable.Map[Int, PartitionMetadata]()
  for ((host, port) <- seedBrokers) {
    var consumer: SimpleConsumer = null
    try {
      consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "leaderLookup" + System.currentTimeMillis())
      // Send a metadata request for the single topic.
      val response: TopicMetadataResponse = consumer.send(new TopicMetadataRequest(Seq(topic), 0))
      for {
        topicMeta <- response.topicsMetadata
        part      <- topicMeta.partitionsMetadata
      } {
        println(s"part.partitionID: ${part.partitionId}")
        println(s"part: $part")
        partitions += (part.partitionId -> part) // part is a kafka.api.PartitionMetadata
      }
    } catch {
      case NonFatal(ex) =>
        System.err.println(s"${Thread.currentThread().getName}[Error communicating with Broker [$host:$port] to find Leader for [$topic] Reason: $ex")
    } finally {
      if (consumer != null) consumer.close()
    }
  }
  partitions // e.g. Map(0 -> ..., 1 -> ...)
}
运行查看offset
/** Prints the earliest and the latest offsets of every partition of the demo topic "test_gp". */
def main(args: Array[String]): Unit = {
  val bootstrap = "cloudera01:9092,cloudera02:9092,cloudera03:9092"
  val res = getTopicOffsets("test_gp", bootstrap, "earliest")
  println("最早offset: " + res) // e.g. 最早offset: {test_gp-0=0, test_gp-1=0}
  val res2 = getTopicOffsets("test_gp", bootstrap, "latest")
  println("最新offset: " + res2) // e.g. 最新offset: {test_gp-0=30, test_gp-1=30}
}
这个结果与程序内消费消息时打印出的 offset 一致:两个分区最后一条消息的 offset 都是 29,而查询到的最新 offset 为 30,即最后一条消息的 offset + 1。
{"partition":"1","offset":"25","name":"tom","addr":"beijing#100000"}
{"partition":"1","offset":"26","name":"lulu","addr":"hangzhou#310000"}
{"partition":"1","offset":"24","name":"alice","addr":"shanghai#200000"}
{"partition":"1","offset":"29","phone":"18800074423","name":"lulu"}
{"partition":"1","offset":"27","phone":"17730079427","name":"alice"}
{"partition":"1","offset":"28","phone":"16700379451","name":"tom"}
{"partition":"0","offset":"26","name":"bob","addr":"shanghai#200000"}
{"partition":"0","offset":"24","name":"nick","addr":"shanghai#200000"}
{"partition":"0","offset":"28","phone":"18700079458","name":"amy"}
{"partition":"0","offset":"29","phone":"15700079421","name":"bob"}
{"partition":"0","offset":"25","name":"amy","addr":"beijing#100000"}
{"partition":"0","offset":"27","phone":"14400033426","name":"nick"}
读取Zookeeper存储的offset
手动维护 offset:消费者将 offset 写入 Zookeeper,程序重启时直接从 Zookeeper 读取需要消费的最新 offset。具体做法如下。先读取 Zookeeper 节点值,再用 SimpleConsumer 向 leader broker 发送请求,获得每个分区在 broker 端的最早 offset 和最新 offset。然后对每个分区的 offset 节点值进行二次判断同步:
- 如果 Zookeeper 中有越界的非法记录,则进行修正。比如 broker 端的 offset 已经过期,但是 zk 没有及时同步,则修正到当前 broker 的最早 offset,把这一批全部补起来;若记录超过 broker 的最新 offset,则修正为最新 offset。
- 如果有漏记录和暂未写入的节点,或者有新分区加入,则用 broker 端的最早 offset 进行补充。

流程如下:
import kafka.api.{OffsetRequest, PartitionMetadata, PartitionOffsetRequestInfo, TopicMetadata, TopicMetadataRequest, TopicMetadataResponse}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import scala.collection.mutable
object GetTopicPartitionOffsets {
  import scala.util.control.NonFatal

  /** Broker list queried by getPartitionOffset when the caller does not supply one. */
  private val DefaultBootstrap = "cloudera01:9092,cloudera02:9092,cloudera03:9092"

  /** Port used for a bootstrap entry that has no explicit ":port" suffix. */
  private val DefaultBrokerPort = 9092

  /**
   * Holds a ZooKeeper client connected to `connectString`.
   *
   * The instance registers itself as the default watcher and ignores every event.
   * Whoever creates it owns `zk` and should close it when done.
   *
   * @param connectString ZooKeeper connect string (comma-separated hosts)
   */
  class MyZookeeper(connectString: String = "cloudera01,cloudera02,cloudera03") extends Watcher {
    val zk: ZooKeeper = new ZooKeeper(connectString, 20000, this)

    /** No-op: this client does not react to watch events. */
    override def process(event: WatchedEvent): Unit = ()
  }

  /**
   * Parses a bootstrap string such as "h1:9092,h2" into (host, port) pairs, in input order.
   * Entries without a port get DefaultBrokerPort. Blank entries, entries with an empty host, and
   * entries with more than one ':' are skipped. A non-numeric port throws NumberFormatException.
   */
  private def parseBrokers(bootstrap: String): List[(String, Int)] =
    bootstrap.split(",").toList.map(_.trim).filter(_.nonEmpty).flatMap { entry =>
      entry.split(":") match {
        case Array(host)                                        => List((host, DefaultBrokerPort))
        case Array(host, port) if host.nonEmpty && port.nonEmpty => List((host, port.trim.toInt))
        case _                                                  => Nil
      }
    }

  /**
   * Asks every seed broker for the metadata of `topic` and collects it per partition.
   *
   * Each broker that answers contributes the metadata of all partitions it reports, including the
   * partition's leader. A later answer overwrites an earlier one for the same partition. Brokers
   * that cannot be queried are reported on stderr and skipped.
   *
   * @param seedBrokers (host, port) pairs to query
   * @param topic       topic whose partition metadata is wanted
   * @return partition id -> partition metadata; empty when no broker answered
   */
  def findLeader(seedBrokers: List[(String, Int)], topic: String): mutable.Map[Int, PartitionMetadata] = {
    val partitions = mutable.Map[Int, PartitionMetadata]()
    for ((host, port) <- seedBrokers) {
      var consumer: SimpleConsumer = null
      try {
        consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "leaderLookup" + System.currentTimeMillis())
        val response: TopicMetadataResponse = consumer.send(new TopicMetadataRequest(Seq(topic), 0))
        for {
          topicMeta <- response.topicsMetadata
          part      <- topicMeta.partitionsMetadata
        } {
          println(s"part.partitionID: ${part.partitionId}")
          println(s"part: $part")
          partitions += (part.partitionId -> part)
        }
      } catch {
        case NonFatal(ex) =>
          System.err.println(s"${Thread.currentThread().getName}[Error communicating with Broker [$host:$port] to find Leader for [$topic] Reason: $ex")
      } finally {
        if (consumer != null) consumer.close()
      }
    }
    partitions
  }

  /**
   * Asks each partition's leader broker for the earliest or latest offset of `topic`.
   *
   * @param topic     topic to inspect
   * @param bootstrap comma-separated "host[:port]" seed brokers; port defaults to 9092
   * @param where     "earliest" (OffsetRequest.EarliestTime) or "latest" (OffsetRequest.LatestTime)
   * @return partition -> offset for every partition whose leader answered; partitions without a
   *         leader, or whose request failed or returned no offset, are reported on stderr and omitted
   * @throws IllegalArgumentException if `where` is neither "earliest" nor "latest"
   */
  def getTopicOffsets(topic: String, bootstrap: String, where: String): java.util.Map[TopicPartition, java.lang.Long] = {
    // Validate up front: an unknown value used to leave the request null and fail every partition.
    val requestTime: Long = where match {
      case "earliest" => OffsetRequest.EarliestTime
      case "latest"   => OffsetRequest.LatestTime
      case other =>
        throw new IllegalArgumentException(s"""where must be "earliest" or "latest", got: $other""")
    }
    val clientId = ""
    val metas = findLeader(parseBrokers(bootstrap), topic) // partition id -> partition metadata
    val ret = new java.util.HashMap[TopicPartition, java.lang.Long]()

    for ((partition, meta) <- metas) {
      meta.leader match {
        case Some(leader) =>
          var consumer: SimpleConsumer = null
          try {
            // Offsets must be requested from the partition's leader.
            consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
            val topicAndPartition = TopicAndPartition(topic, partition)
            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(requestTime, 1)))
            val response = consumer.getOffsetsBefore(request)
            // An error response carries no offsets; report it instead of failing on an empty list.
            response.partitionErrorAndOffsets.get(topicAndPartition).flatMap(_.offsets.headOption) match {
              case Some(offset) =>
                ret.put(new TopicPartition(topic, partition), java.lang.Long.valueOf(offset))
              case None =>
                System.err.println(s"${Thread.currentThread().getName}[Error: no $where offset returned for partition $partition: $response")
            }
          } catch {
            case NonFatal(ex) => ex.printStackTrace()
          } finally {
            // The constructor itself may have thrown, leaving consumer null.
            if (consumer != null) consumer.close()
          }
        case None =>
          System.err.println(Thread.currentThread().getName + "[" + "Error: partition %d does not exist".format(partition))
      }
    }
    ret
  }

  /**
   * Computes the next offset to consume for every partition, starting from the offsets stored in
   * ZooKeeper under `path` and correcting them against the broker's current offset range.
   *
   * Each child of `path` is named after a partition id and holds the last consumed offset, so the
   * next offset is stored value + 1. That value is clamped to [earliest, latest] as reported by the
   * brokers, when those bounds are known. Partitions the broker knows about but ZooKeeper does not
   * (missing node, empty value, new partition) start from the broker's earliest offset.
   *
   * The topic name is taken from the 4th "/"-separated segment of `path`. This assumes a layout
   * like "/test/monitor/<topic>/<group>", as used by main.
   *
   * @param zk        connected ZooKeeper client
   * @param path      ZooKeeper node whose children are per-partition offsets
   * @param bootstrap seed brokers used to fetch the earliest and latest offsets
   * @return partition -> next offset to consume, or null if `path` does not exist or has no children
   */
  def getPartitionOffset(
      zk: ZooKeeper,
      path: String,
      bootstrap: String = DefaultBootstrap): java.util.Map[TopicPartition, java.lang.Long] = {
    if (zk.exists(path, false) == null) {
      null
    } else {
      val children: java.util.List[String] = zk.getChildren(path, false)
      if (children == null || children.isEmpty) {
        null
      } else {
        val topic = path.split("/")(3) // "/test/monitor/test_gp/kafka_operation_group" -> "test_gp"
        val earliestOffsets = getTopicOffsets(topic, bootstrap, "earliest")
        println("获取最早的offset: " + earliestOffsets)
        // Latest offsets are already "last message + 1".
        val latestOffsets = getTopicOffsets(topic, bootstrap, "latest")
        println("获取最新的offset: " + latestOffsets)

        val ret = new java.util.HashMap[TopicPartition, java.lang.Long]()
        for (i <- 0 until children.size) {
          val child = children.get(i)
          val keyObj = new TopicPartition(topic, child.toInt)
          val stored = getData(zk, path + "/" + child)
          // A node without a value is treated like a missing node: it falls through to the
          // earliest-offset fill-in below instead of throwing.
          if (stored != null && stored.trim.nonEmpty) {
            val next = stored.trim.toLong + 1
            // Clamp only against the bounds the brokers actually returned for this partition.
            val earliest = Option(earliestOffsets.get(keyObj)).map(_.longValue())
            val latest = Option(latestOffsets.get(keyObj)).map(_.longValue())
            val corrected = earliest.filter(next < _)
              .orElse(latest.filter(next > _))
              .getOrElse(next)
            ret.put(keyObj, java.lang.Long.valueOf(corrected))
            // Handled here; whatever remains in earliestOffsets is filled in afterwards.
            earliestOffsets.remove(keyObj)
          }
          println("ret: " + ret)
        }
        // Partitions with no usable ZooKeeper record start from the broker's earliest offset.
        ret.putAll(earliestOffsets)
        ret
      }
    }
  }

  /**
   * Reads the value of the ZooKeeper node at `path` as a UTF-8 string.
   *
   * @return the node's value, or null if the node does not exist (including when it is deleted
   *         between the existence check and the read)
   */
  def getData(zk: ZooKeeper, path: String): String = {
    val stat = zk.exists(path, false)
    if (stat == null) {
      null
    } else {
      try {
        // Decode explicitly instead of relying on the platform default charset.
        new String(zk.getData(path, false, stat), java.nio.charset.StandardCharsets.UTF_8)
      } catch {
        case _: org.apache.zookeeper.KeeperException.NoNodeException => null
      }
    }
  }

  /** Prints the next offsets to consume for the demo consumer group and closes the ZooKeeper client. */
  def main(args: Array[String]): Unit = {
    val client = new MyZookeeper()
    try {
      val res3 = getPartitionOffset(client.zk, "/test/monitor/test_gp/kafka_operation_group")
      println(res3) // e.g. {test_gp-0=17, test_gp-1=18}
    } finally {
      client.zk.close()
    }
  }
}
输出当下需要消费的 offset 为 {test_gp-0=17, test_gp-1=18}。查看 Zookeeper 目录下的结果:分区 0 的节点值为 16,分区 1 的节点值为 17。可见程序输出结果为 Zookeeper 节点值 + 1。
[zk: localhost:2181(CONNECTED) 36] get /test/monitor/test_gp/kafka_operation_group/0
16
cZxid = 0x3d000cade3
ctime = Fri Oct 09 15:11:54 CST 2020
mZxid = 0x3d000cbd37
mtime = Fri Oct 09 17:44:58 CST 2020
pZxid = 0x3d000cade3
cversion = 0
dataVersion = 14
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: localhost:2181(CONNECTED) 37] get /test/monitor/test_gp/kafka_operation_group/1
17
cZxid = 0x3d000caddc
ctime = Fri Oct 09 15:11:54 CST 2020
mZxid = 0x3d000cbd31
mtime = Fri Oct 09 17:44:58 CST 2020
pZxid = 0x3d000caddc
cversion = 0
dataVersion = 12
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0