数据库ELK

[9]es数据批量更新及数据导入导出

2022-03-25  本文已影响0人  不怕天黑_0819

本文集主要是总结自己在项目中使用ES 的经验教训,包括各种实战和调优。


主要借助了es的scroll完成对要更新数据的备份,然后利用bulk完成对数据的批量更新。

自己写的代码(参考官网,bulk的写法和官网的不太一样):


/**

* 批量更新elasticsearch数据,目前仅支持更新int字段或将某一field字段的值赋给另外的字段。

*

* @param index

* @param type

* @param size bulk批量写入一次写入的数据

* @param routing 没有设置routing,为null或""就行

* @param query

* @param updateField 需要更新的字段

* @param updateValue 更新值,可以为int或mapping中的一个字段。

* @return

*/

private String searchElasticsearchDataByScroll(String index, String type, int size, String routing, QueryBuilder query, String updateField, String updateValue) {

Map<String, Object> hitMap;

SearchResponse response = client.prepareSearch(index)

.setTypes(type)

.setQuery(query)

.setScroll(new TimeValue(6000))

.setSize(size).execute().actionGet();

if (size < 500) {

log.warn("bulk size is small,you'd better set size near 1000 ,size:" + size);

}

BulkRequestBuilder bulkRequest = client.prepareBulk();

do {

for (SearchHit searchHit : response.getHits().getHits()) {

hitMap = searchHit.getSource();

if (!hitMap.containsKey(updateField)) {

log.error("updateField is not exist,updateField:" + updateField);

break;

}

if (hitMap.containsKey(updateValue)) {

hitMap.put(updateField, hitMap.get(updateValue));

} else {

try {

hitMap.put(updateField, Integer.parseInt(updateValue));//默认如果在hit中找不到要更新的值,则将updateValue当做int来进行更新

} catch (NumberFormatException e) {

log.error("can not solve updateValue。", e);

break;

}

}

if (routing.equalsIgnoreCase("") || routing == null) {

bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setDoc(hitMap));

} else {

if(hitMap.get(routing)==null){

log.error("error:can not find routing,routing:"+routing);

}

bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setRouting(hitMap.get(routing).toString()).setDoc(hitMap));

}

bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setRouting(hitMap.get(routing).toString()).setDoc(hitMap));

}

BulkResponse bulkResponse = bulkRequest.execute().actionGet();

if (bulkResponse.hasFailures()) {

log.warn("bulk updateElasticsearch has fail, reason is" + bulkResponse.buildFailureMessage());

}

response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();

} while (response.getHits().getHits().length != 0);

boolean clearScroll = clearScroll(response.getScrollId());

if (!clearScroll){

log.warn("clear scroll id false,scrollId:"+response.getScrollId());

}

return "";

}

关于数据批量导入导出的参考链接:
http://blog.csdn.net/u010585120/article/details/48028255

关于es数据的批量导出和导入,可以作为参考,代码不一定是完全正确的。
https://my.oschina.net/chiyong/blog/552622

可以作为代码的参考,但是存在一定的问题,按照官网的示例可以实现。 关于es的批量导入导出。


关于scroll的使用的一些注意事项:

官网文档:

https://www.elastic.co/guide/en/elasticsearch/reference/5.3/search-request-scroll.html#scroll-search-context

按照docid进行排序的scroll性能会更好。

Scroll requests have optimizations that make them faster when the sort order is _doc. If you want to iterate over all documents regardless of the order, this is the most efficient option:

Sliced Scroll:

For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which can be consumed independently。

使用分片scroll的数量不能大于集群的分片数。官网上也介绍了根据uid hash的算法,当然也可以自己选择进行hash的字段,但是有一定的要求。使用sliced scroll可以加快scroll的处理速度。

上一篇下一篇

猜你喜欢

热点阅读