实时任务Peon进程创建segment

2019-12-14  本文已影响0人  sydt2011

创建Segment过程概述

Peon进程由middle manager进程启动
提供http接口接收原始数据
将一行行的数据做merge,当行数达到maxRowsInMemory(当前集群配置是50000)或者intermediatePersistPeriod(10分钟),将内存中的数据序列化成Segment文件,持久化到本地磁盘
达到时间窗口后,停止接收数据,并将上步创建的segment合并成一个大的segment
新创建的segment通过hdfs移交到历史节点

启动Peon进程

middleManager接收到overlord分配过来的任务后,创建线程,设置jvm的命令并执行,包括classpath,堆内堆外内存设置,druid端口(用与peon进程对外的查询接口,这里分配的端口导致有端口被占用的bug)
任务json配置等准备工作。
执行jvm命令后,当前线程将一直等待直到任务完成。
Peon进程和导入相关的启动过程是:

@LifecycleStart
public void start() throws InterruptedException
{
  final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile");
  final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile");
  final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream");
 
  try {
    task = jsonMapper.readValue(taskFile, Task.class);
 
    log.info(
        "Running with task: %s",
        jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task)
    );
  }
  catch (IOException e) {
    throw Throwables.propagate(e);
  }
 
  // Avoid running the same task twice on the same machine by locking the task base directory.
 
  final File taskLockFile = taskConfig.getTaskLockFile(task.getId());
 
  try {
    synchronized (this) {
      if (taskLockChannel == null && taskLockFileLock == null) {
        taskLockChannel = FileChannel.open(
            taskLockFile.toPath(),
            StandardOpenOption.CREATE,
            StandardOpenOption.WRITE
        );
 
        log.info("Attempting to lock file[%s].", taskLockFile);
        final long startLocking = System.currentTimeMillis();
        final long timeout = DateTimes.utc(startLocking).plus(taskConfig.getDirectoryLockTimeout()).getMillis();
        while (taskLockFileLock == null && System.currentTimeMillis() < timeout) {
          taskLockFileLock = taskLockChannel.tryLock();
          if (taskLockFileLock == null) {
            Thread.sleep(100);
          }
        }
 
        if (taskLockFileLock == null) {
          throw new ISE("Could not acquire lock file[%s] within %,dms.", taskLockFile, timeout - startLocking);
        } else {
          log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking);
        }
      } else {
        throw new ISE("Already started!");
      }
    }
  }
  catch (IOException e) {
    throw Throwables.propagate(e);
  }
 
  if (taskExecutorConfig.isParentStreamDefined()) {
    // Spawn monitor thread to keep a watch on parent's stdin
    // If stdin reaches eof, the parent is gone, and we should shut down
    parentMonitorExec.submit(
        new Runnable()
        {
          @Override
          public void run()
          {
            try {
              while (parentStream.read() != -1) {
                // Toss the byte
              }
            }
            catch (Exception e) {
              log.error(e, "Failed to read from stdin");
            }
 
            // Kind of gross, but best way to kill the JVM as far as I know
            log.info("Triggering JVM shutdown.");
            System.exit(2);
          }
        }
    );
  }
 
  // Won't hurt in remote mode, and is required for setting up locks in local mode:
  try {
    if (!task.isReady(taskActionClientFactory.create(task))) {
      throw new ISE("Task[%s] is not ready to run yet!", task.getId());
    }
  }
  catch (Exception e) {
    throw new ISE(e, "Failed to run task[%s] isReady", task.getId());
  }
 
  statusFuture = Futures.transform(
      taskRunner.run(task),
      new Function<TaskStatus, TaskStatus>()
      {
        @Override
        public TaskStatus apply(TaskStatus taskStatus)
        {
          try {
            log.info(
                "Task completed with status: %s",
                jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
            );
 
            final File statusFileParent = statusFile.getParentFile();
            if (statusFileParent != null) {
              statusFileParent.mkdirs();
            }
            jsonMapper.writeValue(statusFile, taskStatus);
 
            return taskStatus;
          }
          catch (Exception e) {
            throw Throwables.propagate(e);
          }
        }
      }
  );
}

ThreadPoolTaskRunner

runner.png

数据导入

Peon进程启动后,在RealtimeIndexTask.run()方法中,完成任务的执行。

启动任务进程

导入数据到OnheapIncrementalIndex

EventReceiverFirehose提供http接口/push-events,接收tranquitiy提交的批量数据
EventReceiverFirehose 对数据处理的逻辑是:
对接收的数据进行解析,解析异常直接报错失败,对解析后的数据行接入到阻塞队列。
队列大小默认10万,如果任务线程队列消费数据不及时,接口会阻塞。

POST
@Path("/push-events")
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response addAll(
    InputStream in,
    @Context final HttpServletRequest req
)
{
  ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
 
  Optional<Response> producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
  if (producerSequenceResponse.isPresent()) {
    return producerSequenceResponse.get();
  }
 
  CountingInputStream countingInputStream = new CountingInputStream(in);
  Collection<Map<String, Object>> events = null;
  try {
    events = objectMapper.readValue(
        countingInputStream, new TypeReference<Collection<Map<String, Object>>>()
        {
        }
    );
  }
  catch (IOException e) {
    return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
  }
  finally {
    bytesReceived.addAndGet(countingInputStream.getCount());
  }
  log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
 
  final List<InputRow> rows = Lists.newArrayList();
  for (final Map<String, Object> event : events) {
    // Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
    rows.addAll(parser.parseBatch(event));
  }
 
  try {
    addRows(rows);
    return Response.ok(
        objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
        contentType
    ).build();
  }
  catch (InterruptedException e) {
    ...;
  }
}
  
public void addRows(Iterable<InputRow> rows) throws InterruptedException
{
  for (final InputRow row : rows) {
    boolean added = false;
    while (!closed && !added) {
      added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
      if (!added) {
        long currTime = System.currentTimeMillis();
        long lastTime = lastBufferAddFailMsgTime.get();
        if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
          log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
        }
      }
    }
 
    if (!added) {
      throw new IllegalStateException("Cannot add events to closed firehose!");
    }
  }
}

RealtimeIndexTask持有上部的firehose的实例,消费缓存对垒,交给realtimePlumber处理:“
1 根据数据行的时间戳获取sink,每个interval对应一个sink,一般一个peon进程就一个sink;
sink中有多个Firehydrant,有一个负责响应查询到增量导入数据,其余只是负责查询
2.添加数据行到sink的房前firehydrant,由Firehydrant的onheapIndecrementalindex完成增量索引创建
3.判读当前的firehydrant中已有的数据行数,如果达到配置maxRowsinmemory,或者处理时间超过配置的intermediatePersistperiod,把当前firehydrant数据持久化到磁盘

public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
  long messageTimestamp = row.getTimestampFromEpoch();
  final Sink sink = getSink(messageTimestamp);
  metrics.reportMessageMaxTimestamp(messageTimestamp);
  if (sink == null) {
    return -1;
  }
 
  final IncrementalIndexAddResult addResult = sink.add(row, false);
  if (config.isReportParseExceptions() && addResult.getParseException() != null) {
    throw addResult.getParseException();
  }
 
  if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
    persist(committerSupplier.get());
  }
 
  return addResult.getRowCount();
}

一个sink包含了最终生成的segment数据,一个segment数据比较大,不合适都放在内存中,在创建segment中,会达到一定数据后持久化到磁盘,会有小小的segment的问题件,有一个个的Firehydrant表示,已经持久化到磁盘的数据对应的Firehydrant对象只负责响应查询,Sink中还有当前Firehydrant,既响应查询也不断将数据加入到Firehydrant对象中。
如果当前数据行时间字所在的interval没有对应的sink,就会创建新的sink对象。创建Sink的过程中,就会创建当前的FireHydrant对象

private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{
  final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
      .withMinTimestamp(minTimestamp)
      .withTimestampSpec(schema.getParser())
      .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
      .withDimensionsSpec(schema.getParser())
      .withMetrics(schema.getAggregators())
      .withRollup(schema.getGranularitySpec().isRollup())
      .build();
  //创建OnheapIncrementalIndex对象
  final IncrementalIndex newIndex = new IncrementalIndex.Builder()
      .setIndexSchema(indexSchema)
      .setReportParseExceptions(reportParseExceptions)
      .setMaxRowCount(maxRowsInMemory)
      .buildOnheap();
 
  final FireHydrant old;
  synchronized (hydrantLock) {
    if (writable) {
      old = currHydrant;
      int newCount = 0;
      int numHydrants = hydrants.size();
      if (numHydrants > 0) {
        FireHydrant lastHydrant = hydrants.get(numHydrants - 1);
        newCount = lastHydrant.getCount() + 1;
        if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) {
          Map<String, ColumnCapabilitiesImpl> oldCapabilities;
          if (lastHydrant.hasSwapped()) {
            oldCapabilities = Maps.newHashMap();
            ReferenceCountingSegment segment = lastHydrant.getIncrementedSegment();
            try {
              QueryableIndex oldIndex = segment.asQueryableIndex();
              for (String dim : oldIndex.getAvailableDimensions()) {
                dimOrder.add(dim);
                oldCapabilities.put(dim, (ColumnCapabilitiesImpl) oldIndex.getColumn(dim).getCapabilities());
              }
            }
            finally {
              segment.decrement();
            }
          } else {
            IncrementalIndex oldIndex = lastHydrant.getIndex();
            dimOrder.addAll(oldIndex.getDimensionOrder());
            oldCapabilities = oldIndex.getColumnCapabilities();
          }
          newIndex.loadDimensionIterable(dimOrder, oldCapabilities);
        }
      }
      currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier());
      if (old != null) {
        numRowsExcludingCurrIndex.addAndGet(old.getIndex().size());
      }
      //新创建的FireHydrant加入到Sink
      hydrants.add(currHydrant);
    } else {
      // Oops, someone called finishWriting while we were making this new index.
      newIndex.close();
      throw new ISE("finishWriting() called during swap");
    }
  }
 
  return old;
}

数据行的添加最终是在OnheapIncrementalIndex对象的addToFacts方法完成:
OnheapIncrementalIndex实现可以理解成有一个Map对象以维度列和时间列TimeAndDims作为key,指标列作为value,当新的数据行加入时,通过key(TimeAndDims)确认对应的Aggregator聚合器对象,
维度值出现null值,作为一个单独的值加到维度字典编码中
Aggregator聚合器对象完成对数据行的累加操作
聚合时,使用ColumnSelectorFactory获取每行的指标值,和查询时通过游标获取列值不同,这里通过threadlocal方式获取,每次聚合前IncrementalIndex将数据InputRow放入到threadlocal
猜测这样实现而不是聚合时直接传入指标值的方式,主要是因为指标值聚合前需要类型转换和值的各种转换,这块逻辑主要在ColumnSelector完成,
这样,IncrementalIndex只负责将数据添加到threadlocal,Aggregator只需要从ColumnSelector获取要聚合的数据RollupFactsHolder(跳表)记录了维度值->行号,aggregators记录了行号->指标值数组的映射

protected AddToFactsResult addToFacts(
    AggregatorFactory[] metrics,
    boolean deserializeComplexMetrics,
    boolean reportParseExceptions,
    InputRow row,
    AtomicInteger numEntries,
    TimeAndDims key,
    ThreadLocal<InputRow> rowContainer,
    Supplier<InputRow> rowSupplier,
    boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException
{
  List<String> parseExceptionMessages;
  final int priorIndex = facts.getPriorIndex(key);
 
  Aggregator[] aggs;
 
  if (TimeAndDims.EMPTY_ROW_INDEX != priorIndex) {
    aggs = concurrentGet(priorIndex);
    parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
  } else {
    aggs = new Aggregator[metrics.length];
    factorizeAggs(metrics, aggs, rowContainer, row);
    parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
 
    final int rowIndex = indexIncrement.getAndIncrement();
    concurrentSet(rowIndex, aggs);
 
    // Last ditch sanity checks
    if (numEntries.get() >= maxRowCount
        && facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX
        && !skipMaxRowsInMemoryCheck) {
      throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
    }
    final int prev = facts.putIfAbsent(key, rowIndex);
    if (TimeAndDims.EMPTY_ROW_INDEX == prev) {
      numEntries.incrementAndGet();
    } else {
      // We lost a race
      aggs = concurrentGet(prev);
      parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
      // Free up the misfire
      concurrentRemove(rowIndex);
      // This is expected to occur ~80% of the time in the worst scenarios
    }
  }
 
  return new AddToFactsResult(numEntries.get(), parseExceptionMessages);
}

前FireHydrant中已有的数据行数如果达到配置的maxRowsInMemory,或者处理时间超过配置的intermediatePersistPeriod,将把当前FireHydrant数据持久化到磁盘

任务移交

任务移交.png
上一篇下一篇

猜你喜欢

热点阅读