程序员大数据

Kafka源码分析-启动流程

2016-12-23  本文已影响1952人  扫帚的影子

Kafka启动入口类:kafk.Kafak

      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown //捕获control-c中断,停止当前服务
        }
      })

      kafkaServerStartable.startup //启动服务
      kafkaServerStartable.awaitShutdown //等待服务结束

使用getPropsFromArgs方法来获取各配置项, 然后将启动和停止动作全部代理给KafkaServerStartable类;

Kafka启动代理类:KafkaServerStartable

def startup() {
    try {
      server.startup()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
        System.exit(1)
    }
  }

  def shutdown() {
    try {
      server.shutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Runtime.getRuntime.halt(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   */
  def setServerState(newState: Byte) {
    server.brokerState.newState(newState)
  }

  def awaitShutdown() = 
    server.awaitShutdown

下一篇我们来开始介绍Kafka基础组件和辅助类库简介

Kafka源码分析-汇总
上一篇 下一篇

猜你喜欢

热点阅读