SIDDHI(西迪) CDC配置

2021-11-15  本文已影响0人  啊布多

CDC介绍

关于CDC的介绍可参考 cdc(程序CDC类)_百度百科 (baidu.com),CDC 是change data capture,即变化数据捕捉。是数据库进行备份的一种方式,常用于大量数据的备份工作。分为入侵式的和非入侵式的备份方法,入侵式的有基于触发器备份、基于时间戳备份、基于快照备份,非入侵式的备份方法是基于日志的备份。mysql 基于日志的CDC就是要开启mysql binary log。

SIDDHI 关于CDC的配置参数

SIDDHI支持“listening”和“polling”两种模式

listening支持 'INSERT', 'UPDATE'和'DELETE'三种模式,可通过operation参数进行配置。只支持MYSQL数据库

可使用的参数有:
url:数据库连接字符串: mysql:jdbc:mysql://192.168.0.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8; oracle:jdbc:oracle:thin:@10.1.4.76:1521:ORCL
mode:CDC模式(listening)
username:数据库用户名,要有'SELECT', 'RELOAD', 'SHOW DATABASES', 'REPLICATION SLAVE'和'REPLICATION CLIENT' 权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'user'@'%';
password:数据库用户密码
table.name:要监听的表名
operation:支持 'INSERT', 'UPDATE'和'DELETE'
pool.properties:数据库连接的池参数可以指定为键值对。
connector.properties:在这里,您可以将Debezium连接器属性指定为逗号分隔的字符串。
database.server.id加入MySQL数据库集群读取bin日志时使用的ID。这应该是一个介于1到2^32之间的唯一整数。
database.server.name:标识数据库服务器并为其提供名称空间的逻辑名称

polling 支持insert、update ; 支持多种数据库

mode:CDC模式(polling )
datasource.name:SIDDHI配置文件内配置的数据库源
url:数据库连接字符串: mysql:jdbc:mysql://192.168.0.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8; oracle:jdbc:oracle:thin:@10.1.4.76:1521:ORCL
username:数据库用户名,要有'SELECT' 权限
password:数据库用户密码
polling.column:要监听的字段
polling.interval:呆监听的记录数,默认为1
table.name:要监听的表名
wait.on.missed.record:指示进程是否需要等待丢失/无序记录。
当此标志设置为“true”时,一旦识别出丢失的记录,流程将被保留。缺少的recrod由polling.column值的顺序标识。这只能用于数字字段,不建议用于时间值,因为它不是连续的。
仅当记录可能无序写入时(例如并发写入器),才应启用此功能,因为这会降低性能。
missed.record.waiting.timeout:重试丢失/无序记录的超时时间(以秒为单位)。这应该与wait.on.missed.record参数一起使用。如果未设置参数,进程将无限期地等待丢失的记录。

数据库CDC配置

Mysql 5.7.X以后版本支持,默认不支持CDC,需要配置指定参数。8.X.X以后配置默认支持

show VARIABLES like '%log%':查看日志配置参数


image.png

server_id:数据库ID,单机模式下设置 server_id=1,集群模式下不同节点的server_id必须不同
log_bin:指定binlog文件名和储存位置,可设置为log_bin=mysql-bin
binlog_format:binlog格式。有3个值可以选择:ROW:记录哪条数据被修改和修改之后的数据,会产生大量日志。STATEMENT:记录修改数据的SQL,日志量较小。MIXED:混合使用上述两个模式。CDC要求必须配置为ROW。binlog_format=row
expire_logs_days:bin_log过期时间,超过该时间的log会自动删除。这里设置为7天,expire_logs_days=7
binlog_do_db:binlog记录哪些数据库。如果需要配置多个库,如例子中配置多项。切勿使用逗号分隔。单例数据库可不设置,集群模式需要配置多条

binlog_do_db=db_a
binlog_do_db=db_b

Oracle,默认是开启的。

SIDDHI驱动程序,试了很多版本,建议使用siddhi-io-cdc-2.0.3.jar ,踩了很多坑。程序默认有2.0.5,2.0.10多多少少都有问题。

mysql数据源配置实例

oracle数据源配置实例

  • name: test
    description: The datasource used for test
    jndiConfig:
    name: jdbc/test
    useJndiReference: true
    definition:
    type: RDBMS
    configuration:
    jdbcUrl: 'jdbc:oracle:thin:@192.168.1.10:1521/ORCL'
    username: XXXXX
    password: XXXXX
    driverClassName: oracle.jdbc.driver.OracleDriver
    maxPoolSize: 20
    idleTimeout: 60000
    connectionTestQuery: SELECT 1 FROM DUAL
    validationTimeout: 30000
    isAutoCommit: false

mysql程序开发实例

@App:name("test_mysql_cdc")

@App:description("Description of the plan")

-- @source(type = 'cdc' , url = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8',
-- username = 'XXXX', password = 'XXXX',
-- table.name = 'person', operation = 'insert',
-- @map(type='keyvalue', @attributes(id = 'ID', name = 'Name', age = 'Age')))
-- define stream inputStream (id int, name string, age int);

@source(type = 'cdc', mode='polling', polling.column = 'ID',
polling.interval = '1',
jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8',
username = 'admin', password = '123456',
table.name = 'person',
@map(type='keyvalue'))
define stream pollingStream (id string, name string, age string);

-- from inputStream#log()
-- select *
-- insert into testString;

from pollingStream#log()
select *
insert into testPolling;

oracle程序开发实例

@source(type = 'cdc',
datasource.name = 'test',
url = '',
mode = 'polling',
polling.column = 'name',
polling.interval = '1',
username = '',
password = '',
table.name = 'user',
@map(type = 'keyvalue',fail.on.missing.attribute="false" ))

上一篇下一篇

猜你喜欢

热点阅读