Mriya使用Flink构建实时Greenplum

2020-07-02  本文已影响0人  codingdm

mriya (运输机) :airplane:

介绍

使用Flink开发的实时ETL,数据从MySQL到Greenplum。使用canal解析MySQL的binlog,投放进kafka,使用Flink消费kafka并把数据组装进Greenplum,后续将会添加更多的数据源和目标源。

了解mriya实时动态欢迎来github点个star image

https://github.com/JeasonPeople/mriya

工作流程

工作流程
  1. 利用canal解析MySQL的binary log,并将解析的log投入kafka中。
  2. 使用mriya消费kafka中的消息,还原MySQL 的增删改。
  3. 将MySQL的增删改转义成目标源的增删改语句

MySql --> PostGreSql/Greenplum(使用delete+copy方式):

  1. 支持近实时级别的数据增删改

  2. 支持自动创建表

CREATE TABLE [IF NOT EXISTS] tbl_name create_definition: {...} 
  1. 支持MySql表结构的变更
ALTER TABLE tbl_name

  | ADD [COLUMN] col_name column_definition
  
  | ADD [COLUMN] (col_name column_definition,...) 
  
  | DROP [COLUMN] col_name 
  
  | MODIFY [COLUMN] col_name column_definition
  
  1. 支持主键的修改

  2. 删除表

  3. 修改表名

MySql --> Apache Kudu(待开发):

工作原理

  1. 从kafka中获取canal解析完成的MySQLBinary log。
  2. 使用Flink的keyBy对targetTable进行分组,并使用时间窗口。
  3. 自定义一个trigger,触发事件为解析到DDL语句。
  4. 步骤2和步骤3组成,时间窗口+自定义trigger组合使用,如果没有DDL语句则根据时间进行滚动,如果存在DDL语句数据立即滚动。
  5. 定义aggregate,将同一张表的数据进行合并去重
  6. 自定义Sink,定义GreenplumSink或者其他目标数据源。

docker 极速体验

git clone https://github.com/JeasonPeople/mriya.git
cd docker-compose
docker-compose up
  1. 访问http://docker-ip:8848/nacos修改配置(默认账号nacos/nacos)
    在public下新增Properties文件, Data ID=MRIYA, group=MRIYA_GROUP
mriya.source.kafka.bootstrap.servers=kafka:9092
mriya.source.kafka.zookeeper.connect=zk:2181
mriya.source.kafka.group.id=dw-etl-prod-gp6
mriya.source.kafka.auto.offset.reset=earliest
mriya.source.kafka.topic=mriya

mriya.target.datasource.type=greenplum
mriya.target.datasource.url=jdbc:postgresql://greenplum:5432/gsdw?serverTimezone=GMT+8
mriya.target.datasource.schema=dw_ods
mriya.target.datasource.username=gpadmin
mriya.target.datasource.password=pivotal
# 支持freemarker语法,${table}为必写项
mriya.table.name.template=${topic}_${database}_${table}

# psql -d template1 -c "alter user gpadmin password 'pivotal'"
# mriya.message.filer=${topic}-${database}-${table}
# mriya.message.filer=mes-accounting_bak-*
  1. 使用gpadmin账号连接greenplum创建database以及schema(默认账号root/pivotal gpadmin/pivotal)
CREATE DATABASE "mriya";
CREATE SCHEMA "dw_ods";
  1. 访问http://docker-ip:8081/#/submit提交jar并运行jar

  2. 使用连接工具连接MySql(默认账号root/Mriya@Mriya)运行sql

CREATE DATABASE `mriya`;
CREATE TABLE `mriya`.`table_1`  (
  `k1` int(10) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `c1` varchar(255) NULL,
  `c2` varchar(255) NULL,
  `c3` varchar(255) NULL,
  `c4` datetime(2) NULL,
  PRIMARY KEY (`k1`)
);

安装教程

  1. 安装MySql
  2. 安装canal
  3. 安装kafka
  4. 安装zookeeper

1-4 安装教程(https://github.com/alibaba/canal/wiki)

  1. 安装配置中心nacos

nacos 安装教程(https://nacos.io/zh-cn/docs/deployment.html)

  1. 安装Flink

单机版安装(https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html#starting-flink)

  1. 安装Greenplum

docker安装Greenplum

docker pull datagrip/greenplum
docker run -it -p 5432:5432 datagrip/greenplum

用户名: gpadmin 密码: pivotal
用户名: root 密码: pivotal

使用说明

  1. 使用源码编译
git clone https://github.com/JeasonPeople/mriya.git
cd mriya
mvn install -Dmaven.test.skip=true
cd mriya-flink/target

将打包好的jar包通过Flink Web上传并执行

同步速度

同步速度
同步速度
上一篇下一篇

猜你喜欢

热点阅读