玩转大数据程序员spark

Kafka基础组件和辅助类库简介

2016-12-26  本文已影响1001人  扫帚的影子

Kafka基础组件概述

kafkaserver1.png

Kafka辅助类库简介

KafkaScheduler

/**
   * Initialize this scheduler so it is ready to accept scheduling of tasks
   */
  def startup()
  
  /**
   * Shutdown this scheduler. When this method is complete no more executions of background tasks will occur. 
   * This includes tasks scheduled with a delayed execution.
   */
  def shutdown()
  
  /**
   * Check if the scheduler has been started
   */
  def isStarted: Boolean
  
  /**
   * Schedule a task
   * @param name The name of this task
   * @param delay The amount of time to wait before the first execution
   * @param period The period with which to execute the task. If < 0 the task will execute only once.
   * @param unit The unit for the preceding times.
   */
  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)

ZkUtils

  val ConsumersPath = "/consumers"
  val BrokerIdsPath = "/brokers/ids"
  val BrokerTopicsPath = "/brokers/topics"
  val ControllerPath = "/controller"
  val ControllerEpochPath = "/controller_epoch"
  val ReassignPartitionsPath = "/admin/reassign_partitions"
  val DeleteTopicsPath = "/admin/delete_topics"
  val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
  val BrokerSequenceIdPath = "/brokers/seqid"
  val IsrChangeNotificationPath = "/isr_change_notification"
  val EntityConfigPath = "/config"
  val EntityConfigChangesPath = "/config/changes"

Pool

def getAndMaybePut(key: K) = {
    if (valueFactory.isEmpty)
      throw new KafkaException("Empty value factory in pool.")
    val curr = pool.get(key)
    if (curr == null) {
      createLock synchronized {
        val curr = pool.get(key)
        if (curr == null)
          pool.put(key, valueFactory.get(key))
        pool.get(key)
      }
    }
    else
      curr
  }

Logging

FileLock

ByteBounderBlockingQueue

def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean
def offer(e: E): Boolean
def put(e: E): Boolean
def poll(timeout: Long, unit: TimeUnit)
def poll()
def take(): E
...

DelayedItem

先写这么多吧,其他的遇到的时候再来分析,不得不感叹java的类库真是丰富啊~~~

# 下一篇我们来开始介绍Kafka的Request和Response

Kafka源码分析-汇总
上一篇下一篇

猜你喜欢

热点阅读