ClickHouse——数据分片

2023-02-27  本文已影响0人  小波同学

一、分区和分片

分区

分区是表的分区,是解决大数据存储的常见解决方案,具体的DDL操作关键词是 PARTITION BY,指的是一个表按照某一列数据(比如日期)进行分区,对应到最终的结果就是不同分区的数据会写入不同的文件中。功能会比较类似于mysql的索引,主要是为了解决,有时候我们查询只关心表中的一部分数据,建表时引入partition概念,可以按照对应的分区字段,找出对应的文件进行查询展示,防止查询中会扫描整个表内容,消耗很多时间做没必要的工作。

分片

clickhouse的分片。其实也就是一个视图聚合的功能,复用了数据库的分区,相当于在原有的分区下,作为第二层分区, 是在不同节点/机器上的体现。clickhouse可以支持读取每个分片上的内容的集合。底层是通过Distributed这个引实现的。默认是随机写到某个分片,但是也可以自己去指定写到具体某台机器上。
Distributed引擎的官网介绍

分区和分片的具体关系如下:


数据分区-允许查询在指定了分区键的条件下,尽可能的少读取数据
数据分片-允许多台机器/节点同并行执行查询,实现了分布式并行计算

二、分区相关操作

2.1 创建分区表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
    INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
    INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'], ...]

2.2 删除分区

alter table [db.]table_name drop partition  key [ON CLUSTER cluster]

2.3 查询分区信息

select * from system.parts where table='table_name'

三、分片原理

在分布式模式下,ClickHouse会将数据分为多个分片,并且分布到不同节点上。不同的分片策略在应对不同的SQL Pattern时,各有优势。ClickHouse提供了丰富的sharding策略,让业务可以根据实际需求选用。

数据分片,让ClickHouse可以充分利用整个集群的大规模并行计算能力,快速返回查询结果。

更重要的是,多样化的分片功能,为业务优化打开了想象空间。比如在hash sharding的情况下,JOIN计算能够避免数据shuffle,直接在本地进行local join;支持自定义sharding,可以为不同业务和SQL Pattern定制最适合的分片策略;利用自定义sharding功能,通过设置合理的sharding expression可以解决分片间数据倾斜问题等。
另外,sharding机制使得ClickHouse可以横向线性拓展,构建大规模分布式集群,从而具备处理海量数据的能力。

不过ClickHouse的集群的水平拓展目前是一个瓶颈,因为历史数据的存在, 避免新增节点之后的数据倾斜是个难点。

四、ClickHouse数据分片

4.1 集群的配置方式

在 clickhouse 中集群配置用shard 代表分配,replica 代表副本。

<shard> <!--分片-->
  <replica> <!--副本-->
  </replica>
</shard>
<shard> <!--分片-->
  <replica> <!--副本-->
  </replica>
    <replica> <!--副本-->
  </replica>
</shard>
clickhouse 集群有两种配置方式
<yandex>
  <!-- 自定义配置名称,与 conf.xml 配置的 include 属性相同即可-->
  <clickhouse_remote_servers>
    <shard_1> <!--自定义集群名称-->
      <node> <!--自定义 clickhouse 节点-->
        <!--必填参数-->
        <host>node3</host>
        <port>9977</port>

        <!--选填参数-->
        <weight>1</weight>
        <user>default</user>
        <password></password>
        <secure></secure>
        <compression></compression>
      </node>
      <node>
        <host>node2</host>
        <port>9977</port>
      </node>
    </shard_1>
  </clickhouse_remote_servers>
</yandex>
<!-- 配置定义了一个名为 shard_1 的集群,包含了两个节点 node3、node2 -->
配置 说明
shard_1 自定义集群名称,全局唯一,是后续引用集群配置的唯一标识
node 用于定义节点,不包含副本
host clickhouse 节点服务器地址
port clickhouse 服务的tcp 端口
weight 分片权重,默认为 1
user clickhouse 用户,默认为 default
password clickhouse 的用户密码,默认为空字符
secure SSL 连接端口,默认 9440
conpression 是否要开启数据压缩功能,默认 true
<!-- 2 分片,0 副本-->
<sharding_simple> <!-- 集群自定义名称 -->
  <shard> <!-- 分片 -->
    <replica> <!-- 副本 -->
      <host>node3</host>
      <port>9977</port>
    </replica>
  </shard>
    <shard>
    <replica>
      <host>node2</host>
      <port>9977</port>
    </replica>
  </shard>
</sharding_simple>
<!-- 1 分片,1 副本-->
<sharding_simple> <!-- 集群自定义名称 -->
  <shard> <!-- 分片 -->
    <replica> <!-- 副本 -->
      <host>node3</host>
      <port>9977</port>
    </replica>
    <replica>
      <host>node2</host>
      <port>9977</port>
    </replica>
  </shard>
</sharding_simple>

<!-- 2 分片,1 副本-->
<sharding_simple> <!-- 集群自定义名称 -->
  <shard> <!-- 分片 -->
    <replica> <!-- 副本 -->
      <host>node3</host>
      <port>9977</port>
    </replica>
    <replica>
      <host>node2</host>
      <port>9977</port>
    </replica>
  </shard>
  <shard> <!-- 分片 -->
    <replica> <!-- 副本 -->
      <host>node4</host>
      <port>9977</port>
    </replica>
    <replica>
      <host>node5</host>
      <port>9977</port>
    </replica>
  </shard>
</sharding_simple>
<!-- 集群部署中,副本数量的上线是 clickhouse 节点的数量决定的 -->

在 clickhouse 中给我们配置了一些示例,可以打开配置文件看一下

 <remote_servers>
        <!-- Test only shard config for testing distributed storage -->
        <test_shard_localhost>
            <!-- Inter-server per-cluster secret for Distributed queries
                 default: no secret (no authentication will be performed)

                 If set, then Distributed queries will be validated on shards, so at least:
                 - such cluster should exist on the shard,
                 - such cluster should have the same secret.

                 And also (and which is more important), the initial_user will
                 be used as current user for the query.

                 Right now the protocol is pretty simple and it only takes into account:
                 - cluster name
                 - query

                 Also it will be nice if the following will be implemented:
                 - source hostname (see interserver_http_host), but then it will depends from DNS,
                   it can use IP address instead, but then the you need to get correct on the initiator node.
                 - target hostname / ip address (same notes as for source hostname)
                 - time-based security tokens
            -->
            <!-- <secret></secret> -->

            <shard>
                <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
                <!-- <internal_replication>false</internal_replication> -->
                <!-- Optional. Shard weight when writing data. Default: 1. -->
                <!-- <weight>1</weight> -->
                <replica>
                    <host>localhost</host>
                    <port>9000</port>
                    <!-- Optional. Priority of the replica for load_balancing. Default: 1 (less value has more priority). -->
                    <!-- <priority>1</priority> -->
                </replica>
            </shard>
        </test_shard_localhost>
        <test_cluster_two_shards_localhost>
             <shard>
                 <replica>
                     <host>localhost</host>
                     <port>9000</port>
                 </replica>
             </shard>
             <shard>
                 <replica>
                     <host>localhost</host>
                     <port>9000</port>
                 </replica>
             </shard>
        </test_cluster_two_shards_localhost>


      <!-- 配置 2 个分配,0 副本 -->
        <test_cluster_two_shards>
            <shard>
                <replica>
                    <host>127.0.0.1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>127.0.0.2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster_two_shards>

       <!--2 分片,0 副本-->
        <test_cluster_two_shards_internal_replication>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>127.0.0.1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>127.0.0.2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster_two_shards_internal_replication>

       <!--1分片 0 副本,权重设为 1-->
        <test_shard_localhost_secure>
            <shard>
                <replica>
                    <host>localhost</host>
                    <port>9440</port>
                    <secure>1</secure>
                </replica>
            </shard>
        </test_shard_localhost_secure>


        <test_unavailable_shard>
            <shard>
                <replica>
                    <host>localhost</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>localhost</host>
                    <port>1</port>
                </replica>
            </shard>
        </test_unavailable_shard>

      <!-- 手动添加新的集群 -->
      <two_shard>
              <shard>
                <replica>
                    <host>node3</host>
                    <port>9977</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node2</host>
                    <port>9977</port>
                </replica>
            </shard>
      </two_shard>


    </remote_servers>
-- 在 system.clusters 中查看配置情况
select cluster,host_name from system.clusters;

┌─cluster──────────────────────────────────────┬─host_name─┐
│ test_cluster_two_shards                      │ 127.0.0.1 │
│ test_cluster_two_shards                      │ 127.0.0.2 │
│ test_cluster_two_shards_internal_replication │ 127.0.0.1 │
│ test_cluster_two_shards_internal_replication │ 127.0.0.2 │
│ test_cluster_two_shards_localhost            │ localhost │
│ test_cluster_two_shards_localhost            │ localhost │
│ test_shard_localhost                         │ localhost │
│ test_shard_localhost_secure                  │ localhost │
│ test_unavailable_shard                       │ localhost │
│ test_unavailable_shard                       │ localhost │
└──────────────────────────────────────────────┴───────────┘
# node3
vim /etc/clickhouse-server/config.xml
# 增加如下内容
<macros>
  <shard>01</shard>
  <replica>node3</replica>
</macros>

# node2
vim /etc/clickhouse-server/config.xml
# 增加如下内容
<macros>
  <shard>02</shard>
  <replica>node2</replica>
</macros>
-- 进入 clickhouse 命令行查看变量是否配置成功

select * from system.macros;

-- 查看远端节点的数据
select * from remote('node2:9977','system','macros','default')

4.2 基于集群实现分布式 DDL

在默认情况下,创建多张副本表需要在不同服务器上进行创建,这是因为 create、drop、rename和 alter 等 ddl 语句不支持分布式执行,而在假如集群配置后,就可以使用新的语法实现分布式DDL 执行了。

create / drop / rename / alter table on cluster cluster_name

-- cluster_name 对应为配置文件中的汲取名称,clickhouse 会根据集群的配置,去各个节点执行 DDL 语句

-- 在 two_shard 集群 创建测试表

CREATE TABLE t_shard ON CLUSTER two_shard
(
    `id` UInt8,
    `name` String,
    `date` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/t_shard', '{replica}')
PARTITION BY toYYYYMM(date)
ORDER BY id

┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node2 │ 9977 │      0 │       │                   1 │                1 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │      0 │       │                   0 │                0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

-- 表引擎可以使用其他任意引擎
-- {shard} 和 {replica} 两个动态变量代替了前面的硬编码方式
-- clickhouse 会根据 shard_2 的配置在 node3 和 node2 中创建 t_shard 数据表

-- 删除 t_shard 表
drop table t_shard on cluster shard_2;

4.2.1 数据结构

<!-- 在默认情况下,分布式 DDL 在 zookeeper 内使用的根路径由config.xml distributed_ddl 标签配置 -->

<distributed_ddl>
  <path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>

<!-- 默认为 /clickhouse/task_queue/ddl-->

在此路径之下,还有一些其他监听节点,包括 /query-[seq] 这是 DDL 操作日志,每执行一次分布式 DDL 查询,该节点下就会增加一条操作日志,记录响应操作。当各个节点监听到有新的日志假如的时候,便会响应执行。

DDL 操作日志使用 zookeeper 持久化顺序节点,每条指令的名称以 query-[seq] 为前缀,后面的序号递增,在 query-[seq] 操作日志下,还有两个状态节点:

/query-000001/finished
node3 : 0
node2 : 0

# 表示 node3,node2 两个节点已经执行完成
# 在 /query-[seq]下记录的信息由 DDLLogEntry 承载,它的核心属性有以下几个:
version: 1

query: CREATE TABLE default.t_shard UUID \'d1679b02-9eae-4766-8032-8201a2746692\' ON CLUSTER two_shard (`id` UInt8, `name` String, `date` DateTime) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/t_shard\', \'{replica}\') PARTITION BY toYYYYMM(date) ORDER BY id

hosts: ['node3:9977','node2:9977']

initiator: node3%2Exy%2Ecom:9977

# query:记录了 DDL 查询的执行语句
# host:记录了指定集群的 hosts 主机列表,集群由分布式 DDL 语句中的 on cluster 指定,在分布式 DDL 执行过程中,会根据 hosts 列表逐个判断它们的执行状态。
# initiator:记录初始 host 主机的名称,hosts 主机列表的取值来自于初始化 host 节点上的去集群

host主机列表的取值来源等同于下面的查询

SELECT host_name
FROM system.clusters
WHERE cluster = 'two_shard'

┌─host_name─┐
│ node3     │
│ node2     │
└───────────┘

4.2.2 分布式 DDL 的执行流程

以创建分布式表为例说明分布式 DDL 的执行流程。

分布式 DDL 整个流程按照从上而下的时间顺序执行,大致分成 3 个步骤:

五、Distributed 原理解析

Distributed 表引擎是分布式表的代名词,他自身不存储任何数据,而是作为数据分片的代理,能够自动路由数据至集群中的各个节点,所以 DIstributed 表引擎需要和其他表引擎一起协同工作。

从上图可以看出一张表分成了两部分:

对于分布式表与本地表之间表结构的一致性检查,Distributed 表引擎采用了读时检查的机制,这意味着如果他们的表结构不兼容,需要在查询时才会抛出异常,而在创建表引擎时不会进行检查,不同 clickhouse 节点上的本地表之间使用不同表引擎也是可行的,但是通常不会这么做,保持他们的结构一致,有利于后期的维护避免造成不可预计的后果。

5.1 定义形式

Distributed 表引擎的定义形式

CREATE TABLE [IF NOT EXISTS] [db_name.]table_name on cluster cluster_name(
  name1 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
  name2 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
  ...
) ENGINE = Distributed(cluster,database,table,[sharding_key])
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
-- 创建分布式表 t_shard_2_all 代理 two_shard 集群的 drfault.t_shard_2_local 表

CREATE TABLE t_shard_2_all ON CLUSTER two_shard
(
    `id` UInt8,
    `name` String,
    `date` DateTime
)
ENGINE = Distributed(two_shard, default, t_shard_2_local, rand())

Query id: 83e4f090-0f7d-4892-bbf3-a094f97a6eea

┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node2 │ 9977 │      0 │       │                   1 │                1 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │      0 │       │                   0 │                0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

-- 这里用的是 on cluster 分布式 DDL, 所以在 two_shard 集群中每个节点都会创建一张分布式表
-- 写入数据时会根据 rand() 随机函数的取值决定写入那个分片,
-- 当这时还没有创建 本地表,可以看出Distributed 是读数据时才会进行检查。

-- 尝试 查询 t_shard_2_all 分布式表 
SELECT *
FROM t_shard_2_all;


Received exception from server (version 21.4.3):
Code: 60. DB::Exception: Received from localhost:9977. DB::Exception: Table default.t_shard_2_local doesn t exist.


-- 使用分布式 DDL 创建本地表
CREATE TABLE t_shard_2_local ON CLUSTER two_shard
(
    `id` UInt8,
    `name` String,
    `date` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/t_shard_2_local', '{replica}')
PARTITION BY toYYYYMM(date)
ORDER BY id

┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ node3 │ 9977 │      0 │       │                   1 │                0 │
│ node2 │ 9977 │      0 │       │                   0 │                0 │
└───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

-- 尝试 查询 t_shard_2_all 分布式表 
SELECT *
FROM t_shard_2_all;

Query id: 5ae82696-0f07-469b-bff5-bd17dc513da7

Ok.

0 rows in set. Elapsed: 0.009 sec.

-- 到现在为止,拥有两个数据分配的分布式表 t_shard_2_all 就创建好了

5.2 查询的分类

分布式表的查询操作可以分为以下几类:

-- 删除分布式表
drop table t_shard_2_all on cluster two_shard;

-- 删除本地表
drop table t_shard_2_local on cluster two_shard;

5.3 分片规则

关于分片的规则这里进一步说明,分片键要求返回一个整型类型的取值,包括 Int和 UInt 类型的系列

-- 分片键可以是一个具体的整型字段
-- 按照用户 ID 划分
Distributed(cluster,database,table,userid)

-- 分片键也可以是返回整型的表达式
-- 按照随机数划分
Distributed(cluster,database,table,rand())

-- 按照用户 ID 的散列值划分
Distributed(cluster,database,table,intHash64(userid))

如果不声明分片键,那么分布式表只能包含一个分片,这意味着只能映射一张表,否则写入数据时将抛出异常。当一个分布式表只包含一个分片的时候也就失去了分布式的意义,所以通常会按照业务需要设置分片键。

5.3.1 分片权重(weight)

在配置集群时,有一项 weight 的设置1

weight 默认为 1,它可以被设置成任意整除,但是建议将其设置为比较小的值。分片权重会影响分片中的数据倾斜程度,分片权重越大,写入的数据就会越多。

5.3.2 槽(slot)

slot 的数量等于所有分片权重之和,假设集群有两个分片,第一个分片 weight 为 10,第二个 weight 为20 ,那么 slot 的数量为 30(10+20),slot 按照权重元素的取值区间,与对应的分片形成映射关系,

5.3.3 选择函数

选择函数用于判断一行待写入的数据应该被写到哪个分片中,判断过程大致分成两个步骤:

5.4 分布式写入的核心流程

向集群内的分片写入数据时,通常有两种思路,

为了便于理解,这里将分片写入和副本复制拆分成两个部分讲解,使用一个拥有 2 个分片 0 个副本的集群讲解分片写入流程,使用一个拥有 1 个分片 1 个副本的集群讲解分片副本复制流程。

5.4.1 将数据写入分片的流程

在对 Distributed 表执行 insert 操作的时候,会进入数据写入的执行逻辑。整个过程大约分成 5 个步骤。

将需要放到远端分片的数据以分区为单位,分别写入 t_shard_2_all 存储目录下的临时 bin 文件,数据文件命名规则如下:

/database@host:port/[increase_num].bin

# 10,40 的两条数据会写入到这个临时文件中

临时数据写完后会尝试与 第二个分片的服务器进行连接。

由 Distributed 表负责向远端分片发送数据时,有异步和同步两种模式:

由 insert_distributed_sync 参数控制使用何种模式,默认 false(异步),如果设置为 true ,还需要设置 insert_distributed_timeout 参数控制同步等待超时时间。

5.4.2 副本复制的流程

Distributed 选择 replica的算法大致是,clickhouse 服务器节点中拥有一个全局计数器 errors_count,当服务器出现异常时计数器 +1,当一个分片有多个副本时,选择 errors_count 计数最小的服务器,进行数据写入。

5.5 分布式查询的核心流程

与写入数据有所不同,面向集群查询数据的时候,只能通过Distributed 表引擎实现,当Distributed 表执行查询操作的时候,会依次查询每个分片的数据,然后再汇总返回。

5.5.1 多副本路由选择

在查询数据的时候,如果一个集群中有一个分片有多个副本,那么 Distributed 需要面临副本选择的问题,clickhouse 会使用负载均衡算法从众多副本中选择一个,而具体使用哪种算法由load_balancing参数控制。

# clickhouse 提供四种负载均衡算法
load_balancing=random/nearest_hostname/in_order/first_or_random

5.5.2 多分片查询的核心流程

分布式查询与分布式写入类似,同样是谁发起谁负责,它会由接收 select 查询的 Distributed 表,负责串联起整个查询。

首先针对分布式表的查询SQL,按照分片数量将查询根据分片拆分成若干个针对本地表查询的子查询,然后向各个表发起查询,最后再汇总各个分片的结果。

-- 例如在分布式表执行下面查询,查看执行计划

EXPLAIN
SELECT count(1)
FROM t_shard_2_all;

┌─explain─────────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY))                                 │
│   MergingAggregated                                                         │
│     SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│       Union                                                                 │
│         Expression (Convert block structure for query from local replica)   │
│           ReadFromPreparedSource (Optimized trivial count)                  │
│         ReadFromPreparedSource (Read from remote replica)                   │
└─────────────────────────────────────────────────────────────────────────────┘

整个执行计划从上至下可分成两个步骤

5.5.3 使用 global 优化分布式子查询

如果在分布式查询中使用子查询,可能会面临两难的局面,

先看下面一个示例:

-- 使用分布式 DDL 创建 分布式表
CREATE TABLE t_distributed_query_all ON CLUSTER two_shard
(
    `id` UInt8, -- 用户编号
    `repo` UInt8 -- 仓库编号
)
ENGINE = Distributed(two_shard, default, t_distributed_query_local, rand());

-- 使用分布式 DDL 创建本地表
CREATE TABLE t_distributed_query_local ON CLUSTER two_shard
(
    `id` UInt8,
    `repo` UInt8
)
ENGINE = TinyLog;

-- 在 node2 节点写入数据
insert into t_distributed_query_local values (1,100),(2,100),(3,100);
-- 查询数据
select * from t_distributed_query_local;
┌─id─┬─repo─┐
│  1 │  100 │
│  2 │  100 │
│  3 │  100 │
└────┴──────┘

-- 在 node3 节点写入数据
insert into t_distributed_query_local values (3,200),(4,200);
-- 查询数据
select * from t_distributed_query_local;
┌─id─┬─repo─┐
│  3 │  200 │
│  4 │  200 │
└────┴──────┘

-- 查询全局表数据
select * from t_distributed_query_all;

┌─id─┬─repo─┐
│  1 │  100 │
│  2 │  100 │
│  3 │  100 │
└────┴──────┘
┌─id─┬─repo─┐
│  3 │  200 │
│  4 │  200 │
└────┴──────┘

要求找到同时拥有两个仓库的用户,对于这种查询可以使用 in 查询子句,与此同时面临的问题是 in 查询使用分布式表还是本地表?

SELECT uniq(id)
FROM t_distributed_query_all
WHERE (repo = 100) AND (
  id IN
    (
      SELECT id
      FROM t_distributed_query_local
      WHERE repo = 200
    )
);

┌─uniq(id)─┐
│        0 │
└──────────┘
-- 并没有查询出结果

-- 在分布式表在接收到查询后,将上面 SQL 替换成本地表的形式再发送到每个分片进行执行

SELECT uniq(id)
FROM t_distributed_query_local
WHERE (repo = 100) AND (
  id IN
    (
      SELECT id
      FROM t_distributed_query_local
      WHERE repo = 200
    )
);
-- 单独在分片 1 或分片 2 都无法找到满足 同时等于 100 和 200 的数据
SELECT uniq(id)
FROM t_distributed_query_all
WHERE (repo = 100) AND (id GLOBAL IN
(
    SELECT id
    FROM t_distributed_query_all
    WHERE repo = 200
))

Query id: 0a55d59d-c87b-4bc8-8985-dad26f0a39b9

┌─uniq(id)─┐
│        1 │
└──────────┘

Global 查询流程:

在使用 global 修饰符之后,clickhouse 使用内存表临时保存了 in 子查询到的数据,并将其发送到远端分片节点,以此达到了数据共享的目的,从而避免了查询放大的问题,in 或者 join 子句返回的数据不宜过大,如果内存表存在重复数据,可以实现在子句中增加 distinct 实现去重。

参考:
https://pushkin.blog.csdn.net/article/details/127020068

https://blog.csdn.net/aizhupo1314/article/details/120016988

上一篇下一篇

猜你喜欢

热点阅读