Elastic Search

ES 操作之批量写-BulkProcessor 原理浅析

2020-03-14  本文已影响0人  PCMD_casualZAO

最近对线上业务进行重构,涉及到ES同步这一块,在重构过程中,为了ES 写入 性能考虑,大量的采取了 bulk的方式,来保证整体的一个同步速率,针对BulkProcessor 来深入一下,了解下 是如何实现,基于请求数,请求数据量大小 和 固定时间,刷新写入ES 的原理

针对ES 批量写入, 提供了3种方式,在 high-rest-client 中
分别是 bulk bulkAsync bulkProcessor 3种方式。
本文主要针对 bulkProcessor 来进行一些讲述

BulkProcessor

文档介绍


BulkProcessor是一个线程安全的批量处理类,允许方便地设置 刷新 一个新的批量请求 
(基于数量的动作,根据大小,或时间),
容易控制并发批量的数量
请求允许并行执行。

创建流程

How To use ?

来看个demo 创建BulkProcessor

 @Bean(name = "bulkProcessor") // 可以封装为一个bean,非常方便其余地方来进行 写入 操作
  public BulkProcessor bulkProcessor(){

        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) -> Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        return BulkProcessor.builder(bulkConsumer, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                    // todo do something
                int i = request.numberOfActions();
                log.error("ES 同步数量{}",i);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        // todo do something
                Iterator<BulkItemResponse> iterator = response.iterator();
                while (iterator.hasNext()){
                    System.out.println(JSON.toJSONString(iterator.next()));
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                                // todo do something
                log.error("写入ES 重新消费");
            }
        }).setBulkActions(1000) //  达到刷新的条数
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)) // 达到 刷新的大小
                .setFlushInterval(TimeValue.timeValueSeconds(5)) // 固定刷新的时间频率
                .setConcurrentRequests(1) //并发线程数
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // 重试补偿策略
                .build(); 

    }

使用BulkProcessor

bulkProcessor.add(xxxRequest)

创建过程做了些什么?

  1. 创建一个consumer 对象用来封装传递参数,和请求操作

      BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
    

    我们可以看到用了java 8的函数式编程接口 BiConsumer 关于 BiConsumer 的用法,可以自行百度,因为也是采取的 异步刷新策略, 所以,是一个返回结果的Listener ActionListener<BulkResponse>

  2. 构建并BulkProcess

     return BulkProcessor.builder(bulkConsumer, new BulkProcessor.Listener() {
             ****
             
            }).setBulkActions(1000)
                    .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB))
                    .setFlushInterval(TimeValue.timeValueSeconds(5))
                    .setConcurrentRequests(1)
                  .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();
    
        }
    

    可以很清楚的看到,在build 操作中,我们看到,在build 中,除了 之前定义的consumer,还实现了一个 Listener 接口 (稍后会具体讲到),用来做一些 在批量求情之前和请求之后的处理。

至此为止,BulkProcessor 创建,就OK啦~。

内部逻辑实现

先不说话,我们先上张类图

image

可以看到,在 BulkProcessor 中,有这样的一些类和接口

Listener
Builder
BulkProcessor
Flush
===== 华丽的分界线
BulkProcessor 实现了 Closeable --> 继承自  AutoCloseable (关于AutoCloseable 本文不做过多说明,具体的可以百度,或者等待后续)

那么先从构建开始,我们来看下Builder


/**
* 简单的构建,可以看到,就是一个client 和 listener 这个不会做刷新策略,
*/
public static Builder builder(Client client, Listener listener) {
        Objects.requireNonNull(client, "client");
        Objects.requireNonNull(listener, "listener");
        return new Builder(client::bulk, listener, client.threadPool(), () -> {});
    }



/**
* 所有功能的builder 实现方法
* ScheduledThreadPoolExecutor 用来实现 按照时间频率,来进行 刷新,如 每5s 
*
*/
    public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); // 接口静态方式,来实现 Executor 的初始化
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS), //
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }


/**
* 构造函数 
* @param consumer 前文定义的consumer request response action
* @param listener listener  BulkProcessor 内置监听器
* @param scheduler elastic 定时调度 类scheduler  
* @paran onClose 关闭时候的运行
*/

    private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
                        Scheduler scheduler, Runnable onClose) {
            this.consumer = consumer;
            this.listener = listener;
            this.scheduler = scheduler;
            this.onClose = onClose;
        }

通过上述的代码片段,可以很明显的看到,关于初始化构建的一些关键点和要素

看完builder 接下来,我们看下 bulkprocessor 是如何工作的

先看下构造方法

   
   BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); // BulkRequestHandler 批量执行 handler 操作
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); //开始刷新任务
        this.onClose = onClose;
    }

startFlushTask 如何进行工作

 private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
   
   // 如果 按照时间刷新 为空,则直接返回 任务为取消状态
   if (flushInterval == null) {
            return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {
                    return true;
                }
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }

    private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }


// 刷新线程
class Flush implements Runnable {

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute(); // 下面方法
            }
        }
    }



/**
*  刷新执行
*
*/

 // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();
                // 刷新 bulkRequest 为下一批做准备
        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

看到这里,关于时间的定时调度,我们其实是很清楚了,那么 关于数据量 和 大小的判断策略在哪儿?

   
/**
* 各种添加操作
*/
public BulkProcessor add(DocWriteRequest request, @Nullable Object payload) {
        internalAdd(request, payload);
        return this;
    }

    /**
    * 我们可以看到,在添加之后,会做一个操作
    * executeIfNeeded 如果需要,则进行执行
    */
    private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
        ensureOpen();
        bulkRequest.add(request, payload);
        executeIfNeeded();
    }


/**
* 如果超过限制,则执行刷新操作
*/
 private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }


    /**
    * 这这儿,我们终于看到了 关于action 和 size 的判断操作,
    *
    */
    private boolean isOverTheLimit() {
        if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
            return true;
        }
        if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
            return true;
        }
        return false;
    }

通过上述的分析,关于按照时间,数据size,大小来进行flush 执行的入口我们都已经很清楚了

针对数据大小的设置。在每次添加的时候,做判断是否 超过限制
针对 时间的频次控制,交由ScheduledThreadPoolExecutor 来去做监控

下来,让我们看下具体的执行以及重试策略,和 返回值的处理

  public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
          // listener 填充 request 和执行ID
            listener.beforeBulk(executionId, bulkRequest);
            //通过信号量来进行资源的控制 来自于我们设置的 setConcurrentRequests
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
          // 进行执行并按照补偿重试策略如果失败
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                            //结果写入  ActionListener --> BulkProcessor.Listener 的转换
              @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            }, Settings.EMPTY);
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

最终的执行,在 RetryHandler 中,继续往下看

    public void execute(BulkRequest bulkRequest) {
            this.currentBulkRequest = bulkRequest;
            consumer.accept(bulkRequest, this);
        }

对,没错,只有一个操作, consumer.accept(bulkRequest, this);

再一次展现了 java 8 函数式接口的功能强大之处 此处 consumer.accept(bulkRequest, this);

执行的操作即 Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

如何设置重试策略,以及数据的筛选

 @Override
        public void onResponse(BulkResponse bulkItemResponses) {
            if (!bulkItemResponses.hasFailures()) {
                // we're done here, include all responses
                addResponses(bulkItemResponses, (r -> true));
                finishHim();
            } else {
                if (canRetry(bulkItemResponses)) {
                    addResponses(bulkItemResponses, (r -> !r.isFailed()));
                    retry(createBulkRequestForRetry(bulkItemResponses));
                } else {
                    addResponses(bulkItemResponses, (r -> true));
                    finishHim();
                }
            }
        }


/**
* 只针对失败的请求,放入到重试策略中
*/
   private void addResponses(BulkResponse response, Predicate<BulkItemResponse> filter) {
            for (BulkItemResponse bulkItemResponse : response) {
                if (filter.test(bulkItemResponse)) {
                    // Use client-side lock here to avoid visibility issues. This method may be called multiple times
                    // (based on how many retries we have to issue) and relying that the response handling code will be
                    // scheduled on the same thread is fragile.
                    synchronized (responses) {
                        responses.add(bulkItemResponse);
                    }
                }
            }
        }

再一次展示了函数式接口的强大之处

补充一张流程图

BulkProcessor.png

对这次的代码一些了解,对其中的一些设计理念和模式,学习到了不少的知识

如consumer, Listener,模式的应用。

如consumer ,Predicate 等函数式接口的用法。

如对线程的统一封装,来去做更统一的抽象处理

在后续的代码设计中,也会尝试采取这种设计模式,来写出更优雅的代码。es java api的代码内的不少设计模式,自我感觉还是很nice,后续会继续学习

仅代表个人见解,如有不对之出,欢迎指出

个人原创,转载请备明出处

上一篇下一篇

猜你喜欢

热点阅读