tranquility 真绕

2020-04-22  本文已影响0人  专职掏大粪
Event 定义,指定 DataSource 的 schema
BeamFactory 定义,这个地方主要为了定义一些需要的信息。比如 :
    zk 地址,用来做服务发现
    dimension 指定
    Rollup 的聚合算子指定:count, sum, max, min 等,注意没有 avg
    segment 的时间粒度指定
    窗口大小指定
class SimpleEventBeamFactory extends BeamFactory[SimpleEvent]
{

  lazy val makeBeam: Beam[SimpleEvent] = {

    // Tranquility uses ZooKeeper (through Curator framework) for coordination.
    val curator = CuratorFrameworkFactory.newClient(
      "localhost:2181",
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    val indexService = "druid/overlord" // Your overlord's druid.service, with slashes replaced by colons.
    val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path
    val dataSource = "foo"
    val dimensions = IndexedSeq("bar")
    val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
    val isRollup = true

    // Expects simpleEvent.timestamp to return a Joda DateTime object.
    DruidBeams
      .builder((simpleEvent: SimpleEvent) => simpleEvent.timestamp)
      .curator(curator)
      .discoveryPath(discoveryPath)
      .location(DruidLocation.create(indexService, dataSource))
      .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE, isRollup))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.HOUR,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 1
        )
      )
      .buildBeam()
  }
}

自定义BeamFactory继承 BeamFactory,重写 makeBeam

DruidBeams.buildBeam 使用 ClusteredBeam

ClusteredBeamTuning 定义了beam 的参数

flink druid sink 算子初始化 Tranquilizer

val tranquilizer = Tranquilizer.create(beamFactory.makeBeam)

Tranquilizer.start()

@LifecycleStart
def start(): Unit = {
  started = true
  sendThread.start()
}

sendThread.start() 启动 Tranquilizer 线程

循环将数据发送到buffer 达到buffer阈值 触发

Tranquilizer.sendBuffer

private def sendBuffer(myIndex: Long, myBuffer: Buffer[MessageAndPromise]): Unit = {
    if (log.isDebugEnabled) {
      log.debug(s"Sending buffer with ${myBuffer.size} messages.")
    }

    val futureResults: Seq[Future[SendResult]] = try {
        //  实际上执行ClusteredBeam 的sendAll
      beam.sendAll(myBuffer.map(_.message))
    }
    
}

看一下ClusteredBeam.sendAll()

override def sendAll(events: Seq[EventType]): Seq[Future[SendResult]] = {
  val now = timekeeper.now.withZone(DateTimeZone.UTC)
  // Events, grouped and ordered by truncated timestamp, with their original indexes remembered
  //将数据按时间分组并排序
  val eventsWithPromises = Vector() ++ events.map(event => (event, Promise[SendResult]()))
  val grouped: Seq[(DateTime, IndexedSeq[(EventType, Promise[SendResult])])] = (eventsWithPromises groupBy {
    case (event, promise) =>
      tuning.segmentBucket(timestamper(event)).start
  }).toSeq.sortBy(_._1.getMillis)
  // Possibly warm up future beams
  def toBeWarmed(dt: DateTime, end: DateTime): List[DateTime] = {
    if (dt <= end) {
      dt :: toBeWarmed(tuning.segmentBucket(dt).end, end)
    } else {
      Nil
    }
  }
  val latestEventTimestamp: Option[DateTime] = grouped.lastOption map { case (truncatedTimestamp, group) =>
    val event: EventType = group.maxBy(tuple => timestamper(tuple._1).getMillis)._1
    timestamper(event)
  }
  val warmingBeams: Future[Seq[Beam[EventType]]] = Future.collect(
    for (
      latest <- latestEventTimestamp.toList;
      tbwTimestamp <- toBeWarmed(latest, latest + tuning.warmingPeriod) if tbwTimestamp > latest
    ) yield {
      // Create beam asynchronously
      beam(tbwTimestamp, now)
    }
  )
  // Send data
  省略发送数据的代码
    }

将数据按时间分组并排序,取最大的时间 调用beam(tbwTimestamp, now)

用于在发送数据之前创建beam

private[this] def beam(timestamp: DateTime, now: DateTime): Future[Beam[EventType]] = {
 
  val futureBeamOption = beams.get(timestamp.getMillis) match {
    case _ if !open => Future.value(None)
    case Some(x) if windowInterval.overlaps(bucket) => Future.value(Some(x))
    case Some(x) => Future.value(None)
    case None if timestamp <= localLatestCloseTime => Future.value(None)
    case None if !creationInterval.overlaps(bucket) => Future.value(None)
    case None =>
      // We may want to create new merged beam(s). Acquire the zk mutex and examine the situation.
      // This could be more efficient, but it's happening infrequently so it's probably not a big deal.
      data.modify {
        prev =>
          val prevBeamDicts = prev.beamDictss.getOrElse(timestamp.getMillis, Nil)
          //判断现有的beam数据是否覆盖大于 定义的partitions数据,一个partition 对应一个beam
          if (prevBeamDicts.size >= tuning.partitions) {
            log.info(
              "Merged beam already created for identifier[%s] timestamp[%s], with sufficient partitions (target = %d, actual = %d)",
              identifier,
              timestamp,
              tuning.partitions,
              prevBeamDicts.size
            )
            prev
// 判断数据时间触发的新的task 时间是否大于上次任务结束的时间
          } else if (timestamp <= prev.latestCloseTime) {
            log.info(
              "Global latestCloseTime[%s] for identifier[%s] has moved past timestamp[%s], not creating merged beam",
              prev.latestCloseTime,
              identifier,
              timestamp
            )
            prev
          } else {
// 时间大于上次任务结束的时间并且现有beam数没有覆盖到partion 创建beam
            assert(prevBeamDicts.size < tuning.partitions)
            assert(timestamp > prev.latestCloseTime)

            // We might want to cover multiple time segments in advance.
            val numSegmentsToCover = tuning.minSegmentsPerBeam +
              rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1)
            val intervalToCover = new Interval(
              timestamp.getMillis,
              tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).getMillis,
              ISOChronology.getInstanceUTC
            )
            val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start)

            // OK, create them where needed.
            val newInnerBeamDictsByPartition = new mutable.HashMap[Int, Dict]
            val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot {
              case (millis, beam) =>
                // Expire old beamDicts
                tuning.segmentGranularity.increment(new DateTime(millis)) + tuning.windowPeriod < now
            }) ++ (for (ts <- timestampsToCover) yield {
              val tsPrevDicts = prev.beamDictss.getOrElse(ts.getMillis, Nil)
              log.info(
                "Creating new merged beam for identifier[%s] timestamp[%s] (target = %d, actual = %d)",
                identifier,
                ts,
                tuning.partitions,
                tsPrevDicts.size
              )
              val tsNewDicts = tsPrevDicts ++ ((tsPrevDicts.size until tuning.partitions) map {
                partition =>
                  newInnerBeamDictsByPartition.getOrElseUpdate(
                    partition, {
                      // Create sub-beams and then immediately close them, just so we can get the dict representations.
                      // Close asynchronously, ignore return value.
                      beamMaker.newBeam(intervalToCover, partition).withFinally(_.close()) {
                        beam =>
                          val beamDict = beamMaker.toDict(beam)
                          log.info("Created beam: %s", objectMapper.writeValueAsString(beamDict))
                          beamDict
                      }
                    }
                  )
              })
              (ts.getMillis, tsNewDicts)
            })

beamMaker.newBeam 创建新的beam

override def newBeam(interval: Interval, partition: Int) = {
  require(
    beamTuning.segmentGranularity.widen(interval) == interval,
    "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
  )
  val baseFirehoseId = DruidBeamMaker.generateBaseFirehoseId(
    location.dataSource,
    beamTuning.segmentGranularity,
    interval.start,
    partition
  )
  val availabilityGroup = DruidBeamMaker.generateAvailabilityGroup(location.dataSource, interval.start, partition)
  val futureTasks = for (replicant <- 0 until beamTuning.replicants) yield {
    val firehoseId = "%s-%04d" format(baseFirehoseId, replicant)

    indexService.submit(taskBytes(interval, availabilityGroup, firehoseId, partition, replicant)) map {
      taskId =>
        TaskPointer(taskId, firehoseId)
    }
  }
  val tasks = Await.result(Future.collect(futureTasks))
  //DruidBeam 用于想mid的worker发送数据
  new DruidBeam(
    interval,
    partition,
    tasks,
    location,
    config,
    taskLocator,
    indexService,
    emitter,
    objectWriter
  )
}
indexService.submit(taskBytes(interval, availabilityGroup, firehoseId, partition, replicant)) map {
  taskId =>
    TaskPointer(taskId, firehoseId)
}

taskBytes 是任务的信息描述

indexService.submit 与overload通信并携带任务的信息描述到 druid/indexer/v1/task接口 创建task

DruidBeam.sendAll

override def sendAll(messages: Seq[A]): Seq[Future[SendResult]] = {
  val messagesWithPromises = Vector() ++ messages.map(message => (message, Promise[SendResult]()))

  // Messages grouped into chunks
  val messagesChunks: List[(Array[Byte], IndexedSeq[(A, Promise[SendResult])])] = messagesWithPromises
    .grouped(config.firehoseChunkSize)
    .map(xs => (objectWriter.batchAsBytes(xs.map(_._1)), xs))
    .toList

  for ((messagesChunkBytes, messagesChunk) <- messagesChunks) {
    // Try to send to all tasks, return "sent" if any of them accepted it.
    val taskResponses: Seq[Future[(TaskPointer, SendResult)]] = for {
      task <- tasks
      client <- clients.get(task) if client.active
    } yield {
      val messagePost = HttpPost(
        "/druid/worker/v1/chat/%s/push-events" format
          (location.environment.firehoseServicePattern format task.serviceKey)
      ) 
    }

    // Avoid become(chunkResult), for some reason it creates massive promise chains.
    chunkResult respond {
      case Return(result) => messagesChunk.foreach(_._2.setValue(result))
      case Throw(e) => messagesChunk.foreach(_._2.setException(e))
    }
  }

  messagesWithPromises.map(_._2)
}

/druid/worker/v1/chat/%s/push-events worker的数据接受服务

ClusteredBeam 的作用

1.用于调用beamMaker.newBeam 创建DruidBeam 发送数据

2.用于检测根据时间和分区及当前存在的beams 判断是否需要创建新的beam

3.DruidBeamMaker.newBeam 里面可以实现在每次创建新的beam 获取最新的schema的逻辑,用于动态schama

tranquility 维护每一个beam的ClusteredBeamMeta
默认会写到data目录

[zk: localhost:2181(CONNECTED) 2] get /druid/tranquility/beams/druid:tranquility:indexer/server_brand/

mutex   data

{
    "latestTime":"2020-04-23T03:00:00.000Z",
    "latestCloseTime":"2020-04-23T01:00:00.000Z",
    "beams":{
        "2020-04-23T02:00:00.000Z":[
            {
                "interval":"2020-04-23T02:00:00.000Z/2020-04-23T03:00:00.000Z",
                "partition":0,
                "tasks":Array[1],
                "timestamp":"2020-04-23T02:00:00.000Z"
            },
            {
                "interval":"2020-04-23T02:00:00.000Z/2020-04-23T03:00:00.000Z",
                "partition":1,
                "tasks":Array[1],
                "timestamp":"2020-04-23T02:00:00.000Z"
            }
        ],
        "2020-04-23T03:00:00.000Z":[
            {
                "interval":"2020-04-23T03:00:00.000Z/2020-04-23T04:00:00.000Z",
                "partition":0,
                "tasks":[
                    {
                        "id":"index_realtime_logdata_2020-04-23T03:00:00.000Z_0_0",
                        "firehoseId":"logdata-003-0000-0000"
                    }
                ],
                "timestamp":"2020-04-23T03:00:00.000Z"
            },
            {
                "interval":"2020-04-23T03:00:00.000Z/2020-04-23T04:00:00.000Z",
                "partition":1,
                "tasks":[
                    {
                        "id":"index_realtime_logdata_2020-04-23T03:00:00.000Z_1_0",
                        "firehoseId":"logdata-003-0001-0000"
                    }
                ],
                "timestamp":"2020-04-23T03:00:00.000Z"
            }
        ]
    }
}
上一篇 下一篇

猜你喜欢

热点阅读