Kafka Zookeeper 结构

2019-10-10  本文已影响0人  onedoc

Kafka 信息zk结构

image.png

启动server-0 --> zkcli --> ls /

[zk: localhost:2181(CONNECTED) 1] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, 
controller_epoch, consumers, latest_producer_id_block, config]

cluster

保存集群id和版本信息,broker 启动时从/cluster/id获取,如果没有broker生成

[zk: localhost:2181(CONNECTED) 14] get /cluster/id
{"version":"1","id":"cw-I6v4cSt2lsNio4v-rUQ"}

controller 集群临时节点

int (broker id of the controller) 存储center controller中央控制器所在kafka broker的信息
Kafka集群中多个broker,有一个会被选举为controller leader,负责管理整个集群中分区和副本的状态,当partition的leader 副本故障,由controller 负责为该partition重新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某个topic分区的时候也会由controller管理分区的重新分配工作。
当broker启动的时候,都会创建KafkaController对象,但是集群中只能有一个leader对外提供服务,这些每个节点上的KafkaController会在指定的zookeeper路径下创建临时节点,只有第一个成功创建的节点的KafkaController才可以成为leader,其余的都是follower。当leader故障后,所有的follower会收到通知,再次竞争在该路径下创建节点从而选举新的leader。

Schema:
{
    "version": 版本编号默认为1,
    "brokerid": kafka集群中broker唯一编号,
    "timestamp": kafka broker中央控制器变更时的时间戳
}
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":0,"timestamp":"1559195868940"}
关闭brokerid 0
[zk: localhost:2181(CONNECTED) 9] get /controller
{"version":1,"brokerid":1,"timestamp":"1559232245731"}

brokers [ids, topics, seqid]

ids:/brokers/ids/[0...N] 
每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode
Schema:
{
    "jmx_port": jmx端口号,
    "timestamp": kafka broker初始启动时的时间戳,
    "host": 主机名或ip地址,
    "version": 版本编号默认为1,
    "port": kafka broker的服务端端口号,由server.properties中参数port确定
}
[zk: localhost:2181(CONNECTED) 16] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 17] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
"endpoints":["PLAINTEXT://192.168.1.103:9092"],
"jmx_port":-1,"host":"192.168.1.103",
"timestamp":"1559232289265","port":9092,"version":4}
seqid:/brokers/seqid
broker启动时检查并确保存在, 永久节点
topics:/brokers/topics/[xxxx] 
kafka 集群中topic 信息
Schema:
{
    "version": "版本编号目前固定为数字1",
    "partitions": {
        "partitionId编号": [
            同步副本组brokerId列表
        ],
        "partitionId编号": [
            同步副本组brokerId列表
        ],
        .......
    }
}
[zk: localhost:2181(CONNECTED) 25] ls /brokers/topics
[test1]
[zk: localhost:2181(CONNECTED) 27] get /brokers/topics/test1
{"version":1,"partitions":{"2":[2,1],"1":[1,0],"3":[0,1],"0":[0,2]}}
/brokers/topics/[topic]/partitions/[0...N]/state 其中[0..N]表示partition索引号
Schema:
{
    "controller_epoch": 表示kafka集群中的中央控制器选举次数,
    "leader": 表示该partition选举leader的brokerId,
    "version": 版本编号默认为1,
    "leader_epoch": 该partition leader选举次数,
    "isr": [同步副本组brokerId列表]
}
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":14,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2]}
[zk: localhost:2181(CONNECTED) 11] get /controller_epoch
14
关闭broker 0 选举broker2位leader isr 只有broker2节点
[zk: localhost:2181(CONNECTED) 13] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":14,"leader":2,"version":1,"leader_epoch":1,"isr":[2]}
[zk: localhost:2181(CONNECTED) 20] get /controller_epoch
15
重新启动broker0, 集群epoch 为变化,  p0的 leader epoch 增长为2
[zk: localhost:2181(CONNECTED) 31] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":15,"leader":0,"version":1,"leader_epoch":2,"isr":[2,0]}
[zk: localhost:2181(CONNECTED) 20] get /controller_epoch
15

controller_epoch(永久节点,控制年代)

此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;

get /controller_epoch
7

isr_change_notification

在Kafka 中, Leader 和Follower 的数据同步遵循的是"最终一致"原则, 也就是数据同步会有延迟, 但保证最终数据的一致性.isr 是'in-sync' replicas 的缩写,代表的是与Leader 数据已经同步过的replica, 它会作为重选Leader 时作为判断依据,用来处理ISR集合变更的动作.

log_dir_event_notification

consumer注册信息

每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息

consumerId产生规则:
 
   StringconsumerUuid = null;
       if(config.consumerId!=null && config.consumerId){
           consumerUuid = consumerId;
       }else {
           String uuid = UUID.randomUUID()
           consumerUuid = "%s-%d-%s".format(
                InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
                uuid.getMostSignificantBits().toHexString.substring(0,8));
 
       }
  String consumerIdString = config.groupId + "_" + consumerUuid; 
 
Schema:
{
    "version": 版本编号默认为1,
    "subscription": { //订阅topic列表
        "topic名称": consumer中topic消费者线程数
    },
    "pattern": "static",
    "timestamp": "consumer启动时的时间戳"
}
  
Example:
{
    "version":1,
    "subscription":{
        "replicatedtopic":1
    },
    "pattern":"white_list",
    "timestamp":"1452134230082"
}

5.consumer owner

/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引编号

当consumer启动时,所触发的操作:

a) 首先进行"Consumer Id注册";

b) 然后在"Consumer id 注册"节点下注册一个watch用来监听当前group中其他consumer的"退出"和"加入";只要此znode path下节点列表变更,

    都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

c) 在"Broker id 注册"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

consumer offset

/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)
用来跟踪每个consumer目前所消费的partition中最大的offset
此znode为持久节点,可以看出offset跟group_id有关,以表明当消费者组(consumer group)中一个消费者失效,
重新触发balance,其他consumer可以继续消费.

latest_producer_id_block

broker启动时提前预分配一段PID,当前是0~999,即提前分配出1000个PID来

[zk: localhost:2181(CONNECTED) 32] get /latest_producer_id_block
{"version":1,"broker":1,"block_start":"8000","block_end":"8999"}

config

kafka配置信息

[zk: localhost:2181(CONNECTED) 0] ls /config
[changes, clients, brokers, topics, users]

/configs/topics 存放topic的定制化配置信息

上一篇下一篇

猜你喜欢

热点阅读