索引迁移

2017-11-01  本文已影响0人  YG_9013

索引迁移工具esm

下载地址:https://github.com/medcl/esm
经过测试发现使用--copy_settings和--copy_mappings复制设置与映射失败。而只用--copy_mappings也不起作用。

/bin/esm -s=http://192.168.3.206:9200 -d=http://localhost:9200 --copy_settings --copy_mappings -x=bestbuykaggle  

手动创建索引,设置mapping和setting。数据导入导出没问题。但是速度很慢,可能是我单个文档有点大,大约在3000/s。

reindex测试

ES5之后,推出了一个reindex的功能,可在不同集群间传递数据。详细信息可看https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docs-reindex.html

curl -XPOST 'localhost:9200/_reindex?pretty' -H 'Content-Type: application/json' -d'
{
  "source": {
    "remote": {
      "host": "http://xxx:9200"
    },
    "index": "source"
  },
  "dest": {
    "index": "dest"
  }
}
'

这个也是用手动创建索引,设置mapping和setting。经过测试,数据导入导出没问题。速度在4200/s。系统默认单进程处理,针对于几千万甚至上亿的数据量,这个速度还是慢。后来打算用reindex+slice这种方式通过多进程操作。但是Reindexing from remote clusters does not support manual or automatic slicing.即从其他集群导数据时,不支持手动和自动切片。这条路行不通。

手动scroll+slice+bulk

利用scroll和slice,并行从原集群中读数据,然后并行地通过Bulk写入目标集群。

package com.dump.core;

import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

/**
 * Copies every document of one index from a source ES 5.x cluster to a
 * destination cluster, using sliced scroll searches read in parallel and
 * a per-thread {@link BulkProcessor} to write the destination.
 *
 * Usage: DumpIndex source_cluster_name source_ips dest_cluster_name dest_ips index threadSize
 * (ips are comma-separated host lists; transport port 9300 is assumed).
 */
public class DumpIndex {

    public static String index="";
    public static String dest_cluster_name="";
    public static String source_cluster_name="";
    public static String dest_ips[]=null;
    public static String source_ips[]=null;
    public static String type="";
    public static TransportClient source_client = null;
    public static TransportClient dest_client = null;
    public static ExecutorService executor;
    public static int threadSize;

    /**
     * Builds the source and destination {@link TransportClient}s from the
     * configured cluster names and host lists (transport port 9300).
     *
     * @throws Exception if a host name cannot be resolved
     */
    public static void setConfES() throws Exception {
        Settings settings = Settings.builder().put("cluster.name", source_cluster_name).build();
        int port = 9300;
        source_client = new PreBuiltTransportClient(settings);
        for (String ip : source_ips) {
            source_client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), port));
        }

        settings = Settings.builder().put("cluster.name", dest_cluster_name).build();
        dest_client = new PreBuiltTransportClient(settings);
        for (String ip : dest_ips) {
            dest_client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), port));
        }
    }

    /**
     * Creates a {@link BulkProcessor} for one worker thread: flushes every
     * 10000 actions / 5 MB / 5 seconds, synchronous requests
     * (concurrentRequests = 0) with exponential-backoff retry on rejection.
     *
     * @param id worker id, used only in log output
     * @return a ready-to-use bulk processor bound to {@code dest_client}
     */
    public static BulkProcessor getBulker(final int id){
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                dest_client,
                new BulkProcessor.Listener() {
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        if (!response.hasFailures()) {
                            System.out.println("线程 "+id+" 索引过程了"+response.getItems().length+"个文档");
                        } else {
                            System.out.println("线程 "+id+" 索引过程中遇到了一些失败");
                        }
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        failure.printStackTrace();
                    }
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(0)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
                .build();
        return bulkProcessor;
    }

    /**
     * Starts {@code threadSize} sliced scroll searches (slice i of threadSize)
     * against the source index and hands each slice to a {@link SliceCustomer}
     * worker on the executor. Shuts the executor down afterwards so it exits
     * once all slices are consumed.
     */
    public static void parallel(){
        for(int i=0;i<threadSize;i++){
            QueryBuilder qb = matchAllQuery();
            SliceBuilder sliceBuilder = new SliceBuilder(i, threadSize);
            // Sorting on _doc is the cheapest order for scroll scans.
            SearchResponse scrollResp = source_client.prepareSearch(index)
                    .setScroll(new TimeValue(60000))
                    .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
                    .setQuery(qb)
                    .slice(sliceBuilder)
                    .setSize(5000).get();
            BulkProcessor bulkProcessor = getBulker(i);
            SliceCustomer customer =new SliceCustomer(scrollResp,bulkProcessor,source_client,index,type);
            executor.execute(customer);
        }
        executor.shutdown();
    }

    public static void main(String []args) throws Exception {
        try {
            source_cluster_name= args[0];
            System.out.println("source_cluster_name is "+ source_cluster_name);
            source_ips = args[1].split(",");
            System.out.println("source_ips is "+args[1]);
            dest_cluster_name = args[2];
            System.out.println("dest_cluster_name is " + dest_cluster_name);
            dest_ips = args[3].split(",");
            System.out.println("dest_ips is " + args[3]);
            index = args[4];
            System.out.println("index is " + index);
            // Convention in this setup: the type is the index-name prefix.
            type = index.split("-")[0];
            System.out.println("type is " + type);
            threadSize = Integer.parseInt(args[5]);
            System.out.println("threadSize is " + threadSize);
        } catch (Exception e) {
            // Usage message must list all six expected arguments.
            System.out.println("参数个数错误,参数依次是source_cluster_name,source_ips,dest_cluster_name,dest_ips,index,threadSize");
            return;
        }
        executor = Executors.newFixedThreadPool(threadSize);
        // Connect exactly once; the original called setConfES() twice,
        // leaking a first pair of transport clients. Connection failures
        // now propagate instead of being misreported as an argument error.
        setConfES();
        parallel();
    }
}

/**
 * Worker that drains one scroll slice from the source cluster and feeds
 * every hit into a {@link BulkProcessor} targeting the destination index.
 * The initial {@link SearchResponse} (first scroll page) is created by the
 * caller; this runnable pages through the rest of the scroll.
 */
class SliceCustomer implements Runnable{
    public SearchResponse scrollResp;
    public BulkProcessor bulkProcessor;
    public TransportClient source_client;
    public String index;
    public String type;

    public SliceCustomer(SearchResponse scrollResp, BulkProcessor bulkProcessor, TransportClient source_client, String index, String type) {
        this.scrollResp = scrollResp;
        this.bulkProcessor = bulkProcessor;
        this.source_client = source_client;
        this.index = index;
        this.type = type;
    }

    @Override
    public void run() {
        do {
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                // Re-index the raw _source into the destination index/type.
                bulkProcessor.add(new IndexRequest(index,type).source(hit.sourceAsString()));
            }
            scrollResp = source_client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
        } while(scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

        // Release the server-side scroll context immediately instead of
        // waiting for the 60s keep-alive to expire (sliced scrolls are
        // memory-hungry on the source nodes).
        try {
            source_client.prepareClearScroll().addScrollId(scrollResp.getScrollId()).get();
        } catch (Exception e) {
            // Best effort: the context expires on its own if this fails.
            e.printStackTrace();
        }

        bulkProcessor.flush();
        try {
            // Block until all queued bulk requests have been sent.
            bulkProcessor.awaitClose(100, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the executor.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

对_doc进行排序(addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)),ES底层优化,更快速地获取到数据,然后并行地通过bulk写入目标ES。
但是,但是又有问题出现了。scroll+slice会导致节点内存占用过高:

If the number of slices is bigger than the number of shards the slice filter is very slow on the first calls, it has a complexity of O(N) and a memory cost equals to N bits per slice where N is the total number of documents in the shard. After few calls the filter should be cached and subsequent calls should be faster but you should limit the number of sliced query you perform in parallel to avoid the memory explosion.

可以通过在slice中指定一个doc_values来解决这个问题。更详细的信息请看https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-request-scroll.html#sliced-scroll
经测试,设置4个线程的情况下,除去ES启动时间,索引速度在8000/s左右。对多个数亿的文档来说,依然满足不了要求。

终极方案

通过hive重新导数据。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

复制原索引的setting和mapping脚本

import os
import json
import re
import sys
import threading

def get_json(cmd):
    # Run a shell command (curl) and parse whatever it prints as JSON.
    raw_output = os.popen(cmd).read()
    return json.loads(raw_output)

def set_mappings(index,source,dest):
    cmd = "curl %s/%s/_mapping" % (source,index)
    print cmd
    result = get_json(cmd)
    mappings = json.dumps(result[index]["mappings"])
    cmd = "curl %s/%s/_settings" % (source,index)
    print cmd
    result = get_json(cmd)
    print result
    settings = result[index]["settings"]["index"]
    pri_num = settings["number_of_shards"]
    rep_num = settings["number_of_replicas"]
    codec = ''
    routing =''
    try:
        codec = settings["codec"]
    except:
        codec = ''
    try:
        routing = json.dumps(settings["routing"])
    except:
        routing =''
    settings = "{\"number_of_shards\":%s,\"number_of_replicas\":%s," % (pri_num,rep_num)
    if len(codec)>0:
        tmp = "\"codec\":\"%s\"," % codec
        settings = settings + tmp 
    if len(routing)>0:
        tmp = "\"routing\":%s," % routing
        settings = settings + tmp
    settings = settings[:-1] +"}"
    
    print mappings
    print "--------------------"
    print settings
    print "--------------------"
    cmd = "curl -XPUT %s/%s -d '{\"settings\":%s,\"mappings\":%s}'" % (dest,index,settings,mappings)
    result = get_json(cmd)
    print cmd
    print "--------------------"
    print result
    if "error" in result.keys():
        return 0
    if "acknowledged" in result.keys():
        if result["acknowledged"] == 'false' or result["shards_acknowledged"] == 'false':
            return 0
    return 1

def run():
    args = sys.argv
    source = args[1]
    dest = args[2]
    index = args[3]
    print "stat to move %s,create mapping..." % index
    set_mappings(index,source,dest)

run()


上一篇下一篇

猜你喜欢

热点阅读