八 hugegraph 源代码 uploader

2019-07-21  本文已影响0人  NazgulSun

uploader 简介

线程池

uploader 使用 java concurrent 包的 executionservice

        this.batchService = ExecutorUtil.newFixedThreadPool(options.numThreads,
                                                            BATCH_WORKER);
        this.singleService = ExecutorUtil.newFixedThreadPool(options.numThreads,
                                                             SINGLE_WORKER);

使用java8 的CompetionFuture 来走异步编程,喜欢做回调并且喜欢异步风格的可以学习一下这个类。

        CompletableFuture.runAsync(task, this.batchService).exceptionally(e -> {
            LOG.warn("Batch insert {} error, try single insert", type, e);
            this.submitInSingle(struct, batch);
            return null;
        }).whenComplete((r, e) -> this.batchSemaphore.release());

如何做的流量控制

   public <GE extends GraphElement> void submitBatch(ElementStruct struct,
                                                      List<GE> batch) {
        ElemType type = struct.type();
        try {
            *** this.batchSemaphore.acquire(); ***
        } catch (InterruptedException e) {
            throw new LoadException("Interrupted while waiting to submit %s " +
                                    "batch in batch mode", e, type);
        }

        InsertTask<GE> task = new BatchInsertTask<>(this.context, struct,
                                                    batch);
        CompletableFuture.runAsync(task, this.batchService).exceptionally(e -> {
            LOG.warn("Batch insert {} error, try single insert", type, e);
            this.submitInSingle(struct, batch);
            return null;
***        }).whenComplete((r, e) -> this.batchSemaphore.release());  ***
    }

如何处理批处理的异常和监控

对于监控的话,hugegraph 使用了 apache的 StopWatch,这个工具类可以值得我们借鉴,以后再也不要用 long current = System.currentMIllians()
Stopwatch 更加面向对象。

上一篇 下一篇

猜你喜欢

热点阅读