kafka Broker源码解析
Broker主要由SocketServer(Socket服务层),KafkaRequestHandlerPool(请求转发层),KafkaApis(业务逻辑层),Controller(集群状态控制层),KafkaHealthcheck(Broker健康检测层),TopicConfigManager(topic配置信息监控层)组成。见下图:

M个处理线程 IO Thread从RequestChannel的请求阻塞队列requestQueue获取请求,调用kafkaApis处理不同的请求,M=num.io.threads
Broker共处理10种不同的request,分别为RequestKeys.ProduceKey、RequestKeys.FetchKey、RequestKeys.OffsetsKey、RequestKeys.MetadataKey 、RequestKeys.LeaderAndIsrKey、RequestKeys.StopReplicaKey、RequestKeys.UpdateMetadataKey、RequestKeys.ControlledShutdownKey、RequestKeys.OffsetCommitKey、RequestKeys.OffsetFetchKey。
IO Thread将request处理过的response存放进RequestChannel的响应阻塞队列responseQueues[i]
Processor Thread从对应的RequestChannel的响应阻塞队列responseQueues[i]获取之前自己发送的request所对应的response,然后发送给客户端
KafkaHealthcheck(Broker 健康状态监测层)通过在ZK上注册EphemeralPath来实现

②TopicMetadataRequest: 生产者发送/消费者发送获取topic的元数据信息的请求
④OffsetRequest: 消费者发送获取某个topic的偏移量的请求
⑥OffsetFetchRequest: 消费者发送获取自己提交到KAFKA上的偏移量(如果是ZK上,则消费者自己获取)的请求
⑧StopReplicaRequest: 当broker停止时或者删除某个topic的分区的replica时,Controller发送通知相应的broker停止拷贝副本的请求
// Network front end of the broker: holds the array of Processor threads, the single
// Acceptor thread and the RequestChannel that connects them to the request handlers.
// NOTE: this listing is an excerpt; closing braces, comment delimiters and several
// statements of the original source are elided.
class SocketServer(val brokerId: Int,
val host: String,
val port: Int,
val numProcessorThreads: Int,
val maxQueuedRequests: Int,
val sendBufferSize: Int,
val recvBufferSize: Int,
val maxRequestSize: Int = Int.MaxValue,
val maxConnectionsPerIp: Int = Int.MaxValue,
val connectionsMaxIdleMs: Long,
val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
@volatile private var acceptor: Acceptor = null
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)// channel created with one slot per processor and a bounded request capacity
/* a meter to track the average free capacity of the network processors */
private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
* Start the socket server
def startup() {
val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
for(i <- 0 until numProcessorThreads) {// start num.network.threads Processor threads to handle network requests
processors(i) = new Processor(i,
newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
newGauge("ResponsesBeingSent", new Gauge[Int] {
def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
// register the processor threads for notification of responses: the listener
// wakes up the processor with the given id when a response is available for it
requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
// start accepting connections
this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
// Thread that listens on the server socket and hands each accepted connection to
// one of the processors, cycling through them in round-robin order.
// NOTE: this listing is an excerpt; closing braces, comment delimiters and some
// statements (e.g. the key.isAcceptable check) are elided.
private[kafka] class Acceptor(val host: String,
val port: Int,
private val processors: Array[Processor],
val sendBufferSize: Int,
val recvBufferSize: Int,
connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
val serverChannel = openServerSocket(host, port)
* Accept loop that checks for new connection attempts
def run() {
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
var currentProcessor = 0
while(isRunning) {
// block for at most 500 ms so that isRunning is re-checked regularly
val ready = selector.select(500)
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
try {
key = iter.next
accept(key, processors(currentProcessor))// hand the connection to this processor (per the original note it is queued in the processor's newConnections); that processor then serves every request of the connection
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
debug("Closing server socket and selector.")
// Queues that decouple the network processors from the request handler threads:
// one bounded request queue shared by everyone and one unbounded response queue
// per processor.
// NOTE: this listing is an excerpt; the rest of the class body is elided.
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)// the single blocking request queue; KafkaRequestHandler threads take requests from it
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)// one blocking response queue per processor (num.network.threads in total); KafkaRequestHandler threads put responses into them
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

// Network thread that serves the connections assigned to it: each loop iteration
// picks up pending responses, waits on the selector, records idle time and then
// dispatches on the ready keys.
// NOTE: this listing is an excerpt; closing braces and several statements (the
// new-connection setup, the key.isReadable branch, the close() calls) are elided.
private[kafka] class Processor(val id: Int,
val time: Time,
val maxRequestSize: Int,
val aggregateIdleMeter: Meter,
val idleMeter: Meter,
val totalProcessorThreads: Int,
val requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {
override def run() {
while(isRunning) {
// setup any new connections that have been queued up
// register any new responses for writing
processNewResponses()// pick up the responses waiting in this processor's response queue
val startSelectTime = SystemTime.nanoseconds
// block for at most 300 ms; the elapsed time is accounted as idle time below
val ready = selector.select(300)
currentTimeNanos = SystemTime.nanoseconds
val idleTime = currentTimeNanos - startSelectTime
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
aggregateIdleMeter.mark(idleTime / totalProcessorThreads)
trace("Processor id " + id + " selection time = " + idleTime + " ns")
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
try {
key = iter.next
else if(key.isWritable)
else if(!key.isValid)
throw new IllegalStateException("Unrecognized key state for processor thread.")
} catch {
case e: EOFException => {
info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
} case e: InvalidRequestException => {
info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
} case e: Throwable => {
error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
debug("Closing selector.")
// Reads bytes for the connection behind `key` into the Receive attached to the key
// and, once the receive is complete, wraps the buffer in a RequestChannel.Request.
// NOTE: this listing is an excerpt; closing braces and several statements (attaching
// the receive, closing on EOF, sending the request to the channel) are elided.
def read(key: SelectionKey) {
// record the time of the latest activity on this connection
lruConnections.put(key, currentTimeNanos)
val socketChannel = channelFor(key)
var receive = key.attachment.asInstanceOf[Receive]
// no partially-read request on this key yet: start a new size-bounded receive
if(key.attachment == null) {
receive = new BoundedByteBufferReceive(maxRequestSize)
val read = receive.readFrom(socketChannel)
val address = socketChannel.socket.getRemoteSocketAddress();
trace(read + " bytes read from " + address)
if(read < 0) {
} else if(receive.complete) {
val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)// assemble the request
// explicitly reset interest ops to not READ, no need to wake up the selector just yet
key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
} else {
// more reading to be done
trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
// Writes the response attached to `key` to the connection's socket channel and
// traces whether the send completed.
// NOTE: this listing is an excerpt; closing braces and the statements that update
// the key's interest ops after the write are elided.
def write(key: SelectionKey) {
val socketChannel = channelFor(key)
val response = key.attachment().asInstanceOf[RequestChannel.Response]
val responseSend = response.responseSend// the payload of the response
if(responseSend == null)
throw new IllegalStateException("Registered for write interest but no response attached to key.")
val written = responseSend.writeTo(socketChannel)// send the response over the socket of this connection
trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
if(responseSend.complete) {
trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
} else {
trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
KafkaRequestHandlerPool的逻辑比较简单,就是开启num.io.threads个KafkaRequestHandler,每个KafkaRequestHandler从RequestChannel.requestQueue获取request,处理后把对应的response存进responseQueues(i)队列
// Pool of numThreads KafkaRequestHandler runnables, each wrapped in its own daemon
// thread, all sharing one RequestChannel, one KafkaApis and one idle meter.
// NOTE: this listing is an excerpt; closing braces, the thread start call and the
// bodies of the two loops in shutdown() are elided.
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging with KafkaMetricsGroup {
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {// create num.io.threads KafkaRequestHandler instances
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
// Shuts the pool down; the per-handler and per-thread actions are not visible in this excerpt.
def shutdown() {
info("shutting down")
for(handler <- runnables)
for(thread <- threads)
info("shut down completely")
// Runnable that loops forever: polls the RequestChannel for a request, records the
// time spent waiting as idle time, and passes the request to KafkaApis.
// NOTE: this listing is an excerpt; closing braces and the `return` that ends the
// loop when the AllDone marker is received are elided.
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: Int,
val requestChannel: RequestChannel,
apis: KafkaApis) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
def run() {
while(true) {
try {
var req : RequestChannel.Request = null
// keep polling until a request is actually returned
while (req == null) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = SystemTime.nanoseconds
req = requestChannel.receiveRequest(300)// fetch a request from RequestChannel.requestQueue
val idleTime = SystemTime.nanoseconds - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
// AllDone is the marker request enqueued by shutdown() below
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
req.requestDequeueTimeMs = SystemTime.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
apis.handle(req)// delegate to KafkaApis for the real processing; the response is then placed in the matching RequestChannel.responseQueues[i]
} catch {
case e: Throwable => error("Exception when handling request", e)
// Asks the handler to stop by enqueueing the AllDone marker request.
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
private val time: Time) extends Logging {
private val logs = new Pool[TopicAndPartition, Log]()
class Log(val dir: File,
@volatile var config: LogConfig,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

class LogSegment(val log: FileMessageSet,
val index: OffsetIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
time: Time) extends Logging {


1)通过跳跃表快速定位到位于哪个segment file
2)segment file中查找msg chunk
class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
/**
 * Find the slot in which the largest offset less than or equal to the given
 * target offset is stored.
 *
 * The index stores offsets relative to `baseOffset`, so the target is first
 * converted to a relative offset and then located with a binary search.
 *
 * @param idx The index buffer
 * @param targetOffset The offset to look for
 * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
 */
private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
  // we only store the difference from the base offset so calculate that
  val relOffset = targetOffset - baseOffset
  // check if the index is empty
  if(entries == 0)
    return -1
  // check if the target offset is smaller than the least offset
  if(relativeOffset(idx, 0) > relOffset)
    return -1
  // binary search for the entry
  var lo = 0
  var hi = entries-1
  while(lo < hi) {
    // round the midpoint up so that `lo = mid` always makes progress when hi == lo + 1
    val mid = ceil(hi/2.0 + lo/2.0).toInt
    val found = relativeOffset(idx, mid)
    if(found == relOffset)
      return mid
    else if(found < relOffset)
      lo = mid
    else
      // entry at mid is too large: only the lower half can hold the answer
      hi = mid - 1
  }
  // lo now points at the largest entry that is <= relOffset
  lo
}
class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
private val time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
private val logCreationOrDeletionLock = new Object
private val logs = new Pool[TopicAndPartition, Log]()
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
recoveryPointCheckpoints记录了每个log最新的刷新的位置,即刷到磁盘的topic and partition的messages的偏移量,有可能在这之后的LogSegment和OffsetIndex刷新异常,需要特殊处理。
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
delay = InitialTaskDelayMs,
period = retentionCheckMs,
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
delay = InitialTaskDelayMs,
period = flushCheckMs,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,

AR(assigned replicas):已分配的副本 ISR(in-sync replicas):处于同步状态的副本,即下图:

Partition {
topic : string //topic名称
partition_id : int //partition id
leader : Replica // 这个分区的leader副本,是isr中的其中一个
ISR : Set[Replica] // 正在同步中的副本集合
AR : Set[Replica] // 这个分区的所有副本分配集合,一个broker上有至多一个分区副本
LeaderAndISRVersionInZK : long // version id of the LeaderAndISR path; used for conditionally update the LeaderAndISR path in ZK
Replica { // 一个分区副本信息
broker_id : int
partition : Partition //分区信息
log : Log //本地日志与副本关联信息
hw : long //最后被commit的message的offset信息
leo : long // 日志结尾offset
isLeader : Boolean //该副本是否为所在分区的leader
接下来来看ReplicaManager的主要作用,它的角色定位是负责接收controller的command以完成replica的管理工作,command主要有两种, LeaderAndISRCommand和StopReplicaCommand。因此主要完成三件事:
1)接收LeaderAndISRCommand命令 2)接收StopReplicaCommand命令 3)开启定时线程maybeShrinkIsr
// Handles a LeaderAndIsr request from the controller: rejects it when its controller
// epoch is stale, otherwise validates each partition's leader epoch, splits the
// accepted partitions into "to be leader" and "to be follower", and runs
// makeLeaders / makeFollowers on them. Returns the per-partition error codes plus
// an overall error code.
// NOTE: this listing is an excerpt; closing braces and some statements (e.g. the
// assigned-replica check and the high watermark checkpoint scheduling) are elided.
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
.format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
replicaStateChangeLock synchronized {
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { // check the controller epoch of the request
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
"its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
val controllerId = leaderAndISRRequest.controllerId
val correlationId = leaderAndISRRequest.correlationId
controllerEpoch = leaderAndISRRequest.controllerEpoch
// First check partition's leader epoch
// The check above only covers the request's controller epoch; the leader epoch
// inside every partitionStateInfo has to be checked as well
val partitionState = new HashMap[Partition, PartitionStateInfo]()
leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
val partition = getOrCreatePartition(topic, partitionId)
val partitionLeaderEpoch = partition.getLeaderEpoch()
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
// The local partitionLeaderEpoch must be lower than the leaderEpoch in the request,
// otherwise the request is stale
if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
// Check whether this partition is assigned to the current broker.
// Only partitions assigned to this broker are put into partitionState: `partition` is the
// current local state, `partitionStateInfo` is the latest state carried by the request
partitionState.put(partition, partitionStateInfo)
else {
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
"epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
.format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
} else {
// Otherwise record the error code in response
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
"epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
.format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch))
responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
// In `case (partition, partitionStateInfo)`, partition is the replica manager's current
// state while partitionStateInfo holds the new assignment from the request
val partitionsTobeLeader = partitionState
.filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
// Partitions whose new leader is this broker go through the become-leader flow
if (!partitionsTobeLeader.isEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
// All remaining partitions go through the become-follower flow
if (!partitionsToBeFollower.isEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing there by avoiding weird race conditions
if (!hwThreadInitialized) {
// Start the high watermark checkpoint thread: the hw must be persisted to disk
// periodically so that it can be reloaded after a failover
hwThreadInitialized = true
(responseMap, ErrorMapping.NoError)
// Turns this broker into the leader of every partition in partitionState: marks each
// partition as NoError in responseMap, removes their fetchers, then calls
// Partition.makeLeader on each one. Any Throwable is logged per partition and re-thrown.
// NOTE: this listing is an excerpt; closing braces are elided.
private def makeLeaders(controllerId: Int, epoch: Int,
partitionState: Map[Partition, PartitionStateInfo],
correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) = {
partitionState.foreach(state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-leader transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
// pre-fill the response with NoError for every partition handled here
for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
try {
// First stop fetchers for all the partitions
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
"%d epoch %d with correlation id %d for partition %s")
.format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
// Update the partition information to be the leader
partitionState.foreach{ case (partition, partitionStateInfo) =>
partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
} catch {
case e: Throwable =>
partitionState.foreach { state =>
val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
" epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
TopicAndPartition(state._1.topic, state._1.partitionId))
stateChangeLogger.error(errorMsg, e)
// Re-throw the exception for it to be caught in KafkaApis
throw e
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"for the become-leader transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
// Turns this broker into a follower for the partitions in partitionState: marks the
// partitions whose makeFollower call returns true, removes their old fetchers,
// truncates their logs to the high watermark and, unless the broker is shutting
// down, prepares fetchers towards the new leaders. Any Throwable is logged and re-thrown.
// NOTE: this listing is an excerpt; closing braces and some statements (an `else`,
// the getOrCreateReplica call for the unavailable-leader case, addFetcherForPartitions)
// are elided.
private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) {
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
// pre-fill the response with NoError for every partition handled here
for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
try {
var partitionsToMakeFollower: Set[Partition] = Set()
// TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
partitionState.foreach{ case (partition, partitionStateInfo) =>
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
leaders.find(_.id == newLeaderBrokerId) match {// only change partitions whose new leader is an available broker
// Only change partition state when the leader is available
case Some(leaderBroker) =>
// Per the original note, makeFollower returns true only when the partition's leader
// actually changed; when it is unchanged nothing else needs to be done
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
partitionsToMakeFollower += partition
stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
"controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
.format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
partition.topic, partition.partitionId, newLeaderBrokerId))
case None =>
// The leader broker should always be present in the leaderAndIsrRequest.
// If not, we should record the error message and abort the transition process for this partition
stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
" %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
.format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
partition.topic, partition.partitionId, newLeaderBrokerId))
// Create the local replica even if the leader is unavailable. This is required to ensure that we include
// the partition's high watermark in the checkpoint file (see KAFKA-1647)
// The leader has changed, so the old fetchers must be removed: they still point at,
// and fetch from, the old leader
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
"%d epoch %d with correlation id %d for partition %s")
.format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
// truncate each follower log to the replica's high watermark offset
logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
"become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
partition.topic, partition.partitionId, correlationId, controllerId, epoch))
if (isShuttingDown.get()) { // the broker is shutting down, so do not add any fetcher
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
"controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId,
controllerId, epoch, partition.topic, partition.partitionId))
else {
// we do not need to check if the leader exists again since this has been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
new TopicAndPartition(partition) -> BrokerAndInitialOffset(
leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
"%d epoch %d with correlation id %d for partition [%s,%d]")
.format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId))
} catch {
case e: Throwable =>
val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
"epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
stateChangeLogger.error(errorMsg, e)
// Re-throw the exception for it to be caught in KafkaApis
throw e
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"for the become-follower transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
当broker停止或用户删除某个replica时,KafkaServer会接收到StopReplicaRequest指令,此时会调用ReplicaManager的stopReplicas函数:
/**
 * Handles a StopReplica request from the controller.
 *
 * A request carrying a controller epoch older than the latest one seen is rejected
 * with StaleControllerEpochCode and an empty per-partition map. Otherwise the
 * controller epoch is updated, the fetchers of all listed partitions are removed,
 * and stopReplica is called for each partition.
 *
 * @param stopReplicaRequest the request naming the partitions to stop and whether to delete them
 * @return the per-partition error codes together with the overall error code
 */
def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicAndPartition, Short], Short) = {
  replicaStateChangeLock synchronized {
    val responseMap = new collection.mutable.HashMap[TopicAndPartition, Short]
    if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
      // Format the complete message in one go. Previously only the first literal was
      // formatted and the second was concatenated afterwards, so the log line ended
      // with a literal "%d" followed by the epoch.
      stateChangeLogger.warn(("Broker %d received stop replica request from an old controller epoch %d." +
        " Latest known controller epoch is %d")
        .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch))
      (responseMap, ErrorMapping.StaleControllerEpochCode)
    } else {
      controllerEpoch = stopReplicaRequest.controllerEpoch
      // First stop fetchers for all partitions, then stop the corresponding replicas
      replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
      for(topicAndPartition <- stopReplicaRequest.partitions){
        // stop the replica of each topic/partition and record its individual result
        val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
        responseMap.put(topicAndPartition, errorCode)
      }
      (responseMap, ErrorMapping.NoError)
    }
  }
}
// Stops the local replica of one partition. When deletePartition is true the partition
// is removed from allPartitions and its local log deleted; when the partition is
// unknown to the replica manager, the log manager is consulted instead.
// NOTE: this listing is an excerpt; closing braces and some statements (the log
// deletion in the None branch, returning errorCode) are elided.
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
deletePartition.toString, topic, partitionId))
val errorCode = ErrorMapping.NoError
getPartition(topic, partitionId) match {
case Some(partition) =>
if(deletePartition) { // the partition is really deleted only when deletePartition is true
val removedPartition = allPartitions.remove((topic, partitionId))
if (removedPartition != null)
removedPartition.delete() // this will delete the local log
case None =>
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
if(deletePartition) {
val topicAndPartition = TopicAndPartition(topic, partitionId)
if(logManager.getLog(topicAndPartition).isDefined) {
stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
.format(localBrokerId, deletePartition, topic, partitionId))
stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]"
.format(localBrokerId, deletePartition, topic, partitionId))
// Schedules the periodic "isr-expiration" task, which runs maybeShrinkIsr every
// config.replicaLagTimeMaxMs milliseconds.
// NOTE: this listing is an excerpt; the closing brace is elided.
def startup() {
// start ISR expiration thread
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
// Asks every partition in allPartitions to re-evaluate its ISR, passing the
// configured lag limits (time and number of messages).
// NOTE: this listing is an excerpt; the closing brace is elided.
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
// Partition-level ISR shrink: when the local replica is the leader, computes the
// out-of-sync replicas and, if there are any, removes them from the ISR and
// publishes the new ISR. Runs under the write lock of leaderIsrUpdateLock.
// NOTE: this listing is an excerpt; closing braces and the high watermark update
// after the ISR change are elided.
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
inWriteLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
// getOutOfSyncReplicas returns the replicas that are no longer in sync
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
// the candidates never include the leader itself, so the new ISR must not be empty
assert(newInSyncReplicas.size > 0)
info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
updateIsr(newInSyncReplicas) // publish the new ISR to ZooKeeper
// we may need to increment high watermark since ISR could be down to 1
case None => // do nothing if no longer leader
// Returns the ISR members (excluding the leader) that are out of sync: the union of
// replicas whose log end offset has not been updated for more than keepInSyncTimeMs
// and replicas whose log end offset trails the leader's by more than keepInSyncMessages.
// NOTE: this listing is an excerpt; comment delimiters and the closing brace are elided.
def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
* there are two cases that need to be handled here -
* 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
* the follower is stuck and should be removed from the ISR
* 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
* follower is not catching up and should be removed from the ISR
val leaderLogEndOffset = leaderReplica.logEndOffset
val candidateReplicas = inSyncReplicas - leaderReplica
// Case 1 above
// per the original note, logEndOffsetUpdateTimeMs is refreshed whenever the follower fetches
val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
if(stuckReplicas.size > 0)
debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
// Case 2 above
// compare the number of messages the replica lags behind the leader
val slowReplicas = candidateReplicas.filter(r =>
r.logEndOffset.messageOffset >= 0 &&
leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
if(slowReplicas.size > 0)
debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
stuckReplicas ++ slowReplicas
OffsetManager主要提供对offset的保存和读取。kafka管理topic的偏移量有2种方式:1)zookeeper,即把偏移量提交至zk上;2)kafka,即把偏移量提交至kafka内部。具体使用哪种方式由offsets.storage参数决定,默认为zookeeper。也就是说,如果配置offsets.storage=kafka,则kafka会把这种OffsetCommit请求当作一次消息写入(produce)来处理,保存至topic为“__consumer_offsets”的log里面。
// Keeps an in-memory cache of committed offsets keyed by group/topic/partition and
// schedules the periodic "offsets-cache-compactor" task that runs compact.
// NOTE: this listing is an excerpt; the rest of the class body is elided.
class OffsetManager(val config: OffsetManagerConfig,
replicaManager: ReplicaManager,
zkClient: ZkClient,
scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
/* offsets and metadata cache */
private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
// run compact every config.offsetsRetentionCheckIntervalMs milliseconds
scheduler.schedule(name = "offsets-cache-compactor",
fun = compact,
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
1)当produceRequest.requiredAcks == 0时,即不需要ack,则立刻调用putOffsets更新偏移量
2)当produceRequest.requiredAcks == 1时,即需要立即返回response时,则立刻调用putOffsets更新偏移量
3)当produceRequest.requiredAcks == -1时,即只有此批消息达到最小副本数的时候,通过ProducerRequestPurgatory触发调用putOffsets更新偏移量 (ProducerRequestPurgatory之后的章节会讲)
// Finds cache entries whose timestamp is older than config.offsetsRetentionMs, builds
// a tombstone message (null payload, commit key) for each, groups the tombstones by
// offsets-topic partition and appends them to the partitions found locally. Append
// failures are logged and ignored.
// NOTE: this listing is an excerpt; closing braces and some statements (removing the
// entry from the cache, producing the per-partition count, summing numRemoved) are elided.
private def compact() {
debug("Compacting offsets cache.")
val startMs = SystemTime.milliseconds
// entries whose age exceeds the retention time
val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs)
debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs))
// delete the stale offsets from the table and generate tombstone messages to remove them from the log
val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
val offsetsPartition = partitionFor(groupTopicAndPartition.group)
trace("Removing stale offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
// a message with a null payload and the commit key acts as the tombstone
(offsetsPartition, new Message(bytes = null, key = commitKey))
}.groupBy{ case (partition, tombstone) => partition }
// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will get rid of stale offsets during their own purge cycles.
val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>
val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
partitionOpt.map { partition =>
val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
val messages = tombstones.map(_._2).toSeq
trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
try {
partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
catch {
case t: Throwable =>
error("Failed to mark %d stale offsets for deletion in %s.".format(messages.size, appendPartition), t)
// ignore and continue
debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))