ClickHouseClickHouse

ClickHouse复制表同步机制浅析

2021-07-17  本文已影响0人  旺旺鸽不鸽

ReplicatedMergeTree是ClickHouse最常用的表引擎之一,该引擎和MergeTree一样都继承自MergeTreeData, 和MergeTree共享相同的底层存储层实现。ReplicatedMergeTree区别于MergeTree引擎的地方在于其实现了多副本数据存储,并借助zookeeper进行多副本之间的数据同步。

本文针对ReplicatedMergeTree引擎如何实现副本间数据同步,结合部分源代码(20.10版本)做一个简单剖析。

0 术语解释

data part : clickhouse中的数据组织单位,一次INSERT会生成一个或多个data part, 一个data part里的数据存放在同一个文件夹里。

mutation : 对一个或多个data parts的内容数据的修改操作,最典型的触发mutation的操作是:ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE(详见mutations)。

1 哪些操作会引发数据同步?

这里说的数据同步包括了元数据和内容数据的同步。在ClickHouse中,数据插入(INSERT),元数据变更(ALTER),内容数据变更(MUTATION),合并(MERGE)等操作都会引发数据同步。

1.1 INSERT

clickhouse执行ReplicatedMergeTree表的插入操作时,会将数据以data part为单位写入到磁盘/内存里,每个data part写入完成后都会做一个commit操作, 这个commit操作会生成一个GET_PART类型的log entry并上传到zookeeper中供其他副本拉取同步,详见源码

由此可见,INSERT语句引发的是data part级别的内容数据的同步。

1.2 ALTER METADATA

这里之所以称为'ALTER METADATA'而不直接称'ALTER',是因为,在ClickHouse中update和delete也是通过ALTER语句实现的,即ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE.  ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE语句触发的是mutations,所以它们被归为mutation的范畴。

这里说的是会对表的元数据进行修改的ALTER语句。ClickHouse将ReplicatedMergeTree表的元数据信息存放在zookeeper和本地文件中,ALTER语句会触发对这两个地方的元数据信息进行修改。注意,在zookeeper上,表的元数据信息会在表级别和副本级别的znodes(即下文说的zookeeper_path和replica_path)下都进行存储。

alter语句首先会修改掉zookeeper上zookeeper_path下的元数据,然后将修改后的元数据包装生成一个ALTER_METADATA类型的ReplicatedMergeTreeLogEntryData对象,并将其上传到zookeeper上,供所有副本(包括当前副本)拉取更新。详见源码

注意,这里说的”会对表的元数据进行修改的ALTER语句“ 有一些同时也会触发mutation,下文将进一步介绍。

1.3 MERGE

merge操作用于将若干个data parts合并为单个data part,ClickHouse会在后台选择需要做merge的data parts,然后创建一个MERGE_PARTS类型的ReplicatedMergeTreeLogEntryData对象,并将其上传到zookeeper上,供所有副本(包括当前副本)拉取同步。详见源码StorageReplicatedMergeTree::mergeSelectingTask() .

1.4 MUTATION

mutation是对data parts数据的修改操作,一个mutation可能包含了对很多个data parts的修改。

除了ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE,以下类型的ALTER语句也会触发mutation :

    1. DROP_COLUMN

    2. DROP_INDEX

    3. RENAME_COLUMN

    4. 非meta only的MODIFY_COLUMN

对于会触发mutation的ALTER语句,ClickHouse会生成一个ReplicatedMergeTreeMutationEntry对象,并将其上传到zookeeper,供所有副本(包括当前副本)拉取同步。

详见源码StorageReplicatedMergeTree::alter 和 AlterCommand::isRequireMutationStage.

2 ZooKeeper的结构和作用

zookeeper是ReplicateMergeTree表副本之间进行数据同步的桥梁,存储了复制表的各类元数据信息。这里介绍几个和数据同步相关的主要信息。

2.1 zookeeper_path和replica_path

一张ReplicatedMergeTree表的所有副本在zookeeper上共享一个znode路径,我们称之为zookeeper_path,这个路径在建表时指定,我们对该路径的命名规范是:

        /clickhouse/tables/{layer}-{shard}/<table_name>

其中layer和shard都是macro,各个副本节点取值可能不同。

zookeeper_path是表级别的,其下有一个replicas节点,内部包含了各个副本的元信息节点,即:

        /clickhouse/tables/{layer}-{shard}/<table_name>/replicas/<replica>

我们称副本的元信息节点为replica_path.

2.2 表的元信息(metadata, columns)

在zookeeper_path和replica_path下都保存了表的metadata和columns信息,元数据变更时会先修改zookeeper_path下的metadata和columns,然后会异步地将更新同步到replica_path下。

除此之外,replica_path下面还会有一个metadata_version用于记录当前元数据信息的版本,这个值就是zookeeper_path下的metadata节点的dataVersion.

zookeeper_path下的metadata和columns信息的更新详见源码

replica_path下的metadata, columns和metadata_version的更新见源码

2.3 同步日志和副本队列(log, queue)

2.3.1 同步日志(log)

log位于zookeeper_path下面,用于保存所有副本间的数据同步日志(称为log entry),ClickHouse支持的LogEntry类型有:

Supported LogEntry type

从各个副本触发的数据变更操作都会上传对应的log entry到log节点下,比如:

        1. 在一个副本上执行alter table ... add column操作会上传一个ALTER_METADATA类型的log entry到log节点下。

        2. 在一个副本上执行insert into ...操作会上传一个GET_PART类型的log entry到log节点下。

zookeeper_path/log下的log entry在zookeeper上的znode命名规范是log-seqNum, 其中seqNum为创建sequential znode生成的自增序号,如log-0000000001。

2.3.2 副本队列 (queue)

queue位于replica_path下面,保存了当前副本需要同步的log entries. queue里面的log entries是从log中拉取的(由下文说的queue updating task负责拉取),被所有副本都拉取过的log entry就可以从log中删除掉, 删除操作由后台清理线程执行

replica_path下还有一个log_pointer节点,保存了当前副本从log拉取到的最大log entry id + 1,即下一个需要拉取的log entry的id.

replica_path/queue下的entry在zookeeper上的znode命名规范是queue-seqNum, 其中seqNum为创建sequential znode生成的自增序号,如queue-0000000001。

2.4 数据变更(mutations)

在zookeeper_path下面有一个mutations节点,ALTER操作触发的mutation操作都会先封装成mutation entry 后被上传到这个节点。mutation entry保存了mutation操作的相关信息,包括:

    a. source replica : 触发mutation的副本节点。

    b. mutation commands : 需要apply到data parts上的mutation指令。

    c. block numbers :这是一个partition_id -> block_number的映射,保存了每个partition的mutation version。mutation只会被apply到partition中block number小于mutation version的blocks(因为merge的关系,每个data part可能包含一个或多个blocks,并且data part包含的block numbers不一定是连续的)。

    d. alter version : mutation操作对应的metadata version,只有ALTER MODIFY/DROP 操作触发的mutation才会有非-1的值,对于ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE操作触发的mutation操作,这个值都为-1。这个值是为了将ALTER MODIFY/DROP的ALTER METADATA操作和MUTATION操作关联起来。

mutation entry的结构详见ReplicatedMergeTreeMutationEntry

一个mutation entry会被转换成若干个MUTATE_PART类型的log entry被投放到同步日志(zookeeper_path/log)中,下文介绍merge/mutation selecting task时会介绍其转换过程。

另外,在replica_path下面有一个mutation_pointer节点,用于记录副本上最后完成的mutation的id(也就是znode name).

3 负责数据同步的tasks

每个副本节点都有一系列的负责数据同步的任务,这些任务会与zookeeper交互,获取最新的同步任务,然后在本地内存中维护任务队列并借助任务池调度执行,执行完成后会更新zookeer上的对应状态。下面我们逐一介绍各类同步任务。

3.1 queue updating task

queue updating task负责当前副本上同步日志(log entry)队列的更新。

ReplicatedMergeTree表对应的StoreageReplicatedMergeTree类中有一个queue_updating_task对象,它是由后台调度池(BackgroundSchedulePool)调度执行的任务,其执行的函数是StorageReplicatedMergeTree::queueUpdatingTask().

3.1.1 工作内容

queue updating task做的事情包括:

1. 从zookeeper拉取zookeeper_path/log下的所有log entries,并从replica_path/log_pointer获取当前副本的log_pointer (即下一个需要拉取的log entry的id)。

2. 根据log_pointer在log entries中定位到需要拉取的entries,如果log_pointer为空,则从log entries中最小的entry开始拉取。

3. 将需要拉取的log entries都同步到replica_path/queue下面,这里的同步就是以选定的log entries的内容为节点内容,一个一个地在replica_path/queue下创建新的sequential znode. 注意,这里创建的znode名称并不会和zookeeper_path/log下的一致。

4. 更新replica_path/log_pointer的内容为拉取到的log entries中最大的entry id + 1.

5. 将拉取到的log entries插入到内存中的同步任务队列里,即插入到ReplicatedMergeTreeQueue::queue

6. 触发queue executing task(下文会介绍)执行。

这些步骤的实现细节请见源码ReplicatedMergeTreeQueue::pullLogsToQueue.

3.1.2 触发时机

queue updating task被触发执行的时机有:

1. 每张ReplicatedMergeTree表都有一个restarting thread,这个thread会被周期性调度执行,默认间隔是1分钟(由参数zookeeper_session_expiration_check_period控制)。当复制表首次被加载或者zookeeper session过期后被重新加载时,restarting thread会激活并调度queue updating task执行。

2. queue updating task每次被执行时, 会在从zookeeper_path/log获取log entries时添加一个watch callback (详见源码),这个callback会调度执行queue updating task, 所以每当zookeeper_path/log下有新的log entry生成时,queue updating task就会被调度执行

3. 如果queue updating task在拉取和更新队列时发生异常,在异常处理中会再次调度执行queue updating task,调度执行的间隔时间是QUEUE_UPDATE_ERROR_SLEEP_MS(1秒)。

3.1.3 为什么既要拉取到replica_path/queue又要拉取到本地内存中?

1. 拉取到replica_path/queue是为了方便记录每个副本有哪些是待执行的entries,执行完成的直接从replica_path/queue里删除即可。同时,当某个副本重启或者某张ReplicatedMergeTree表被detach后再attach时,ClickHouse可以从replica_path/queue快速加载待执行的entries (详见源码ReplicatedMergeTreeQueue::load)。

2. 拉取到本地内存相当于做了一个缓存,避免每次遍历待执行entries都需要从zookeeper拉取,提升了性能,也可以降低zookeeper的负载。

3.2 queue executing task

queue updating task把同步日志(log entries)拉取到本地内存后,我们需要一个任务去执行这些log entries. 这个任务就是queue executing task. 

queue executing task在StoreageReplicatedMergeTree类中被实现为一个通过BackgroundProcessingPool调度执行的task,即 StorageReplicatedMergeTree::queue_task_handle.

queue_task_handle执行的函数是StorageReplicatedMergeTree::queueTask.

下面介绍queueTask函数的主要执行步骤。

3.2.1 选择要处理的log entry

从ReplicatedMergeTreeQueue::queue中选择一个log entry, 选择逻辑如下(实现细节详见ReplicatedMergeTreeQueue::shouldExecuteLogEntry):

1. 对于GET_PART类型的log entry,如果它生成的data part被某个正在执行的log entry的resulting data parts所包含,则当前entry此轮不被选择。

2. 对于MERGE_PARTS类型的log entry:

    2.1 如果它生成的data part被某个正在执行的log entry的resulting data parts所包含,则当前entry此轮不被选择。

    2.2 如果它的source data parts中存在某个data part还处于被生成过程中,则当前entry此轮不被选择。

    2.3 如果它是TTL类型merge(包括TTL_DELETE, TTL_RECOMPRESS,详见MergeType)且在运行的TTL类型merges的个数(全局值,不是针对当前表的)>= max_number_of_merges_with_ttl_in_pool (配置),则当前entry此轮不被选择。

    2.4 如果source data parts的总数据量大于max_source_parts_size(max_source_parts_size是运行时确定的变量,确定逻辑详见源码)且background pool中没有足够的free threads做large merges (如果有足够free threads则不考虑source data parts的数据量是否超过max_source_parts_size,详见源码1源码2),则当前entry此轮不被选择。

3. 对于MUTATE_PART类型的log entry:

    3.1 如果它生成的data part被某个正在执行的log entry的resulting data parts所包含,则当前entry此轮不被选择。

    3.2 如果它的source data parts中存在某个data part还处于被生成过程中,则当前entry此轮不被选择。

    3.3 如果source data parts的总数据量大于max_source_parts_size(max_source_parts_size是运行时确定的变量,确定逻辑详见源码),则当前entry此轮不被选择。

    3.4 如果它是某个alter modify/drop query的一部分,则需要按alter语句的发生顺序依次执行,如果存在比它早的alter语句尚未执行完成,则当前entry此轮不被选择。详见源码

4. 对于ALTER_METADATA类型的log entry, 需要按照alter语句的发生顺序依次执行,如果存在比它早的alter语句尚未执行完成,则当前entry此轮不被选择。详见源码

5. 对于DROP_RANGE 和 REPLACE_RANGE类型的log entry,因为它们会等待生成指定范围内的data parts的log entries执行完成,为了避免死锁,如果已经有DROP_RANGE 和 REPLACE_RANGE类型的log entry正在执行,则当前entry此轮不被选择。详见源码

6. 对于MERGE_PARTS 和 MUTATE_PART类型的log entry,如果merge/mutation/TTL类型merge被cancel了(如果被cancel则后续不能提交对应类别的操作,在运行的该类操作也会抛异常),则对应操作的log entry也不会被选中执行。详见源码1源码2.

如果上述条件都不满足,则该log entry被选中执行。每次queue executing task执行只会选择一个log entry进行处理

3.2.2 处理选中的log entry

选中log entry后,StorageReplicatedMergeTree::queueTask函数会调用ReplicatedMergeTreeQueue::processEntry对log entry进行处理。processEntry函数的第三个参数是用于执行log entry的函数,此处传入的是StorageReplicatedMergeTree::executeLogEntry.

3.2.2.1 execute log entry

log entry的处理逻辑如下:

1. 对于DROP_RANGE和REPLACE_RANGE类型的log entry, 调用对应的StorageReplicatedMergeTree::executeDropRange 和 StorageReplicatedMergeTree::executeReplaceRange函数进行处理,这里对executeDropRange的执行过程做简要介绍(另一个读者可自行分析源码):

    1.1 从zookeeper上的replica_path/queue节点和内存中移出所有会生成对应range(这里的range是指定partition内指定的block number的范围)中的data part的log entries (这里会删除zookeeper上replica_path/queue节点下的entry),如果有正在运行中的,则等待其运行完成。详见源码ReplicatedMergeTreeQueue::removePartProducingOpsInRange

    1.2 移除当前working set(即MergeTreeData::data_parts_by_info)中属于对应range的data parts(详见MergeTreeData::removePartsInRangeFromWorkingSet)。注意,这里并没有从磁盘删除这些data parts,只是修改了内存中的一些状态,比如,将data part的state改成IMergeTreeDataPart::State::Outdated(详见MergeTreeData::removePartsFromWorkingSet),真正从磁盘删除data parts的操作是通过唤醒cleanup线程完成的。

    1.3 从zookeeper上的replica_path/parts节点中删除在1.2中被移除的data parts,这里如果删除失败则会重试,默认重试5次。

    1.4 唤醒cleanup线程,从磁盘清理掉已被移除的data parts。

2. 对于GET_PART类型的log entry,处理逻辑如下:

    2.1 判断目标data part在当前副本是否已经存在或被已存在的data part所包含,这里不仅会检查active data parts,也会检查处于MergeTreeDataPartState::PreCommitted状态的data parts,详见源码

    2.2 判断目标data part在zookeeper的replica_path/parts节点下是否已经存在。

    2.3 如果2.1和2.2的判断结果都是已存在,则直接跳过当前log entry,不做处理(此时executeLogEntry返回true,表示处理成功)。

    2.4 否则,进一步判断目标data part是否是某次失败的write with quorum操作的resulting data part,判断方法是查看zookeeper上zookeeper_path/quorum/failed_parts节点下是否存在目标data part。如果存在,则和2.3一样,直接跳过当前log entry,不做处理(此时executeLogEntry返回true,表示处理成功)。

    2.5 否则,从其他有目标data part的副本节点去拉取目标data part。拉取是调用StorageReplicatedMergeTree::executeFetch实现的。具体拉取过程暂不剖析,后续data fetcher相关文章中再介绍。

3. 对于MERGE_PARTS类型的log entry,处理逻辑如下:

    3.1 同GET_PART的2.1 ~ 2.4操作。

    3.2 调用StorageReplicatedMergeTree::tryExecuteMerge函数,该函数会根据各种情况/状态及相关配置而决定是执行该merge操作还是建议从其他副本节点拉取目标data part。具体merge逻辑暂不剖析,后续merge相关文章中再介绍。

    3.3 如果tryExecuteMerge返回false,即建议从其他副本拉取data part,则调用StorageReplicatedMergeTree::executeFetch进行拉取。

4. 对于MUTATE_PART类型的log entry,处理逻辑如下:

    4.1 同GET_PART的2.1 ~ 2.4操作。

    4.2 调用StorageReplicatedMergeTree::tryExecutePartMutation函数,和StorageReplicatedMergeTree::tryExecuteMerge一样,该函数也会根据各种情况/状态及相关配置而决定是执行该mutate操作还是建议从其他副本节点拉取目标data part。具体mutate过程此处不做剖析,读者可自行分析源码。

    4.3 如果tryExecutePartMutation返回false,即建议从其他副本拉取data part,则调用StorageReplicatedMergeTree::executeFetch进行拉取。

5. 对于ALTER_METADATA类型的log entry, 调用StorageReplicatedMergeTree::executeMetadataAlter函数进行处理,处理逻辑如下:

    5.1 从该log entry中解析出要更新的metadata和columns信息。

    5.2 更新zookeeper上的replica_path/metadatareplica_path/columns节点的内容为5.1中的metadata和columns信息。

    5.3 更新当前副本本地的元信息(对于ordinary database下的表,会修改本地保存的sql文件),详见StorageReplicatedMergeTree::setTableStructure

3.2.2.2 remove processed entry

executeLogEntry执行完成后会返回一个bool值,表示是否执行成功。如果执行成功,则processEntry会调用ReplicatedMergeTreeQueue::removeProcessedEntry从内存中的队列和zookeeper上的replica_path/queue节点中移除刚执行完成的log entry。

3.3 mutations updating task

mutations updating task做的事情是:基于zookeeper上的zookeeper_path/mutations节点更新内存中的mutation状态数据。这里说的状态数据主要是mutations_by_znode 和 mutations_by_partition

其中,mutations_by_znode保存了mutation id(也就是zookeeper_path/mutations下的节点名称)到MutationStatus对象的映射,MutationStatus中保存了单个mutation的状态信息,包括entry, parts_to_do, is_done等。

mutations_by_partition的类型是std::unordered_map<String, std::map<Int64, MutationStatus *>>, 保存的是Partition -> (block_number -> MutationStatus)的双层映射关系,这样我们通过partition id和data part包含的block number就可以找到某个data part需要执行的mutation操作。

注意,mutations_by_partition中的第二层key(ie. block_number)就是mutation version,每个data part的MergeTreePartInfo::getDataVersion(如果data part被mutate过或者包含mutated part,则返回的是对应的mutation version,否则返回改data part中最小的block number)会和mutation version做比较,如果小于这个version,则说明data part需要执行这个mutation。

mutations updating task是一个通过BackgroundSchedulePool调度执行的task,即StorageReplicatedMergeTree::mutations_updating_task。mutations_updating_task执行的函数是StorageReplicatedMergeTree::mutationsUpdatingTask,其主要工作是通过调用ReplicatedMergeTreeQueue::updateMutations函数完成的。

下面我们看看mutations updating task的主要工作内容有哪些。

3.3.1 工作内容

1. 从zookeeper_path/mutations节点拉取得到所有的mutation entries的id (即对应的znode name)。

2. 从mutations_by_znode和mutations_by_partition中移除掉zookeeper_path/mutations中不存在的mutation,出现这种mutation的原因可能是:

    2.1 被KILL MUTATION语句杀掉的。

    2.2 已经指向完成后被mutations finalizing task (下文会介绍)移除掉的。

3. 将zookeeper_path/mutations中新的mutations(即在mutations_by_znode和mutations_by_partition中不存在的)加入到mutations_by_znode和mutations_by_partition中。

4. 对每个新增的mutation,填充其parts_to_do (即需要mutate的data parts),填充的步骤如下:

    4.1 遍历mutation entry中的block_numbers,对于其中的每个partition, 从当前副本当前表的现存data parts(ReplicatedMergeTreeQueue::current_parts)中找到属于这个partition并且MergeTreePartInfo::getDataVersion小于对应mutation version的data parts(详见getPartNamesToMutate),将这些data parts添加到parts_to_do中。

    4.2 遍历内存中的同步任务队列(ReplicatedMergeTreeQueue::queue)中的每个log entry,在每个log entry的resulting data parts中找到partition存在于mutation entry的block_numbers中且MergeTreePartInfo::getDataVersion小于对应mutation version的data parts,将这些data parts添加到parts_to_do中。

5. 如果步骤3中获取到的新的mutations不为空,则调度merge/mutation selecting task(下文会介绍)执行。

6. 如果在步骤4中发现某些新增的mutation的parts_to_do为空(表名这个mutation可能可以终止了),则调度mutations finalizing task执行。

3.3.2 触发时机

mutations updating task被触发执行的时机和queue updating task基本一致:

1. 和queue updating task一样,当复制表首次被加载或者zookeeper session过期后被重新加载时,restarting thread会激活并调度mutations updating task执行。

2. mutations updating task每次被执行时, 会在从zookeeper_path/mutations获取mutation entries时添加一个watch callback (详见源码),这个callback会调度执行mutations updating task, 所以每当zookeeper_path/mutations下有新的mutations entry生成时,mutations updating task就会被调度执行

3. 如果mutations updating task在拉取和更新mutations时发生异常,在异常处理中会再次调度执行mutations updating task,调度执行的间隔时间是QUEUE_UPDATE_ERROR_SLEEP_MS(1秒)。

3.4 merge/mutation selecting task

上文中曾提到,zookeeper_path/muations中的一个mutation entry会被转换成多个MUTATE_PART类型的log entry并投放到zookeeper_path/log中。这个转换工作是由merge/mutation select task完成的。顾名思义,这个task会负责merge和mutation任务的选择和提交。

merge/mutation selecting task是一个通过BackgroundSchedulePool调度执行的task,即StorageReplicatedMergeTree::merge_selecting_task。注意,这里的命名虽然是merge_selecting_task, 但该task涵盖了merge和mutation任务的选择和提交。

merge_selecting_task执行的函数是StorageRreplicatedMergeTree::mergeSelectingTask,我们看看merge/mutate selecting task做了哪些工作。

3.4.1 工作内容

1. 统计同步任务队列(ReplicatedMergeTreeQueue::queue)中MERGE_PARTS和MUTATE_PART类型的log entry的数量,详见源码

2. 如果步骤1中统计得到的merge + mutation的任务总数大于或等于max_replicated_merges_in_queue(配置),则ClickHouse认为队列中merge和mutation任务已经够多了,不会选择和提交新的merge/mutation任务,跳至步骤5,否则继续执行步骤3。

3. 根据配置和一些runtime状态(比如,磁盘空闲空间)计算出max_source_parts_size_for_merge 和 max_source_part_size_for_mutation,即最大可合并的source parts大小 和 最大可mutate的source part大小。这两个值的计算逻辑见MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge 和 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation。相关配置有:

        a. max_bytes_to_merge_at_min_space_in_pool

        b. max_bytes_to_merge_at_max_space_in_pool

        c. number_of_free_entries_in_pool_to_lower_max_size_of_merge

        d. number_of_free_entries_in_pool_to_execute_mutation

4. 选择是提交MERGE_PARTS任务,还是MUTATE_PART任务,或者都不执行:

    4.1 如果max_source_parts_size_for_merge为0(即没空间做merge了),或者没有需要merge且可以merge的data parts(选取过程详见MergeTreeDataMergerMutator::selectPartsToMerge),则不提交MERGE_PARTS任务,跳至步骤4.3,否则继续执行步骤4.2。

    4.2 调用StorageReplicatedMergeTree::createLogEntryToMergeParts构建并提交一个MERGE_PARTS log entry到zookeeper_path/log中。然后跳至步骤5,否则继续执行步骤4.3。

    4.3 如果满足以下条件之一,则不提交MUTATE_PART,跳至步骤5,否则继续执行步骤4.4。

        a. max_source_part_size_for_mutation为0。 

        b. mutations_by_partition的size为0,即没有需要执行的mutation操作。

        c. 同步任务队列中的MUTATE_PART任务数大于等于max_replicated_mutations_in_queue(配置)。

    4.4 遍历当前表的committed data parts(MergeTreeData::getDataPartsVector),选择一个满足以下全部条件的data part 。如果没有满足条件的data part则跳至步骤5,否则继续执行步骤4.5。

        a. size on disk <= max_source_part_size_for_mutation

        b. 在mutations_by_partition中存在data part所在partition的mutation操作,并且对应的mutation version高于data part当前的mutation version。详见ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion和 ReplicatedMergeTreeQueue::getCurrentMutationVersionImpl

    4.5 在步骤4.4中选中一个data part后,调用StorageReplicatedMergeTree::createLogEntryToMutatePart构建并提交一个MUTATE_PART log entry到zookeeper_path/log中。

5. 如果上述步骤没有成功提交log entry,则调度merge_selecting_task在MERGE_SELECTING_SLEEP_MS(5秒)后执行。如果成功提交了,则立刻调度merge_selecting_task执行。

说明:

1. 每次merge_selecting_task的调用执行,只会提交一个MERGE_PARTS或MUTATE_PART任务到zookeeper_path/log中。

2. 根据MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation的实现可知,只有在空闲线程充足的情况下,才会考虑提交MUTATE_PART。这是为了留更多线程给MERGE_PARTS。而且,在步骤4中做选择时,也是先判断是否可以提交MERGE_PARTS,如果有可提交的MERE_PARTS,则不会考虑MUTATE_PART。 由此可见,在ClickHouse中,MERGE_PARTS任务的优先级是高于MUTATE_PART任务的。

3. merge/mutation selecting task只在leader副本上执行(常见所有副本都是leader的情况)。

3.4.2 触发时机

merge/mutation selecting task被触发执行的时机有:

1. 当前副本赢得leader election后,激活并调度merge/mutation selecting task执行。

2. merge/mutation selecting task在执行完成后,如果结果成功,则立刻调度merge/mutation selecting task执行,如果失败,则调度merge/mutation selecting task延迟MERGE_SELECTING_SLEEP_MS(5秒)后执行。

3. ReplicatedMergeTree表的插入操作在完成commit data part后, 会触发merge/mutation selecting task调度执行,详见源码

4. 在mutations updating task发现新的mutation并完成状态更新后,会触发merge/mutation selecting task调度执行。

5. queue executing task在成功执行MERG_PARTS log entry后,会触发merge/mutation selecting task调度执行,详见StorageReplicatedMergeTree::tryExecuteMerge

6. queue executing task在成功执行MUTATE_PART log entry后,会触发merge/mutation selecting task调度执行,详见StorageReplicatedMergeTree::tryExecutePartMutation

3.5 mutations finalizing task

merge/mutation selecting task将zookeeper_path/mutations下的mutation entry转换成一个个MUTATE_PART log entry并投放到zookeeper_path/log下后,queue executing task会执行这些MUTATE_PART任务。当一个mutation entry相关的所有MUTATE_PART任务都完成了,那么这个mutation entry就可以被结束了。

mutations finalizing task就是用来结束mutation entry的任务。mutations finalizing task是一个通过BackgroundSchedulePool调度执行的任务,即StorageReplicatedMergeTree::mutations_finalizing_task,其调用的函数是StorageReplicatedMergeTree::mutationsFinalizingTask,该函数的主要功能是通过调用ReplicatedMergeTreeQueue::tryFinalizeMutations实现的。

下面介绍一下mutations finalizing task的工作内容。

3.5.1 工作内容

1. 遍历mutations_by_znode中的所有mutations,对于is_done标志位false的mutations:

    1.1 如果znode小于等于mutation_pointer(mutation已经完成),则执行以下操作:

        a. is_done置true。

        b. 清空parts_to_do。

        c. 调用ReplicatedMergeTreeAltersSequence::finishDataAlter更新alter的状态信息(如果对应的metadata alter也完成了,则会从alter sequence移除该alter)。

    1.2 如果znode大于mutation_pointer,且parts_to_do为空(mutation可能已完成),则将该mutation放入candidates中。

2. 对于每个candidate,调用ReplicatedMergeTreeMergePredicate::isMutationFinished判断其是否已经结束,判断逻辑是:

    2.1 如果mutation涉及的blocks还有在committing阶段的,则判定mutation为unfinished。

    2.2 如果现存data parts 或者 同步任务队列(ReplicatedMergeTreeQueue::queue)的resulting data parts中还有需要执行改mutation的data parts (详见getPartNamesToMutate),则判定mutation为unfinished。

    2.3 如果2.1和2.2都不成立,则判定mutation为finished。

3. 如果存在finished candidates,则:

    3.1 更新mutation_pointer : 将zookeeper上的replica_path/mutation_pointer和内存里的ReplicatedMergeTreeQueue::mutation_pointer都设置为finished candidates中最大的znode name。

    3.2 更新每个finished candidate的内存状态(MutationStatus):

        a. is_done置true。

        b. 调用ReplicatedMergeTreeAltersSequence::finishDataAlter更新alter的状态信息。

说明:对于mutation finalizing的逻辑尚未完全理解,这里只是根据源码列出执行过程,后续会在mutation相关文章做进一步解释。

3.5.2 触发时机

1. 当复制表首次被加载或者zookeeper session过期后被重新加载时,restarting thread会激活并调度mutations finalizing task执行。

2. 在更新mutations相关状态时,如果发现some_mutations_are_probably_done(即parts_to_do为空),则调度mutations finalizing task执行。

3. 如果mutations finalizing task在finalize mutations发生异常 或者 成功地finalize了一个或多个mutations,那么,则调度mutations finalizing task延迟MUTATIONS_FINALIZING_SLEEP_MS(1秒)执行。

4. 如果mutations finalizing task顺利运行后没有finalize任何mutation,则调度mutations finalizing task延迟MUTATIONS_FINALIZING_IDLE_SLEEP_MS(5秒)执行。

4 总结

1. 复制表的数据同步包括元数据同步和内容数据同步。

2. 引发数据同步任务的操作包括INSERT,ALTER METADATA,MERGE,MUTATION等。

3. zookeeper在复制表的数据同步中起到桥梁作用,znode路径分为表级别zookeeper_path和表副本级别的replica_path, 关键节点包括log,log_pointer, queue, mutations, mutation_pointer等。

4. 各个副本节点都运行着负责数据同步的各类任务,包括queue updating task, queue executing task, mutations updating task, merge/mutation selecting task和mutations finalizing task。

上一篇下一篇

猜你喜欢

热点阅读