mysql历史数据增加雪花算法记录

2022-01-09  本文已影响0人  frankie_cheung

最近有个需求:

对分库分表的历史数据增加一个ID 这个ID不能是自增的主键ID 因为是分库分表 ,自增ID容易重复,所以百度了一下,看到网上有使用雪花算法的,所以用雪花算法来加。

具体操作流程:

使用pandas增加,因为原来的表没有主键,你增加雪花算法只能通过一条条的去update,效率非常慢,但是我用pandas读取批量加也很慢,因为我们数据量很大,大概每个节点单表3亿数据,所以需要快速的加。

pandas加的伪代码如下,python版本

import pandas as pd

SQL='select * from table limit 0,10000'
#每次读取10000条

df=pd.read_sql(sql=SQL,con=conn)

#使用脚本生成雪花算法ID 后面贴代码

new_df=df[id]=[雪环算法ID list]

太慢了,一晚上跑了1800w,一共3亿。
所以使用outfile+load into 方式来处理。

新的操作流程

雪花ID生成文件代码,我也是网上搜的别人的

from datetime import datetime
import time
import sys



table_name=sys.argv[1]
range_count=sys.argv[2]


WORKER_ID_BITS = 5
DATACENTER_ID_BITS = 5
SEQUENCE_BITS = 12

MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS)  # 2**5-1 0b11111
MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)

WOKER_ID_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS

SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)

TWEPOCH = 1288834974657




class IdWorker(object):
    """
    """

    def __init__(self, datacenter_id, worker_id, sequence=0):
        """
        """
        # sanity check
        if worker_id > MAX_WORKER_ID or worker_id < 0:
            raise ValueError('worker_id')

        if datacenter_id > MAX_DATACENTER_ID or datacenter_id < 0:
            raise ValueError('datacenter_id')

        self.worker_id = worker_id
        self.datacenter_id = datacenter_id
        self.sequence = sequence

        self.last_timestamp = -1 

    def _gen_timestamp(self):
        """
        :return:int timestamp
        """
        return int(time.time() * 1000)

    def get_id(self):
        """
        :return:
        """
        timestamp = self._gen_timestamp()

        if timestamp == self.last_timestamp:
            self.sequence = (self.sequence + 1) & SEQUENCE_MASK
            if self.sequence == 0:
                timestamp = self._til_next_millis(self.last_timestamp)
        else:
            self.sequence = 0

        self.last_timestamp = timestamp

        new_id = ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | \
                 (self.worker_id << WOKER_ID_SHIFT) | self.sequence
        return new_id

    def _til_next_millis(self, last_timestamp):
        """
        """
        timestamp = self._gen_timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._gen_timestamp()
        return timestamp


if __name__ == '__main__':
    worker = IdWorker(1, 2, 0)
    with open("/data/mysqldata/3306/tmp/{}_id.txt".format(table_name),"w") as f:
        print('start generate time:',datetime.now())
        for i in range(int(range_count)):
            id=worker.get_id()
            f.write('"'+str(id)+'"'+'\n')

        print('end time',datetime.now())


注意上述python是通过入参的方式执行

python table_name 100000

table_name是需要创建雪花ID的表,100000是雪花ID多少行

合并是Linux的paste命令

paste -d "," id_file db_table_file > new file

id_file 代表雪花ID文件
db_table_file代表outfile的文件

最后再load 到数据库内即可。

效率:

2.9 亿数据导出 31g大小时间忘记了,应该最多15分钟
创建2.9亿雪花ID列表 需要37分钟
paste合并列 大概3分钟
load 到数据库时间最近 1h5m

上一篇 下一篇

猜你喜欢

热点阅读