ES single host: copying and migrating data


Requirement:
The company needs to rename every index in ES.
The _reindex API would meet the requirement, but during the conversion it turns the nested type into text, so that approach was ruled out.
There are plenty of migration tools on GitHub, but most of them assume a cluster, and we have not set one up yet, so I hand-wrote the script below for future reference. Corrections are welcome.
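
For reference, the ruled-out approach was roughly the _reindex call below (a minimal sketch; old_index and new_index are placeholder names, not our real indices). _reindex does not copy mappings, so an auto-created destination index gets a dynamically inferred mapping, which is presumably why the nested fields came back wrong:

import requests

# Sketch of the ruled-out _reindex approach; index names are placeholders.
resp = requests.post(
    'http://192.168.0.140:9200/_reindex',
    json={
        "source": {"index": "old_index"},
        "dest": {"index": "new_index"}
    }
)
print(resp.json())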

elasticsearch version 5.5.3
import math
import time

import requests
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from utils.db_field_util import get_es_dbs_name

# Source ES host
FROM_ES = '192.168.0.140'
# Destination ES host to migrate into
TO_ES = '192.168.0.157'
from_es = Elasticsearch(hosts=FROM_ES)
to_es = Elasticsearch(hosts=TO_ES)


def get_url(index):
    return f"http://{FROM_ES}:9200/{index}/_mapping"


def get_body(index):
    # The mapping response is {index: {"mappings": {...}}}, so resp.json()[index]
    # can be used directly as the body when creating the new index.
    resp = requests.get(get_url(index))
    return resp.json()[index]


# Create the destination index with the source mapping
def create_index(index_name, body):
    try:
        to_es.indices.create(index_name, body)
        return True
    except Exception:
        print('Index already exists')
        return False


# Migrate the data of one index from the source host to the destination host
def async_es_data(index):
    # The new index name is the last '_'-separated part of the old name, e.g. 'msd_ahad' -> 'ahad'
    new_index = index.split('_').pop()
    # Create the new index with the old mapping
    create_flag = create_index(index_name=new_index, body=get_body(index))
    if create_flag:
        print('Index created successfully, starting to insert data!')
    else:
        return
    # Get the total number of documents
    resp = from_es.search(
        index=index,
        body={
            "query": {
                "match_all": {}
            }
        }
    )
    total = resp['hits']['total']
    actions = []
    start_time = time.time()  # migration start time
    for num, page in enumerate(get_page_data(total, index, 100), 1):
        # print(page['hits']['hits'], total, num)
        # Point every hit at the new index; the bulk helper reads _index/_type/_id/_source from the hit dicts
        for h in page['hits']['hits']:
            h['_index'] = new_index
        actions.extend(page['hits']['hits'])
        # Flush to the destination ES every 10 pages (i.e. every 1000 documents)
        if not num % 10:
            result = bulk(to_es, actions, index=new_index, raise_on_error=True)
            print(actions[0])
            print(f'Docs {(num - 10) * 100}~{num * 100} inserted, elapsed {time.time() - start_time}s', result)
            actions = []
    if actions:
        print(actions[0])
        result = bulk(to_es, actions, index=new_index, raise_on_error=True)
        print(f'Last {len(actions)} docs inserted, total elapsed {time.time() - start_time}s', result)


def get_page_data(total, index, size):
    # Generator that yields one page of search results at a time
    # NOTE: from/size paging is capped by index.max_result_window (10000 by default);
    # a scroll-based alternative is sketched at the end of the post.
    total_page = math.ceil(total / size)
    page = 0
    while page < total_page:
        yield from_es.search(
            index=index,
            body={
                "query": {
                    "match_all": {}
                },
                "from": page * size,
                "size": size
            }
        )
        page += 1


# Uncomment to inspect the index names before starting the sync
# print(get_es_dbs_name())
# for i, j in enumerate(get_es_dbs_name(), 1):
#     if '_' in j:
#         print(i, j)
# print(len(get_es_dbs_name()))

# Migrate every index whose name does not end with 'iw'
for i in get_es_dbs_name():
    if not i.endswith('iw'):
        async_es_data(i)


# async_es_data('eai_cmsm')


# Data validation: compare documents between the source and destination indices
def check_data(index):
    resp_total = from_es.search(
        index=index,
        body={
            "query": {
                "match_all": {}
            }
        }
    )
    total = resp_total['hits']['total']
    # Validate in batches of 1000 documents
    print('Total docs:', total)
    # Collect the ids whose documents differ
    ids = []
    new_index = index.split('_').pop()
    for i_num, i in enumerate(get_page_data(total, index, 1000)):
        # Only spot-check pages after the 8th (each page holds 1000 docs)
        if i_num > 8:
            for h in i['hits']['hits']:
                # Fetch the same document from the old and the new index and compare the _source
                # e.g. requests.get('http://192.168.0.140:9200/ahad/table/5d6c93a2a6632fa41c22e9d3')
                old_data = requests.get(f'http://{FROM_ES}:9200/{index}/table/{h["_id"]}')
                old_data = old_data.json()
                old_data['_index'] = new_index
                new_data = requests.get(f'http://{TO_ES}:9200/{new_index}/table/{h["_id"]}')
                if old_data['_source'] != new_data.json()['_source']:
                    ids.append(h['_id'])
        print(f'Page {i_num} (1000 docs each) checked, differing ids:', ids, '---------------------', len(ids))
    print('Final differing ids:', ids)

# check_data('msd_ahad')
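
Before the per-document comparison above, a cheap sanity check is to compare document counts on both hosts. Below is a small sketch using the clients defined earlier; the function name check_counts is mine, not part of the original script:

def check_counts(index):
    # Compare total document counts between the source and destination indices
    new_index = index.split('_').pop()
    old_total = from_es.count(index=index)['count']
    new_total = to_es.count(index=new_index)['count']
    print(index, old_total, '->', new_index, new_total,
          'match' if old_total == new_total else 'MISMATCH')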
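
The from/size paging used by get_page_data is also capped by index.max_result_window (10000 by default), so very large indices cannot be copied that way. A scroll-based variant avoids the limit; the sketch below relies on elasticsearch.helpers.scan plus the create_index/bulk helpers above, and the function name scroll_es_data is mine:

from elasticsearch.helpers import scan


def scroll_es_data(index):
    # Create the destination index first, exactly as async_es_data does
    new_index = index.split('_').pop()
    if not create_index(index_name=new_index, body=get_body(index)):
        return
    actions = []
    # scan() streams every document via the scroll API, so there is no 10000-doc ceiling
    for h in scan(from_es, index=index, query={"query": {"match_all": {}}}, size=1000):
        actions.append({
            '_index': new_index,
            '_type': h['_type'],
            '_id': h['_id'],
            '_source': h['_source'],
        })
        # Flush to the destination every 1000 documents
        if len(actions) >= 1000:
            bulk(to_es, actions, raise_on_error=True)
            actions = []
    if actions:
        bulk(to_es, actions, raise_on_error=True)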
