SIDDHI(西迪) CDC配置
CDC介绍
关于CDC的介绍可参考 cdc(程序CDC类)_百度百科 (baidu.com),CDC 是change data capture,即变化数据捕捉。是数据库进行备份的一种方式,常用于大量数据的备份工作。分为入侵式的和非入侵式的备份方法,入侵式的有基于触发器备份、基于时间戳备份、基于快照备份,非入侵式的备份方法是基于日志的备份。mysql 基于日志的CDC就是要开启mysql binary log。
SIDDHI 关于CDC的配置参数
SIDDHI支持“listening”和“polling”两种模式
listening支持 'INSERT', 'UPDATE'和'DELETE'三种模式,可通过operation参数进行配置。只支持MYSQL数据库
可使用的参数有:
url:数据库连接字符串: mysql:jdbc:mysql://192.168.0.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8; oracle:jdbc:oracle:thin:@10.1.4.76:1521:ORCL
mode:CDC模式(listening)
username:数据库用户名,要有'SELECT', 'RELOAD', 'SHOW DATABASES', 'REPLICATION SLAVE'和'REPLICATION CLIENT' 权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'user'@'%';
password:数据库用户密码
table.name:要监听的表名
operation:支持 'INSERT', 'UPDATE'和'DELETE'
pool.properties:数据库连接的池参数可以指定为键值对。
connector.properties:在这里,您可以将Debezium连接器属性指定为逗号分隔的字符串。
database.server.id加入MySQL数据库集群读取bin日志时使用的ID。这应该是一个介于1到2^32之间的唯一整数。
database.server.name:标识数据库服务器并为其提供名称空间的逻辑名称
polling 支持insert、update ; 支持多种数据库
mode:CDC模式(polling )
datasource.name:SIDDHI配置文件内配置的数据库源
url:数据库连接字符串: mysql:jdbc:mysql://192.168.0.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8; oracle:jdbc:oracle:thin:@10.1.4.76:1521:ORCL
username:数据库用户名,要有'SELECT' 权限
password:数据库用户密码
polling.column:要监听的字段
polling.interval:呆监听的记录数,默认为1
table.name:要监听的表名
wait.on.missed.record:指示进程是否需要等待丢失/无序记录。
当此标志设置为“true”时,一旦识别出丢失的记录,流程将被保留。缺少的recrod由polling.column值的顺序标识。这只能用于数字字段,不建议用于时间值,因为它不是连续的。
仅当记录可能无序写入时(例如并发写入器),才应启用此功能,因为这会降低性能。
missed.record.waiting.timeout:重试丢失/无序记录的超时时间(以秒为单位)。这应该与wait.on.missed.record参数一起使用。如果未设置参数,进程将无限期地等待丢失的记录。
数据库CDC配置
Mysql 5.7.X以后版本支持,默认不支持CDC,需要配置指定参数。8.X.X以后配置默认支持
show VARIABLES like '%log%':查看日志配置参数
image.png
server_id:数据库ID,单机模式下设置 server_id=1,集群模式下不同节点的server_id必须不同
log_bin:指定binlog文件名和储存位置,可设置为log_bin=mysql-bin
binlog_format:binlog格式。有3个值可以选择:ROW:记录哪条数据被修改和修改之后的数据,会产生大量日志。STATEMENT:记录修改数据的SQL,日志量较小。MIXED:混合使用上述两个模式。CDC要求必须配置为ROW。binlog_format=row
expire_logs_days:bin_log过期时间,超过该时间的log会自动删除。这里设置为7天,expire_logs_days=7
binlog_do_db:binlog记录哪些数据库。如果需要配置多个库,如例子中配置多项。切勿使用逗号分隔。单例数据库可不设置,集群模式需要配置多条
binlog_do_db=db_a
binlog_do_db=db_b
Oracle,默认是开启的。
SIDDHI驱动程序,试了很多版本,建议使用siddhi-io-cdc-2.0.3.jar ,踩了很多坑。程序默认有2.0.5,2.0.10多多少少都有问题。
mysql数据源配置实例
- name: mysql
description: The datasource used for mysql
jndiConfig:
name: jdbc/mysql
useJndiReference: true
definition:
type: RDBMS
configuration:
jdbcUrl: 'jdbc:mysql://192.168.1.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8'
username: XXXXX
password: XXXXX
driverClassName: com.mysql.cj.jdbc.Driver
maxPoolSize: 20
idleTimeout: 60000
connectionTestQuery: SELECT 1 FROM DUAL
validationTimeout: 30000
isAutoCommit: false
oracle数据源配置实例
- name: test
description: The datasource used for test
jndiConfig:
name: jdbc/test
useJndiReference: true
definition:
type: RDBMS
configuration:
jdbcUrl: 'jdbc:oracle:thin:@192.168.1.10:1521/ORCL'
username: XXXXX
password: XXXXX
driverClassName: oracle.jdbc.driver.OracleDriver
maxPoolSize: 20
idleTimeout: 60000
connectionTestQuery: SELECT 1 FROM DUAL
validationTimeout: 30000
isAutoCommit: false
mysql程序开发实例
@App:name("test_mysql_cdc")
@App:description("Description of the plan")
-- @source(type = 'cdc' , url = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8',
-- username = 'XXXX', password = 'XXXX',
-- table.name = 'person', operation = 'insert',
-- @map(type='keyvalue', @attributes(id = 'ID', name = 'Name', age = 'Age')))
-- define stream inputStream (id int, name string, age int);
@source(type = 'cdc', mode='polling', polling.column = 'ID',
polling.interval = '1',
jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8',
username = 'admin', password = '123456',
table.name = 'person',
@map(type='keyvalue'))
define stream pollingStream (id string, name string, age string);
-- from inputStream#log()
-- select *
-- insert into testString;
from pollingStream#log()
select *
insert into testPolling;
oracle程序开发实例
@source(type = 'cdc',
datasource.name = 'test',
url = '',
mode = 'polling',
polling.column = 'name',
polling.interval = '1',
username = '',
password = '',
table.name = 'user',
@map(type = 'keyvalue',fail.on.missing.attribute="false" ))