FlinkX

flinkx的简单使用(local模式)

2020-12-03  本文已影响0人  酥苏落叶

flinkx官方github地址:https://github.com/DTStack/flinkx

下载代码

1.使用git工具把项目clone到本地
git clone https://github.com/DTStack/flinkx.git
cd flinkx
2.直接下载源码
wget https://github.com/DTStack/flinkx/archive/1.8.5.zip
unzip flinkx-1.8.5.zip
cd flink-1.8.5

编译插件

使用ide打开项目可能有部分代码会报错,找不到包,不用担心,官方给我们准备了在maven中心仓库没有的包,只需要配置好本地maven环境变量地址,然后到项目目录下找到bin文件夹,根据系统选择执行文件即可

image.png
然后我们执行打包命令进行打包
maven
  mvn clean package -Dmaven.test.skip=true

gradle

  gradle assembleDebug

打包之后可以在项目根目录下看到这三个文件夹,其中lib下为flinkx启动jar包,plugins和syncplugins为flink的插件


image.png

编写同步任务

flinkx使用json文件进行任务配置
这里我简单的提供一个mysql之间的同步任务配置

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 10000,
        "percentage": 100
      }
    },
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "***",
            "password": "***",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:oracle:thin:@XX.XX.XX.XX:1521:newstddata"
                ],
                "table": [
                  "DEVDBO.fhdzk"
                ]
              }
            ],
            "column": [
              {
                "name": "单据编号",
                "type": "VARCHAR2"
              },
              {
                "name": "单据日期",
                "type": "DATE"
              }
            ],
            "customSql": "select lsh 单据编号,kdrq 单据日期 from order",
            "where": "",
            "queryTimeOut": 1000,
            "requestAccumulatorInterval": 2,
            "startLocation": "0",
            "polling": true,
            "pollingInterval": 3000
          },
          "name": "oraclereader"
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "***",
            "password": "***",
            "column": [
              "order_no",
              "create_time"
            ],
            "batchSize": 1024,
            "session": [],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://XX.XX.XX.XX:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false",
                "table": [
                  "order"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

注:该json只做参考,具体配置请在github中查看;

运行任务

如果只是使用flinkx local 模式,现在就可以尝试运行

java -cp lib\* com.dtstack.flinkx.launcher.Launcher -mode local -job cc.json -pluginRoot E:\project\flinkx\plugins -flinkconf F:\JavaTool\flink-1.8.3\conf -confProp "{\"flink.checkpoint.interval\":1000}"

现在就可以等待任务运行完成,中间可能会有hadoop相关的错误,这就需要我们安装hadoop,尽量安装较高版本,然后再次运行就可以正常运行了

上一篇 下一篇

猜你喜欢

热点阅读