ES 操作之批量写-BulkProcessor 原理浅析
最近对线上业务进行重构,涉及到ES同步这一块,在重构过程中,为了ES 写入 性能考虑,大量的采取了 bulk的方式,来保证整体的一个同步速率,针对BulkProcessor 来深入一下,了解下 是如何实现,基于请求数,请求数据量大小 和 固定时间,刷新写入ES 的原理
针对ES 批量写入, 提供了3种方式,在 high-rest-client 中
分别是 bulk bulkAsync bulkProcessor 3种方式。
本文主要针对 bulkProcessor 来进行一些讲述
BulkProcessor
文档介绍
BulkProcessor是一个线程安全的批量处理类,允许方便地设置 刷新 一个新的批量请求
(基于数量的动作,根据大小,或时间),
容易控制并发批量的数量
请求允许并行执行。
创建流程
How To use ?
来看个demo 创建BulkProcessor
@Bean(name = "bulkProcessor") // 可以封装为一个bean,非常方便其余地方来进行 写入 操作
public BulkProcessor bulkProcessor(){
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
return BulkProcessor.builder(bulkConsumer, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// todo do something
int i = request.numberOfActions();
log.error("ES 同步数量{}",i);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// todo do something
Iterator<BulkItemResponse> iterator = response.iterator();
while (iterator.hasNext()){
System.out.println(JSON.toJSONString(iterator.next()));
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// todo do something
log.error("写入ES 重新消费");
}
}).setBulkActions(1000) // 达到刷新的条数
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)) // 达到 刷新的大小
.setFlushInterval(TimeValue.timeValueSeconds(5)) // 固定刷新的时间频率
.setConcurrentRequests(1) //并发线程数
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // 重试补偿策略
.build();
}
使用BulkProcessor
bulkProcessor.add(xxxRequest)
创建过程做了些什么?
-
创建一个consumer 对象用来封装传递参数,和请求操作
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
我们可以看到用了java 8的函数式编程接口 BiConsumer 关于 BiConsumer 的用法,可以自行百度,因为也是采取的 异步刷新策略, 所以,是一个返回结果的Listener ActionListener<BulkResponse>
-
构建并BulkProcess
return BulkProcessor.builder(bulkConsumer, new BulkProcessor.Listener() { **** }).setBulkActions(1000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); }
可以很清楚的看到,在build 操作中,我们看到,在build 中,除了 之前定义的consumer,还实现了一个 Listener 接口 (稍后会具体讲到),用来做一些 在批量求情之前和请求之后的处理。
至此为止,BulkProcessor 创建,就OK啦~。
内部逻辑实现
先不说话,我们先上张类图
image可以看到,在 BulkProcessor 中,有这样的一些类和接口
Listener
Builder
BulkProcessor
Flush
===== 华丽的分界线
BulkProcessor 实现了 Closeable --> 继承自 AutoCloseable (关于AutoCloseable 本文不做过多说明,具体的可以百度,或者等待后续)
那么先从构建开始,我们来看下Builder
/**
* 简单的构建,可以看到,就是一个client 和 listener 这个不会做刷新策略,
*/
public static Builder builder(Client client, Listener listener) {
Objects.requireNonNull(client, "client");
Objects.requireNonNull(listener, "listener");
return new Builder(client::bulk, listener, client.threadPool(), () -> {});
}
/**
* 所有功能的builder 实现方法
* ScheduledThreadPoolExecutor 用来实现 按照时间频率,来进行 刷新,如 每5s
*
*/
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
Objects.requireNonNull(consumer, "consumer");
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); // 接口静态方式,来实现 Executor 的初始化
return new Builder(consumer, listener,
(delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS), //
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
}
/**
* 构造函数
* @param consumer 前文定义的consumer request response action
* @param listener listener BulkProcessor 内置监听器
* @param scheduler elastic 定时调度 类scheduler
* @paran onClose 关闭时候的运行
*/
private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
Scheduler scheduler, Runnable onClose) {
this.consumer = consumer;
this.listener = listener;
this.scheduler = scheduler;
this.onClose = onClose;
}
通过上述的代码片段,可以很明显的看到,关于初始化构建的一些关键点和要素
看完builder 接下来,我们看下 bulkprocessor 是如何工作的
先看下构造方法
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
Scheduler scheduler, Runnable onClose) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.scheduler = scheduler;
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); // BulkRequestHandler 批量执行 handler 操作
// Start period flushing task after everything is setup
this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); //开始刷新任务
this.onClose = onClose;
}
startFlushTask 如何进行工作
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
// 如果 按照时间刷新 为空,则直接返回 任务为取消状态
if (flushInterval == null) {
return new Scheduler.Cancellable() {
@Override
public void cancel() {}
@Override
public boolean isCancelled() {
return true;
}
};
}
final Runnable flushRunnable = scheduler.preserveContext(new Flush());
return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
}
private void executeIfNeeded() {
ensureOpen();
if (!isOverTheLimit()) {
return;
}
execute();
}
// 刷新线程
class Flush implements Runnable {
@Override
public void run() {
synchronized (BulkProcessor.this) {
if (closed) {
return;
}
if (bulkRequest.numberOfActions() == 0) {
return;
}
execute(); // 下面方法
}
}
}
/**
* 刷新执行
*
*/
// (currently) needs to be executed under a lock
private void execute() {
final BulkRequest bulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet();
// 刷新 bulkRequest 为下一批做准备
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler.execute(bulkRequest, executionId);
}
看到这里,关于时间的定时调度,我们其实是很清楚了,那么 关于数据量 和 大小的判断策略在哪儿?
/**
* 各种添加操作
*/
public BulkProcessor add(DocWriteRequest request, @Nullable Object payload) {
internalAdd(request, payload);
return this;
}
/**
* 我们可以看到,在添加之后,会做一个操作
* executeIfNeeded 如果需要,则进行执行
*/
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
ensureOpen();
bulkRequest.add(request, payload);
executeIfNeeded();
}
/**
* 如果超过限制,则执行刷新操作
*/
private void executeIfNeeded() {
ensureOpen();
if (!isOverTheLimit()) {
return;
}
execute();
}
/**
* 这这儿,我们终于看到了 关于action 和 size 的判断操作,
*
*/
private boolean isOverTheLimit() {
if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
return true;
}
if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
return true;
}
return false;
}
通过上述的分析,关于按照时间,数据size,大小来进行flush 执行的入口我们都已经很清楚了
针对数据大小的设置。在每次添加的时候,做判断是否 超过限制
针对 时间的频次控制,交由ScheduledThreadPoolExecutor 来去做监控
下来,让我们看下具体的执行以及重试策略,和 返回值的处理
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
// listener 填充 request 和执行ID
listener.beforeBulk(executionId, bulkRequest);
//通过信号量来进行资源的控制 来自于我们设置的 setConcurrentRequests
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
// 进行执行并按照补偿重试策略如果失败
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
//结果写入 ActionListener --> BulkProcessor.Listener 的转换
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
toRelease.run();
}
}
}
最终的执行,在 RetryHandler 中,继续往下看
public void execute(BulkRequest bulkRequest) {
this.currentBulkRequest = bulkRequest;
consumer.accept(bulkRequest, this);
}
对,没错,只有一个操作, consumer.accept(bulkRequest, this);
再一次展现了 java 8 函数式接口的功能强大之处 此处 consumer.accept(bulkRequest, this);
执行的操作即 Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
如何设置重试策略,以及数据的筛选
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (!bulkItemResponses.hasFailures()) {
// we're done here, include all responses
addResponses(bulkItemResponses, (r -> true));
finishHim();
} else {
if (canRetry(bulkItemResponses)) {
addResponses(bulkItemResponses, (r -> !r.isFailed()));
retry(createBulkRequestForRetry(bulkItemResponses));
} else {
addResponses(bulkItemResponses, (r -> true));
finishHim();
}
}
}
/**
* 只针对失败的请求,放入到重试策略中
*/
private void addResponses(BulkResponse response, Predicate<BulkItemResponse> filter) {
for (BulkItemResponse bulkItemResponse : response) {
if (filter.test(bulkItemResponse)) {
// Use client-side lock here to avoid visibility issues. This method may be called multiple times
// (based on how many retries we have to issue) and relying that the response handling code will be
// scheduled on the same thread is fragile.
synchronized (responses) {
responses.add(bulkItemResponse);
}
}
}
}
再一次展示了函数式接口的强大之处
补充一张流程图
BulkProcessor.png对这次的代码一些了解,对其中的一些设计理念和模式,学习到了不少的知识
如consumer, Listener,模式的应用。
如consumer ,Predicate 等函数式接口的用法。
如对线程的统一封装,来去做更统一的抽象处理
在后续的代码设计中,也会尝试采取这种设计模式,来写出更优雅的代码。es java api的代码内的不少设计模式,自我感觉还是很nice,后续会继续学习
仅代表个人见解,如有不对之出,欢迎指出
个人原创,转载请备明出处