玩转大数据大数据大数据&云计算

基于SCF实现Elasticsearch索引的批量shrink

2020-07-31  本文已影响0人  bellengao

在冷热分离的Elasticsearch集群架构中,往往会使用拥有较高规格cpu和内存、以及SSD盘的机器作为热节点,用于保证高并发的写入。通常索引都是按天创建的,只有当天的索引会进行写入,存量的索引会定期比如说在索引创建15天后迁移到冷节点或者warm节点上,冷节点或者warm节点的cpu和内存配置都可以低一些,并且使用SATA盘存储降低成本。

在热节点上的索引,为了保证写入性能,通常分片数会设置的和热节点的数量一致,使得每台机器的资源都可以利用上。但是随着索引数量的不断增加,集群整体的分片数量也越来越多,如果分片数量达到了数万,对集群的稳定性和性能都会有不小的影响。所以需要对集群整体的分片数量进行控制,避免分片数过多而导致集群不稳定,好在ES本身有shrink 索引的功能,可以降低索引的分片数,但是shrink操作有一些前置条件和使用限制,不是直接对索引的分片数调低,而是新建一个分片数量少的索引,硬链接到老的索引,然后对新索引执行recovery直至新索引变green。我们可以在新索引变green后,删除老的索引,然后对新索引建立别名,别名和老索引的名称完全一样,可以按照老索引的名称查询数据。

本文尝试使用SCF云函数对存量的大量的老索引,通过shrink,降低索引的分片数量。

实施步骤

1. 创建SCF云函数

如图,基于名为"ES写入函数"的模板,创建一个新的函数:


image

点击"下一步"进入函数编辑界面,直接复制如下函数代码粘贴到编辑框,修改ES的vip和用户名密码,以及索引前缀名称等信息:

# -*- coding: utf8 -*-
from datetime import datetime
from elasticsearch import Elasticsearch
import time
import math

ESServer = Elasticsearch(["xxx:9200"],http_auth=('elastic', 'xxx'))

# 临时索引,用于记录当前正在执行哪一个老的索引
tempIndex = "temp-snapshot"
# 老的索引前缀
indexPattern = "test-*"
currentIndex = None

# 根据磁盘使用量,获取用量最小的warm节点id
def get_warm_node():
    rsp = ESServer.nodes.stats(metric="indices",index_metric="store")
    nodesInfo = rsp["nodes"]
    minStorageSizeNodeId = None
    minStorageSize = 0
    for node in nodesInfo:
        nodeInfo = nodesInfo[node]
        if nodeInfo["attributes"]["temperature"] != "warm":
            continue
        nodeStorageSize = nodeInfo["indices"]["store"]["size_in_bytes"]
        if minStorageSize == 0:
            minStorageSize = nodeStorageSize
            minStorageSizeNodeId = node
        if nodeStorageSize < minStorageSize:
            minStorageSize = nodeStorageSize
            minStorageSizeNodeId = node
    return minStorageSizeNodeId

# 检查老索引的状态,判断是否有正在迁移中的分片
def check_old_index_status(index):
    params = {}
    params["format"] = "json"
    rsp = ESServer.cat.recovery(index = index, params = params)
    for shardStats in rsp:
        if shardStats["stage"] != "done":
            return False
    return True

# 检查新索引的状态,判断是否green
def check_new_index_status(index):
    rsp = ESServer.cluster.health(index = index)
    if rsp != None and rsp["status"] == "green":
        return True
    return False

# 根据索引数据量确定要shrink到几个分片
def calTargeIndexShardsNum(index):
    params = {}
    params["format"] = "json"
    rsp = ESServer.cat.indices(index = index, params = params)
    indexInfo = rsp[0]
    storageSize = indexInfo["pri.store.size"]
    shardNum = indexInfo["pri"]
    if storageSize.endswith("gb"):
        size = float(storageSize[0:rfind("gb")])
        targetShardsNum =  int(math.ceil(size/50))
        while shardNum / targetShardsNum * targetShardsNum < shardNum:
            targetShardsNum = targetShardsNum + 1
        return targetShardsNum
    else:
        return 1

# 执行shrink
def shrink_index(index):
    body = {}
    body["settings"]={}
    body["settings"]["index.number_of_replicas"]=0
    targetShardsNum = calTargeIndexShardsNum(index)
    print "shrink index: " + index + ", target shards num:" + str(targetShardsNum)
    body["settings"]["index.number_of_shards"] = targetShardsNum
    body["settings"]["index.routing.allocation.require._id"] = None
    rsp = ESServer.indices.shrink(index=index, target="shrink-"+index, body=body)
    if rsp is not None and rsp["acknowledged"] == True:
        return True
    else:
        return False

# 添加别名
def add_alias(index):
    shrinkIndex = "shrink-"+index
    rsp = ESServer.indices.put_alias(index=shrinkIndex, name=index)
    if rsp is not None and rsp["acknowledged"] == True:
        return True
    else:
        return False

# 删除索引
def delete_index(index):
    rsp = ESServer.indices.delete(index=index, ignore=[400, 404])
    if rsp is not None and rsp["acknowledged"] == True:
        return True
    else:
        return False

# 选择需要执行shrink的老索引
def selectNeedToShrinkIndex():
    params = {}
    params["format"] = "json"
    rsp = ESServer.cat.indices(index = indexPattern, params = params)
    for indexInfo in rsp:
        indexName = indexInfo["index"]
        if indexName.startswith("shrink-"):
            continue
        if indexInfo["health"] == 'green' and indexInfo["status"] == 'open' and indexInfo["pri"] == "60":
            indexSettings = ESServer.indices.get_settings(index=indexName)
            allocationSettings = indexSettings[indexName]["settings"]["index"]["routing"]["allocation"]["require"]
            if allocationSettings["temperature"] == 'warm' and "_id" not in allocationSettings:
                return indexName
    return None

# 把老索引的分片都迁移至一个节点上
def reallocatingOldIndex(index):
    warmNodeId = get_warm_node()
    if warmNodeId == None:
        print "warmNodeId is null"
        return
    print "warmNodeId: " + warmNodeId
    params = {}
    params["index.blocks.write"] = "true"
    params["index.routing.allocation.require._id"] = warmNodeId
    ESServer.indices.put_settings(index= index, body=params)

# 记录当前正在执行的老索引,便于后续轮询
def recordCurrentIndex(currentIndex):
    indexBody ={}
    indexBody["currentIndex"] = currentIndex
    headers= {}
    headers["Content-Type"] = "application/json"
    indexResult = ESServer.index(index=tempIndex, doc_type="_doc", body=indexBody,id="2")
    print "index current index success!"

# 记录当前正在执行中的新索引,便于后续轮询
def recordShrinkIndex(shrinkIndex):
    indexBody ={}
    indexBody["shrinkIndex"] = shrinkIndex
    headers= {}
    headers["Content-Type"] = "application/json"
    indexResult = ESServer.index(index=tempIndex, doc_type="_doc", body=indexBody,id="3")
    print "index current index success!"

# 检查shrink操作
def checkShrink(index):
    shrinkIndex = "shrink-"+index
    isShrinkIndexReady = check_new_index_status(shrinkIndex)
    if isShrinkIndexReady == True:
        deleteSuccess = delete_index(index)
        if deleteSuccess == True:
            success = add_alias(index)
            if success == True:
                deleteDocument(tempIndex, "2")
                deleteDocument(tempIndex, "3")
                body = {}
                body["indexName"] = index
                addDocument(tempIndex, body)
                forceMerge(index)
                print "shrink index "+ index + "finished!"
            else:
                print "add alias failed"
        else:
            print "delete old index: "+ index + "failed"

# 删除临时索引中的记录
def deleteDocument(index, docId):
    rsp = ESServer.delete(index=index, id=docId, doc_type="_doc")
    if rsp is not None:
        print "delete document: " + index + ", id: "+ docId + "success"
        return True
    return False

# 在临时索引中记录所有的已完成shrink的索引名称
def addDocument(index, body):
    rsp = ESServer.index(index=index, doc_type="_doc", body = body)
    if rsp is not None:
        print "record document: " + index + " success"
        return True
    return False

# 对新索引执行merge
def forceMerge(index):
    params = {}
    params["max_num_segments"] = 1
    rsp = ESServer.indices.forcemerge(index=index, params =params)
    if rsp is not None:
        print "forcemerge index: " + index + " success"
        return True
    return False

# 执行shrink
def execShrink(index):
    isOldIndexReady = check_old_index_status(index)
    if isOldIndexReady == True:
        print "old index: " + index + " is ready"
        success = shrink_index(index)
        if success == True:
            recordShrinkIndex("shrink-"+index)
        else:
            print "shrink failed"
    else:
        print "old index: " + index + " is reallocating"

# 选择老索引
def selectIndexToExecShrink():
    currentIndex = selectNeedToShrinkIndex()
    if currentIndex == None:
        print "No new index needs to be shrinken"
    else:
        print "current index: " + currentIndex
        recordCurrentIndex(currentIndex)
        reallocatingOldIndex(currentIndex)
    return currentIndex

def check():
    existed = ESServer.indices.exists(tempIndex)
    if existed == True:
        getTempDoc = ESServer.get(tempIndex, doc_type="_doc", id="2", ignore=[404])
        if getTempDoc["found"] == False:
            currentIndex = selectIndexToExecShrink()
        else:
            currentIndex = getTempDoc["_source"]["currentIndex"]
            print "current index: " + currentIndex
        if currentIndex == None:
            return

        tempDoc = ESServer.get(tempIndex, doc_type="_doc", id="3", ignore=[404])
        if tempDoc["found"] == True:
            checkShrink(currentIndex)
        else:
            execShrink(currentIndex)
    else:
        selectIndexToExecShrink()


def main_handler(event,context):
    check()

该函数主要的逻辑有:

  1. 通过索引名称通配符找到老的索引,选择一个索引
  2. 选择一个固定的warm节点,把1中选出的索引的分片全部移动到这个warm节点上去(索引完整的一份数据都在一个节点上,才能执行shrink,因为要进行硬链接)
  3. 根据老索引的数据量确定目标分片数量,按一个分片50GB确定,向上取整,并且使得目标分片数量为老索引分片数量的因子
  4. 分片移动完毕后,执行shrink操作,新索引的名称为shrink-{老索引的名称},新索引的分片数量只能为老索引分片数量的因子,比如老索引的分片数为10, 则新索引的分片数只能为1、2或者5(为了保证数据不用rehash)
  5. 检查新索引的状态,等待状态变为green并且没有在初始化中的分片
  6. 删除老索引
  7. 对新索引添加别名,别名为老索引的名称
  8. 继续执行步骤1-7, 直至所有的老索引都执行完毕,此举是为了避免大量的分片移动、初始化的操作对索引的新建产生影响,在规模较大的集群中容易出现该问题
image

点击"完成"即可完成云函数的创建。

2. 配置云函数

创建完云函数后,需要进行配置才能使用,如下图,可以配置函数的私有网络VPC和Subnet(选择和ES相同的VPC和Subnet):


image

3. 测试云函数

配置完云函数后,可以对函数代码进行测试,保证能够正常运行;如果需要进行编辑,可以直接编辑然后点击"保存并测试":


image

4. 配置触发器

配置触发器,可以自定义选择执行周期触发函数:


image
上一篇下一篇

猜你喜欢

热点阅读