OLAP

如何同步mysql数据到Doris中

2021-02-02  本文已影响0人  lodestar

Doris官网定义

DorisDB是由Apache Doris核心研发团队打造的新一代企业级MPP数据库。它继承了Doris项目十多年研发成果,累积了线上数千台服务器稳定运行经验,并在此基础上,对传统MPP数据库进行了开创性的革新。DorisDB重新定义了MPP分布式架构,集群可扩展至数百节点,支持PB级数据规模,是当前唯一可以在大数据规模下进行在线弹性扩展的企业级分析型数据库。DorisDB还打造了全新的向量化执行引擎,单节点每秒可处理多达100亿行数据,查询速度比其他产品快10—100倍!

mysql原始表结构

CREATE TABLE `test` (
    `id` int(11) UNSIGNED NOT NULL AUTO_INCREMENT,
    `user_id` int(10) NOT NULL,
    `name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
    `age` int(10) NOT NULL,
    `password` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
    PRIMARY KEY (`id`),
    KEY `idx_test`(`user_id`,`name`,`age`) USING BTREE
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci

1.doris中关联mysql外表

CREATE TABLE external_rds_test(
  id BIGINT,
  user_id INT,
  name VARCHAR(20),
  age INT,
  password VARCHAR(20)
)
ENGINE=mysql
PROPERTIES
(
 "host" = "127.0.0.1",
 "port" = "3306",
 "user" = "root",
 "password" = "abcd123455",
 "database" = "db",
 "table" = "test"
);

结果如下:

MySQL [example_db]> select * from external_rds_test;
+------+---------+------+------+----------+
| id   | user_id | name | age  | password |
+------+---------+------+------+----------+
|    3 |       1 |      |    1 | ccc      |
|    4 |       0 | aa   |   11 |          |
+------+---------+------+------+----------+
2 rows in set (0.18 sec)

2.doris中关联kafka导入数据

CREATE TABLE kafka_test
(
    id BIGINT,
    user_id INT,
    age INT,
    password VARCHAR(20) DEFAULT ''
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "1");

#执行作业
CREATE ROUTINE LOAD  example_db.event_load3 on kafka_test
COLUMNS TERMINATED BY ",",
COLUMNS (id,user_id,age,password)
PROPERTIES
(
    "desired_concurrent_number"="1"
)
FROM KAFKA
(
    "kafka_broker_list"= "127.0.0.1:9092",
    "kafka_topic" = "kafka_test"
);

查看作业

SHOW ROUTINE LOAD;

MySQL [example_db]> SHOW ROUTINE LOAD\G;
*************************** 1. row ***************************
                  Id: 10134
                Name: event_load3
          CreateTime: 2021-02-01 14:41:53
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:example_db
           TableName: kafka_test
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"id,user_id,age,password","maxBatchIntervalS":"10","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","maxBatchSizeBytes":"104857600","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"1","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"kafka_test","currentKafkaPartitions":"0,1,2,3,4,5,6,7,8,9,10,11","brokerList":"127.0.0.1:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":320,"errorRows":0,"committedTaskNum":12,"loadedRows":40,"loadRowsRate":0,"abortedTaskNum":2652,"totalRows":40,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":130206}
            Progress: {"0":"OFFSET_END","1":"OFFSET_END","2":"OFFSET_END","3":"OFFSET_END","4":"OFFSET_END","5":"14","6":"26","7":"3","8":"OFFSET_END","9":"OFFSET_END","10":"OFFSET_END","11":"OFFSET_END"}
ReasonOfStateChanged:
        ErrorLogUrls:
            OtherMsg:
1 row in set (0.00 sec)

State为RUNNING,表示已经成功。
停止作业

STOP ROUTINE LOAD FOR event_load3;

3.通过flink导入mysql数据到doris
方法1:通过mysql-cdc写入kafka,kafka关联doris表。

create table kafka_test (
    id BIGINT,
    user_id BIGINT,
    age BIGINT,
    password STRING,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'upsert-kafka',
    'topic' = 'kafka_test',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'key.format' = 'csv',
    'value.format' = 'csv'
)

--关联mysql
-- drop table rds_test
CREATE TABLE rds_test (
    id BIGINT,
    user_id INT,
    name STRING,
    age INT,
    password STRING
  )
  WITH (
    'connector' = 'mysql-cdc',
    'password' = 'abcd123455',
    'table-name' = 'test',
    'hostname' = '127.0.0.1',
    'username' = 'root',
    'port' = '3306',
    'database-name' = 'db'
  );
--执行Flink作业
 insert into kafka_test
  select id, user_id, age, password
    from rds_test;

方法2:通过阿里云DTS->datahub,然后通过Flink写入kafka,再关联到doris外表

create table dh_in_test (
    id BIGINT,
    user_id BIGINT,
    name STRING,
    age BIGINT,
    password STRING,
    new_dts_sync_dts_record_id STRING,
    new_dts_sync_dts_operation_flag STRING,
    new_dts_sync_dts_instance_id STRING,
    new_dts_sync_dts_db_name STRING,
    new_dts_sync_dts_table_name STRING,
    new_dts_sync_dts_utc_timestamp STRING,
    new_dts_sync_dts_before_flag STRING,
    new_dts_sync_dts_after_flag STRING
) WITH (
    'connector' = 'datahub',
    'endPoint' = 'http://dh-cn-shenzhen-int-vpc.aliyuncs.com',
    'project' = 'test',
    'topic' = 'rb_test',
    'accessId' = '***',
    'accessKey' = '***',
    'subId' = '***'
);
--执行flink作业
 insert into kafka_test
   select id, user_id, age, password
     from dh_in_test;

如何处理delete数据?对于方法1,需要手动的删除doris中的数据;对于方法2,可以通过dts_operation_flag字段来标示,dts_operation_flag可以为I/U/D,分别表示添加、更新和删除。那我们就只需要在doris表中添加一个dts_operation_flag字段来标示就可以了,查询数据的时候就不再查询等于D的值。

如何处理脏数据?delete doris中的数据,然后insert正确的值;还有个方法是将关联一个外表(这个是正确的值),然后再将doris中的表和外表中的值diff,将diff的值insert到doris中。

上一篇 下一篇

猜你喜欢

热点阅读