Doris系列18-数据导出
一. 数据导出概述
数据导出(Export)是 Doris 提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据,以文本的格式,通过 Broker 进程导出到远端存储上,如 HDFS/BOS 等。
名词解释:
FE:Frontend,Doris 的前端节点。负责元数据管理和请求接入。
BE:Backend,Doris 的后端节点。负责查询执行和数据存储。
Broker:Doris 可以通过 Broker 进程对远端存储进行文件操作。
Tablet:数据分片。一个表会划分成多个数据分片。
原理:
用户提交一个 Export 作业后。Doris 会统计这个作业涉及的所有 Tablet。然后对这些 Tablet 进行分组,每组生成一个特殊的查询计划。该查询计划会读取所包含的 Tablet 上的数据,然后通过 Broker 将数据写到远端存储指定的路径中,也可以通过S3协议直接导出到支持S3协议的远端存储上。
总体的调度方式如下:
+--------+
| Client |
+---+----+
| 1. Submit Job
|
+---v--------------------+
| FE |
| |
| +-------------------+ |
| | ExportPendingTask | |
| +-------------------+ |
| | 2. Generate Tasks
| +--------------------+ |
| | ExportExporingTask | |
| +--------------------+ |
| |
| +-----------+ | +----+ +------+ +---------+
| | QueryPlan +----------------> BE +--->Broker+---> |
| +-----------+ | +----+ +------+ | Remote |
| +-----------+ | +----+ +------+ | Storage |
| | QueryPlan +----------------> BE +--->Broker+---> |
| +-----------+ | +----+ +------+ +---------+
+------------------------+ 3. Execute Tasks
- 用户提交一个 Export 作业到 FE。
- FE 的 Export 调度器会通过两阶段来执行一个 Export 作业:
2.1) PENDING:FE 生成 ExportPendingTask,向 BE 发送 snapshot 命令,对所有涉及到的 Tablet 做一个快照。并生成多个查询计划。
2.2) EXPORTING:FE 生成 ExportExportingTask,开始执行查询计划。
查询计划拆分:
Export 作业会生成多个查询计划,每个查询计划负责扫描一部分 Tablet。每个查询计划扫描的 Tablet 个数由 FE 配置参数 export_tablet_num_per_task 指定,默认为 5。即假设一共 100 个 Tablet,则会生成 20 个查询计划。用户也可以在提交作业时,通过作业属性 tablet_num_per_task 指定这个数值。
一个作业的多个查询计划顺序执行。
查询计划执行:
一个查询计划扫描多个分片,将读取的数据以行的形式组织,每 1024 行为一个 batch,调用 Broker 写入到远端存储上。
查询计划遇到错误会整体自动重试 3 次。如果一个查询计划重试 3 次依然失败,则整个作业失败。
Doris 会首先在指定的远端存储的路径中,建立一个名为 __doris_export_tmp_12345 的临时目录(其中 12345 为作业 id)。导出的数据首先会写入这个临时目录。每个查询计划会生成一个文件,文件名示例:
export-data-c69fcf2b6db5420f-a96b94c1ff8bccef-1561453713822
其中 c69fcf2b6db5420f-a96b94c1ff8bccef 为查询计划的 query id。1561453713822 为文件生成的时间戳。
当所有数据都导出后,Doris 会将这些文件 rename 到用户指定的路径中。
二. 案例
需要同步的数据:
mysql> select * from kafka_test1;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 7 | 32 | ww | 231 |
| 10 | 12 | gg | 215 |
| 9 | 12 | ff | 213 |
| 6 | 12 | pp | 123 |
| 8 | 12 | ee | 213 |
+--------+----------+----------+------+
5 rows in set (0.36 sec)
mysql> desc kafka_test1;
+----------+-------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+-------------+------+-------+---------+-------+
| siteid | INT | Yes | true | 10 | |
| citycode | SMALLINT | Yes | true | NULL | |
| username | VARCHAR(32) | Yes | true | | |
| pv | BIGINT | Yes | false | 0 | SUM |
+----------+-------------+------+-------+---------+-------+
4 rows in set (0.00 sec)
2.1 不指定文件
export命令:
EXPORT TABLE example_db.kafka_test1
TO "hdfs://10.31.1.123:8020/tmp/"
PROPERTIES
(
"label" = "mylabel_20211217_1",
"column_separator"=",",
"columns" = "siteid,citycode,username,pv",
"exec_mem_limit"="2147483648",
"timeout" = "3600"
)
WITH BROKER broker_name
(
"username" = "doris_user",
"password" = "abc123"
);
image.png
image.png
2.1 指定文件
export 命令:
EXPORT TABLE example_db.kafka_test1
TO "hdfs://10.31.1.123:8020/tmp/kafka_test1.txt"
PROPERTIES
(
"label" = "mylabel_20211217_2",
"column_separator"=",",
"columns" = "siteid,citycode,username,pv",
"exec_mem_limit"="2147483648",
"timeout" = "3600"
)
WITH BROKER broker_name
(
"username" = "doris_user",
"password" = "abc123"
);
image.png
image.png