
How does FlinkX read from and write to ClickHouse?

2020-09-01  Flink Chinese Community (Flink中文社区)

This article walks through how FlinkX reads from and writes to ClickHouse and the related parameters. The core content centers on the following three questions:

1. Which ClickHouse versions does FlinkX support for reading and writing?

2. What parameters does FlinkX provide for reading from and writing to ClickHouse?

3. What does each of those parameters mean?

Reading from ClickHouse

I. Plugin Name

Name: clickhousereader

II. Supported Data Source Versions

ClickHouse 19.x and above

III. Parameter Description

「jdbcUrl」

「username」

「password」

「where」

「splitPk」

「fetchSize」

「queryTimeOut」

「customSql」

「column」

1. Read all fields. If there are many fields, you can use the following shorthand:

"column":["*"]

2. Specify only the field names:

"column":["id","name"]

3. Specify detailed field information:

"column": [{
    "name": "col",
    "type": "datetime",
    "format": "yyyy-MM-dd hh:mm:ss",
    "value": "value"
}]

Attribute description:

1. name: the field name
2. type: the field type; it may differ from the type in the database, in which case the program performs a type conversion
3. format: if the field is a time string, a time format can be specified here so that the field is converted to a date before being returned
4. value: if the specified field does not exist in the database, an error is raised; if the field exists but its value is null, this value is returned as the default
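
For example, a minimal sketch of these attributes in use (the field names create_time and status are illustrative, not taken from the original): the first entry parses a time string with the given format, and the second falls back to "unknown" whenever status is null:

"column": [{
    "name": "create_time",
    "type": "datetime",
    "format": "yyyy-MM-dd HH:mm:ss"
}, {
    "name": "status",
    "type": "varchar",
    "value": "unknown"
}]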

「polling」

「pollingInterval」

「requestAccumulatorInterval」

Configuration Examples

1. Basic configuration

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

2. Multiple channels

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 3,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

3. Specifying customSql

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "select id from tableTest",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

4. Incremental sync with a specified startLocation

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "increColumn": "id",
          "startLocation": "20",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

5. Interval polling

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2,
          "polling": true,
          "pollingInterval": 3000
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

Writing to ClickHouse

I. Plugin Name

Name: clickhousewriter

II. Supported Data Source Versions

ClickHouse 19.x and above

III. Parameter Description

「jdbcUrl」

「username」

「password」

「column」

「preSql」

「postSql」

「table」

「writeMode」

「batchSize」
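
The original post stops at the parameter list and gives no clickhousewriter example, so the following is only a minimal sketch that mirrors the layout of the reader examples above and slots the listed writer parameters into that structure. The table name, column list, writeMode value, and batchSize value are assumptions for illustration and should be checked against the FlinkX documentation:

"writer": {
  "name": "clickhousewriter",
  "parameter": {
    "username": "username",
    "password": "password",
    "connection": [{
      "jdbcUrl": ["jdbc:clickhouse://0.0.0.1:8123/dtstack"],
      "table": ["tableTest"]
    }],
    "column": ["id", "user_id", "name"],
    "preSql": [],
    "postSql": [],
    "writeMode": "insert",
    "batchSize": 1024
  }
}

A writer block like this would take the place of the streamwriter block in any of the reader examples above, turning them into end-to-end ClickHouse-to-ClickHouse jobs.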

This article is reposted from the "zhisheng" WeChat official account. The original source is linked below for readers who want the full post:
https://www.aboutyun.com/forum.php?mod=viewthread&tid=29271
