
Kafka Source Code Analysis: The Broker Request Handling Flow

2019-08-11  专职掏大粪

Kafka's design makes heavy use of the Selector + Channel + Buffer pattern, so before we start, here is a brief introduction to NIO's Selector, Channel, and Buffer.

NIO's Selector + Channel + Buffer

Buffers
In Java NIO, a Buffer is used to interact with an NIO channel.
A buffer is essentially a block of memory into which data can be written and from which it can later be read. That memory is wrapped in an NIO Buffer object, which provides a set of methods for convenient access.
Standard IO operates on byte streams and character streams, whereas NIO operates on channels and buffers: data is always read from a channel into a buffer, or written from a buffer into a channel.
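
As a minimal standalone sketch (not Kafka code) of the Buffer write/flip/read cycle:

  import java.nio.ByteBuffer

  val buf = ByteBuffer.allocate(48)           // allocate a 48-byte buffer
  buf.put("hello".getBytes("UTF-8"))          // write mode: copy bytes into the buffer
  buf.flip()                                  // switch the buffer from write mode to read mode
  while (buf.hasRemaining)
    print(buf.get().toChar)                   // read the data back out, one byte at a time
  buf.clear()                                 // make the entire buffer writable again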

Channels
Java NIO channels are similar to streams, with a few differences:
You can both read from a channel and write to it, whereas streams are typically one-directional.
Channels can be read and written asynchronously.

The figure below shows how Buffers and Channels interact:


[Figure: data transfer between a Channel and a Buffer]
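
A rough standalone example of the channel -> buffer flow (the file name data.txt is just a placeholder):

  import java.io.RandomAccessFile
  import java.nio.ByteBuffer

  val file = new RandomAccessFile("data.txt", "rw")
  val channel = file.getChannel                        // obtain a FileChannel from the file
  val buf = ByteBuffer.allocate(1024)

  var bytesRead = channel.read(buf)                    // channel -> buffer
  while (bytesRead != -1) {
    buf.flip()                                         // prepare the buffer for reading
    while (buf.hasRemaining) print(buf.get().toChar)   // drain the buffer
    buf.clear()                                        // prepare for the next read from the channel
    bytesRead = channel.read(buf)
  }
  file.close()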

Selectors

A selector monitors events on multiple channels (for example: connection opened, data arrived). A Selector is the Java NIO component that can examine one or more NIO channels and determine which of them are ready for events such as reading or writing. This lets a single thread manage multiple channels, and therefore multiple network connections.
The figure below illustrates a single thread using one Selector to handle three Channels:


[Figure: a single thread using one Selector to handle three Channels]
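
A minimal sketch of that pattern (the port 9092 is arbitrary); note that a channel must be switched into non-blocking mode before it can be registered with a Selector:

  import java.net.InetSocketAddress
  import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}

  val selector = Selector.open()
  val server = ServerSocketChannel.open()
  server.bind(new InetSocketAddress(9092))
  server.configureBlocking(false)                 // required before register()
  server.register(selector, SelectionKey.OP_ACCEPT)

  while (true) {
    if (selector.select(500) > 0) {               // wait up to 500 ms for ready channels
      val iter = selector.selectedKeys().iterator()
      while (iter.hasNext) {
        val key = iter.next(); iter.remove()      // a key must be removed once handled
        if (key.isAcceptable) {                   // a new connection is ready to accept
          val client = server.accept()
          client.configureBlocking(false)
          client.register(selector, SelectionKey.OP_READ)
        } else if (key.isReadable) {
          // read from key.channel() into a ByteBuffer here
        }
      }
    }
  }

Kafka's Acceptor, shown below, uses exactly this select(500) / iterate / remove pattern.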

Non-blocking IO
While a thread asks a channel to read data into a buffer, it can keep doing other work; once the data is in the buffer, the thread picks it up and processes it. Writing from a buffer into a channel works the same way. In the sketch above, configureBlocking(false) is what puts a channel into this non-blocking mode.

The Broker Request Handling Flow

Below we trace the whole request handling process through the source code of its key stages (Kafka 2.3). The entry point is Kafka.main():

  def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
      // parts omitted ...
      kafkaServerStartable.startup()
      kafkaServerStartable.awaitShutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Exiting Kafka due to fatal exception", e)
        Exit.exit(1)
    }
    Exit.exit(0)
  }

main() delegates to KafkaServerStartable, which wraps the actual KafkaServer:

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
...
  def startup() {
    try server.startup()
    catch {
     ...
    }
  }
  }
server.startup() brings up a KafkaServer, which creates and starts the SocketServer; SocketServer.startup() is where the network threads are created:

 def startup(startupProcessors: Boolean = true) {
   this.synchronized {
     connectionQuotas = new ConnectionQuotas(config, time)
     // control plane
     createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
     // data plane
     createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
     if (startupProcessors) {
       // start the control-plane Processor thread
       startControlPlaneProcessor()
       // start the data-plane Processor threads
       startDataPlaneProcessors()
     } 
   }
 }
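
(config.controlPlaneListener is an Option: the control-plane Acceptor and Processor are created only when control.plane.listener.name is configured; otherwise controller requests share the data plane.)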

 private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                   endpoints: Seq[EndPoint]): Unit = synchronized {
   endpoints.foreach { endpoint =>
     connectionQuotas.addListener(config, endpoint.listenerName)
      // for each EndPoint: create one Acceptor, plus several Processors that go into the processor thread array
     val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
     addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
   }
 }
createAcceptor builds an Acceptor thread for the endpoint; its run() method is the classic accept loop:

private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  // NSelector is SocketServer.scala's alias for java.nio.channels.Selector;
  // KSelector (used by Processor below) aliases Kafka's own org.apache.kafka.common.network.Selector
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  private val processors = new ArrayBuffer[Processor]()

  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    // register the ServerChannel with the Selector and listen for ACCEPT events
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {

          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()

                if (key.isAcceptable) {
                  accept(key).foreach { socketChannel =>

                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      processor = synchronized {
                        // wrap the index so connections round-robin across the processors
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                      // hand the connection to the chosen processor via assignNewConnection
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
        // ...
        }
      }
    } 
  //...
  }

  private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
    // delegate to processor.accept
    if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
     // ...
      true
    } else
      false
  }
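
processor.accept (slightly simplified here: the idle-meter bookkeeping is omitted) offers the connection to that processor's newConnections queue, blocking only when this is the last processor left to try, and then wakes the processor's selector:

  def accept(socketChannel: SocketChannel, mayBlock: Boolean,
             acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
    val accepted = {
      if (newConnections.offer(socketChannel))   // non-blocking enqueue
        true
      else if (mayBlock) {                       // last retry: block until there is room
        newConnections.put(socketChannel)
        true
      } else
        false
    }
    if (accepted)
      wakeup()   // interrupt the processor's poll() so it notices the new connection
    accepted
  }

The Processor itself looks like this: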
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               failedAuthenticationDelayMs: Int,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext,
                               connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  // queue of new connections assigned by the Acceptor; drained and processed in run()
  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  // each processor maintains its own responseQueue
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

  // each processor also maintains its own (Kafka) Selector
 private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache,
      time))
  // Visible to override for testing
  protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
    channelBuilder match {
      case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
      case _ =>
    }
    new KSelector(
      maxRequestSize,
      connectionsMaxIdleMs,
      failedAuthenticationDelayMs,
      metrics,
      time,
      "socket-server",
      metricTags,
      false,
      true,
      channelBuilder,
      memoryPool,
      logContext)
  }

  override def run() {
     // startupComplete() counts down a CountDownLatch, signalling that initialization is finished and this Processor is now running normally
    startupComplete() 
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          // register any new responses for writing
          // drain the response queue: these responses are results produced by the Handler
          // threads and handed back via RequestChannel to this processor's responseQueue;
          // unmute is called here so the channel can receive requests again
          processNewResponses()
          // call KSelector.poll(), which performs the actual network reads and writes
          poll()
          // processCompletedReceives calls selector.mute: after a request is read, no further
          // reads are accepted on that connection until its response has been sent
          processCompletedReceives()
          processCompletedSends()
          processDisconnected()
          closeExcessConnections()
        } catch {
         // ...
        }
      }
    } finally {
      // ...
    }
  }
  }
 private def configureNewConnections() {
    var connectionsProcessed = 0
    while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
      // take a new connection's SocketChannel off the queue
      val channel = newConnections.poll()
      try {
        // register the SocketChannel with this processor's selector, subscribing to OP_READ
        selector.register(connectionId(channel.socket), channel)
        connectionsProcessed += 1
      } catch {
       
        case e: Throwable =>
          // ...
      }
    }
  }

The register call lands in the Java-side KSelector (org.apache.kafka.common.network.Selector), which subscribes the new channel to OP_READ:

 public void register(String id, SocketChannel socketChannel) throws IOException {
        ensureNotRegistered(id);
        registerChannel(id, socketChannel, SelectionKey.OP_READ);
        this.sensors.connectionCreated.record();
    }
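
Inside registerChannel, the SocketChannel is registered with the underlying NIO selector for OP_READ, and a Kafka-level KafkaChannel is built and attached to the resulting SelectionKey, so that later poll() calls can map network events back to the connection.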
The Processor hands completed requests to a RequestChannel (shared by all data-plane processors); its requestQueue is the hand-off point between the network threads and the Handler threads:

class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
  import RequestChannel._
  val metrics = new RequestChannel.Metrics
  // requests from all processors are funnelled into this single queue
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  private val processors = new ConcurrentHashMap[Int, Processor]()

  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }
  // ...
}
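
The other end of requestQueue is a timed poll; KafkaRequestHandler (below) pulls requests out via receiveRequest, which in Kafka 2.3 is essentially:

  /** Get the next request or block until the specified time has elapsed */
  def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)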

Back in Processor: processCompletedReceives turns each completed network read into a Request and hands it over:

  private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        openOrClosingChannel(receive.source) match {
          case Some(channel) =>
            // ... header parsing and re-authentication handling omitted ...
            val nowNanos = time.nanoseconds()
            if (channel.serverAuthenticationSessionExpired(nowNanos)) {
              // ... session expired: close the connection ...
            } else {
              // ... build the RequestChannel.Request (req) from the receive ...
              // deliver the request to the Handler threads via RequestChannel.requestQueue
              requestChannel.sendRequest(req)
              // stop reading from this connection until its response has been sent
              selector.mute(connectionId)
              handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
            }
          case None =>
            // ... the channel was already removed ...
        }
      } catch {
        // ...
      }
    }
  }

On the other side of the queue, each KafkaRequestHandler thread loops, pulling requests and dispatching them to KafkaApis:

class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: AtomicInteger,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis,
                          time: Time) extends Runnable with Logging {
  def run() {
    while (!stopped) {
    
      // take the next request from RequestChannel.requestQueue
      val req = requestChannel.receiveRequest(300)
      req match {
        case RequestChannel.ShutdownRequest =>
          shutdownComplete.countDown()
          return

        case request: RequestChannel.Request =>
          try {
            // call KafkaApis.handle() to hand the request over to the business logic
            apis.handle(request)
          } catch {
            // exception handling ...
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    shutdownComplete.countDown()
  }
}

The handler threads are created and started by KafkaRequestHandlerPool:

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)

  // initialize the array of KafkaRequestHandler threads
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }
  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
// ...
}
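
Note that the two pools are sized independently: the number of Processor threads per listener comes from num.network.threads (default 3), while numThreads for the handler pool comes from num.io.threads (default 8).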
KafkaApis.handle() dispatches on the request's ApiKey:

  def handle(request: RequestChannel.Request) {
    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      // other ApiKeys omitted
      // exception handling omitted
    }
  }
Taking PRODUCE as the example:

  def handleProduceRequest(request: RequestChannel.Request) {
    val produceRequest = request.body[ProduceRequest]

    // callback that delivers the final result of the business-layer processing to the
    // response queue of the processor that owns this connection
    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
      // Send the response immediately. In case of throttling, the channel has already been muted.
      if (produceRequest.acks == 0) {
        // acks == 0: the producer expects no response (error handling omitted)
      } else {
        // sendResponse() hands the response to RequestChannel via requestChannel.sendResponse(),
        // which places it on the owning processor's responseQueue
        sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
      }
    }

    // appendRecords contains the logic that actually writes the records
    replicaManager.appendRecords(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = true,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      recordConversionStatsCallback = processingStatsCallback)
    // ...
  }
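
To close the loop: sendResponse ends up in requestChannel.sendResponse, which routes the response back to the processor that owns the connection. Roughly (logging and metrics omitted):

  /** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response) {
    // the processor may already have been shut down, in which case the
    // connection is closed and the response is simply dropped
    val processor = processors.get(response.processor)
    if (processor != null)
      processor.enqueueResponse(response)
  }

  // in Processor: enqueue the response and wake the selector, so that
  // processNewResponses() in the run loop writes it out and unmutes the channel
  private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
    responseQueue.put(response)
    wakeup()
  }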

A picture is worth a thousand words; to wrap up, here is the entire Broker request handling flow in one diagram:


[Figure: overall Broker request handling flow (Acceptor -> Processor -> RequestChannel -> KafkaRequestHandler -> KafkaApis, and the response path back)]

References

https://blog.csdn.net/zhanyuanlin/article/details/76906583
http://ifeve.com/channels/
