Datax 的基本操作
1. Datax
简介:
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、
Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
开源地址:
https://github.com/alibaba/DataX
1.1 测试datax 执行环境:
执行命令
[root@bigdata001 bin]# python datax.py /opt/module/datax/job/job.json
Time 0.024s | All Task WaitReaderTime 0.044s | Percentage 100.00%
2021-10-30 21:12:38.209 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-10-30 21:12:38.210 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work.
2021-10-30 21:12:38.210 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do post work.
2021-10-30 21:12:38.210 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2021-10-30 21:12:38.210 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2021-10-30 21:12:38.211 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
2021-10-30 21:12:38.211 [job-0] INFO JobContainer - PerfTrace not enable!
2021-10-30 21:12:38.212 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.024s | All Task WaitReaderTime 0.044s | Percentage 100.00%
2021-10-30 21:12:38.212 [job-0] INFO JobContainer -
任务启动时刻 : 2021-10-30 21:12:28
任务结束时刻 : 2021-10-30 21:12:38
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
[root@bigdata001 bin]#
2. 通过mysql 执行导入到hive 中
2.1 mysql 表
CREATE TABLE `Flink_iceberg` (
`id` bigint(64) DEFAULT NULL,
`name` varchar(64) DEFAULT NULL,
`age` int(20) DEFAULT NULL,
`dt` varchar(64) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
2.2 hive表
create table Flink_iceberg(
deptno int, dname string, loc string
)
partitioned by (dt string)
row format delimited fields terminated by ',';
2.3 mysqlToHive.json
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"`id`",
"`name`",
"`age`",
"`dt`"
],
"splitPk": "",
"connection": [
{
"table": [
"Flink_iceberg"
],
"jdbcUrl": [
"jdbc:mysql://192.168.1.180:3306/test"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://192.168.1.130:9000",
"fileType": "text",
"path": "/user/hive/warehouse/db_hive.db/flink_iceberg/",
"fileName": "dt",
"writeMode": "append",
"fieldDelimiter": ",",
"column": [
{
"name": "deptno",
"type": "int"
},
{
"name": "dname",
"type": "string"
},
{
"name": "loc",
"type": "string"
},
{
"name": "dt",
"type": "string"
}
]
}
}
}
]
}
}
2.4 执行命令:
python datax.py /opt/module/datax/job/mysqlToHive.json
[root@bigdata001 bin]# python datax.py /opt/module/datax/job/mysqlToHive.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2021-10-30 23:17:53.534 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-10-30 23:17:53.540 [main] INFO Engine - the machine info =>
2021-10-30 23:18:04.877 [job-0] INFO JobContainer - PerfTrace not enable!
2021-10-30 23:18:04.878 [job-0] INFO StandAloneJobContainerCommunicator - Total 17 records, 481 bytes | Speed 48B/s, 1 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-10-30 23:18:04.878 [job-0] INFO JobContainer -
任务启动时刻 : 2021-10-30 23:17:53
任务结束时刻 : 2021-10-30 23:18:04
任务总计耗时 : 11s
任务平均流量 : 48B/s
记录写入速度 : 1rec/s
读出记录总数 : 17
读写失败总数 : 0
3. mysql 同步hdfs
mysql 建表
CREATE TABLE `t_user` (
`id` bigint(10) NOT NULL,
`name` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*Data for the table `t_user` */
insert into `t_user`(`id`,`name`) values (1,'flink'),(2,'slave'),(3,'hive'),(4,'flink04'),(5,'hbase');
3.1 mysqlToHdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.1.180:3306/test"
],
"table": [
"t_user"
]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://192.168.1.130:9000",
"fieldDelimiter": " ",
"fileName": "t_user.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
3.2 执行
命令: [root@bigdata001 bin]# ./datax.py /opt/module/datax/job/mysqlToHdfs.json
e [hdfs://192.168.1.130:9000/t_user.txt__488a017c_0f35_44fa_8a2c_f38dbb92c02a].
2021-10-30 21:21:11.672 [job-0] INFO HdfsWriter$Job - start delete tmp dir [hdfs://192.168.1.130:9000/__43469efa_d0fd_4339_9656_74a085dc4321] .
2021-10-30 21:21:11.693 [job-0] INFO HdfsWriter$Job - finish delete tmp dir [hdfs://192.168.1.130:9000/__43469efa_d0fd_4339_9656_74a085dc4321] .
2021-10-30 21:21:11.693 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2021-10-30 21:21:11.693 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2021-10-30 21:21:11.694 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2021-10-30 21:21:11.797 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 1 | 1 | 1 | 0.018s | 0.018s | 0.018s
PS Scavenge | 1 | 1 | 1 | 0.010s | 0.010s | 0.010s
2021-10-30 21:21:11.798 [job-0] INFO JobContainer - PerfTrace not enable!
2021-10-30 21:21:11.798 [job-0] INFO StandAloneJobContainerCommunicator - Total 5 records, 31 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-10-30 21:21:11.799 [job-0] INFO JobContainer -
任务启动时刻 : 2021-10-30 21:21:00
任务结束时刻 : 2021-10-30 21:21:11
任务总计耗时 : 11s
任务平均流量 : 3B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数 : 0
[root@bigdata001 bin]#
3.4 查看hdfs 执行结果
[root@bigdata001 bin]# hdfs dfs -ls /
Found 8 items
-rw-r--r-- 3 root supergroup 71 2020-07-03 19:47 /dept.txt
drwxr-xr-x - root supergroup 0 2020-05-10 01:03 /flume
drwxr-xr-x - root supergroup 0 2020-07-03 12:01 /hbase
drwxr-xr-x - root supergroup 0 2020-02-15 00:45 /origin_data
-rw-r--r-- 3 root supergroup 41 2021-10-30 21:21 /t_user.txt__488a017c_0f35_44fa_8a2c_f38dbb92c02a
drwxr-xr-x - root supergroup 0 2020-04-06 19:02 /tez
drwx------ - root supergroup 0 2020-04-06 18:21 /tmp
drwxr-xr-x - root supergroup 0 2020-04-06 19:25 /user
[root@bigdata001 bin]# hdfs dfs -cat /t_user.txt__488a017c_0f35_44fa_8a2c_f38dbb92c02a
1 flink
2 slave
3 hive
4 flink04
5 hbase
[root@bigdata001 bin]#
t_user.txt__488a017c_0f35_44fa_8a2c_f38dbb92c02a
HdfsWriter 实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名
4性能优化:
job.setting.speed.channel : channel 并发数
并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数 (默认为 5)
job.setting.speed.record : 2 全局配置 channel 的 record 限速
Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速
job.setting.speed.byte: 全局配置 channel 的 byte 限速
core.transport.channel.speed.record:单个 channel 的 record 限速
core.transport.channel.speed.byte:单个 channel 的 byte 限速
5. jvm 性能优化
调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本;另一种是在启动
的时候,加上对应的参数
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
6. 可以借助开源工具
datax
DataX Web是在DataX之上开发的分布式数据同步工具,提供简单易用的 操作界面,降低用户使用DataX的学习成本,缩短任务配置时间,避免配置过程中出错。用户可通过页面选择数据源即可创建数据同步任务,支持RDBMS、Hive、HBase、ClickHouse、MongoDB等数据源,RDBMS数据源可批量创建数据同步任务,支持实时查看数据同步进度及日志并提供终止同步功能,集成并二次开发xxl-job可根据时间、自增主键增量同步数据。
任务"执行器"支持集群部署,支持执行器多节点路由策略选择,支持超时控制、失败重试、失败告警、任务依赖,执行器CPU.内存.负载的监控等等。后续还将提供更多的数据源支持、数据转换UDF、表结构同步、数据同步血缘等更为复杂的业务场景。
https://pan.baidu.com/share/init?surl=3yoqhGpD00I82K4lOYtQhg cpsk
6.1 安装
开始部署
1)解压安装包
在选定的安装目录,解压安装包
tar -zxvf datax-web-{VERSION}.tar.gz
2)执行一键安装脚本
进入解压后的目录,找到bin目录下面的install.sh文件,如果选择交互式的安装,则直接执行
./bin/install.sh
在交互模式下,对各个模块的package压缩包的解压以及configure配置脚本的调用,都会请求用户确认,可根据提示查看是否安装成功,如果没有安装成功,可以重复尝试; 如果不想使用交互模式,跳过确认过程,则执行以下命令安装
./bin/install.sh --force
3)数据库初始化
如果你的服务上安装有mysql命令,在执行安装脚本的过程中则会出现以下提醒:
Scan out mysql command, so begin to initalize the database
Do you want to initalize database with sql: [{INSTALL_PATH}/bin/db/datax-web.sql]? (Y/N)y
Please input the db host(default: 127.0.0.1):
Please input the db port(default: 3306):
Please input the db username(default: root):
Please input the db password(default: ):
Please input the db name(default: exchangis)
按照提示输入数据库地址,端口号,用户名,密码以及数据库名称,大部分情况下即可快速完成初始化。 如果服务上并没有安装mysql命令,则可以取用目录下/bin/db/datax-web.sql脚本去手动执行,完成后修改相关配置文件
vi ./modules/datax-admin/conf/bootstrap.properties
#Database
#DB_HOST=
#DB_PORT=
#DB_USERNAME=
#DB_PASSWORD=
#DB_DATABASE=
按照具体情况配置对应的值即可。
4) 配置
安装完成之后,
在项目目录: /modules/datax-admin/bin/env.properties 配置邮件服务(可跳过)
MAIL_USERNAME=""
MAIL_PASSWORD=""
此文件中包括一些默认配置参数,例如:server.port,具体请查看文件。
在项目目录下/modules/datax-execute/bin/env.properties 指定PYTHON_PATH的路径
vi ./modules/{module_name}/bin/env.properties
### 执行datax的python脚本地址
PYTHON_PATH=
### 保持和datax-admin服务的端口一致;默认是9527,如果没改datax-admin的端口,可以忽略
DATAX_ADMIN_PORT=
此文件中包括一些默认配置参数,例如:executor.port,json.path,data.path等,具体请查看文件。
5)启动服务
- 一键启动所有服务
./bin/start-all.sh
中途可能发生部分模块启动失败或者卡住,可以退出重复执行,如果需要改变某一模块服务端口号,则:
vi ./modules/{module_name}/bin/env.properties
找到SERVER_PORT配置项,改变它的值即可。 当然也可以单一地启动某一模块服务:
./bin/start.sh -m {module_name}
- 一键取消所有服务
./bin/stop-all.sh
当然也可以单一地停止某一模块服务:
./bin/stop.sh -m {module_name}
65.访问地址:
http://192.168.1.130:9527/index.html#/dashboard