flinkx的简单使用(local模式)
2020-12-03 本文已影响0人
酥苏落叶
flinkx官方github地址:https://github.com/DTStack/flinkx
下载代码
1.使用git工具把项目clone到本地
git clone https://github.com/DTStack/flinkx.git
cd flinkx
2.直接下载源码
wget https://github.com/DTStack/flinkx/archive/1.8.5.zip
unzip flinkx-1.8.5.zip
cd flink-1.8.5
编译插件
使用ide打开项目可能有部分代码会报错,找不到包,不用担心,官方给我们准备了在maven中心仓库没有的包,只需要配置好本地maven环境变量地址,然后到项目目录下找到bin文件夹,根据系统选择执行文件即可
image.png然后我们执行打包命令进行打包
maven
mvn clean package -Dmaven.test.skip=true
gradle
gradle assembleDebug
打包之后可以在项目根目录下看到这三个文件夹,其中lib下为flinkx启动jar包,plugins和syncplugins为flink的插件
image.png
编写同步任务
flinkx使用json文件进行任务配置
这里我简单的提供一个mysql之间的同步任务配置
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": 0
},
"errorLimit": {
"record": 10000,
"percentage": 100
}
},
"content": [
{
"reader": {
"parameter": {
"username": "***",
"password": "***",
"connection": [
{
"jdbcUrl": [
"jdbc:oracle:thin:@XX.XX.XX.XX:1521:newstddata"
],
"table": [
"DEVDBO.fhdzk"
]
}
],
"column": [
{
"name": "单据编号",
"type": "VARCHAR2"
},
{
"name": "单据日期",
"type": "DATE"
}
],
"customSql": "select lsh 单据编号,kdrq 单据日期 from order",
"where": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2,
"startLocation": "0",
"polling": true,
"pollingInterval": 3000
},
"name": "oraclereader"
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "***",
"password": "***",
"column": [
"order_no",
"create_time"
],
"batchSize": 1024,
"session": [],
"connection": [
{
"jdbcUrl": "jdbc:mysql://XX.XX.XX.XX:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false",
"table": [
"order"
]
}
]
}
}
}
]
}
}
注:该json只做参考,具体配置请在github中查看;
运行任务
如果只是使用flinkx local
模式,现在就可以尝试运行
java -cp lib\* com.dtstack.flinkx.launcher.Launcher -mode local -job cc.json -pluginRoot E:\project\flinkx\plugins -flinkconf F:\JavaTool\flink-1.8.3\conf -confProp "{\"flink.checkpoint.interval\":1000}"
现在就可以等待任务运行完成,中间可能会有hadoop相关的错误,这就需要我们安装hadoop,尽量安装较高版本,然后再次运行就可以正常运行了