阿里开源Canal--④投递数据到Kafka
基本说明
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
- kafka: (https://github.com/apache/kafka)
- RocketMQ :(https://github.com/apache/rocketmq)
1.配置修改
基于前面第二章节内容我们搭建的canal server(Billow自编译版本为1.1.3,支持动态Topic配置),我们在instance的配置文件中做配置的修改。
1.1.修改instance 配置文件
vi conf/example/instance.properties
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考前面Billow发的文章:
dynamicTopic规则: 表达式如果只有库名则匹配库名的数据都会发送到对应名称topic, 如果是库名.表名则匹配的数据会发送到以'库名_表名'为名称的topic。如要指定topic名称,则可以配置:
canal.mq.dynamicTopic=examp2:.*;exmaple3:mytest\\..*,mytest2\\..*;example4:mytest3.user
以topic名 ':' 正则规则作为配置, 多个topic配置之间以 ';'隔开, message会发送到所有符合规则的topic
1.2.修改canal 配置文件
vi /usr/local/canal/conf/canal.properties
# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false
mq相关参数说明
canal.mq.dynamicTopic 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号分隔
- 例子1:test\.test 指定匹配的单表,发送到以 test_test为名字的topic上
- 例子2:.\..* 匹配所有表,每个表都会发送到各自表名的topic上
- 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
- 例子4:test\.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
- 例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值
支持指定topic名称匹配, 配置格式:topicName:schema 或 schema.table,多个配置之间使用逗号分隔, 多组之间使用 ; 分隔
- 例子:test:test,test1\.test1;test2:test2,test3\.test1 针对匹配的表会发送到指定的topic上
大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力
表达式说明
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔
- 例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
- 例子2:.\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
- 例子3:.\..*: 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
- 例子4: 匹配规则啥都不写,则默认发到0这个partition上
- 例子5:.\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
- 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
- 例子6: test\.test:id,.\..* , 针对test的表按照id散列,其余的表按照table散列
注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)
mq顺序性问题
binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答
- canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
- canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
- canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
- canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
- canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
- 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
- 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
- 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意
1.4.启动
cd /usr/local/canal/
sh bin/startup.sh
1.5.查看日志
a.查看 logs/canal/canal.log
vi logs/canal/canal.log
b. 查看instance的日志:
vi logs/example/example.log
1.6 关闭
cd /usr/local/canal/
sh bin/stop.sh
1.7.MQ数据消费
canal源码中有实例代码;如下
public class CanalKafkaClientExample {
protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaClientExample.class);
private KafkaCanalConnector connector;
private static volatile boolean running = false;
private Thread thread = null;
private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
logger.error("parse events has an error", e);
}
};
public CanalKafkaClientExample(String zkServers, String servers, String topic, Integer partition, String groupId){
connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
}
public static void main(String[] args) {
try {
final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(
AbstractKafkaTest.zkServers,
AbstractKafkaTest.servers,
AbstractKafkaTest.topic,
AbstractKafkaTest.partition,
AbstractKafkaTest.groupId);
logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
kafkaCanalClientExample.start();
logger.info("## the canal kafka consumer is running now ......");
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the kafka consumer");
kafkaCanalClientExample.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping kafka consumer:", e);
} finally {
logger.info("## kafka consumer is down.");
}
}
});
while (running)
;
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the kafka consumer:", e);
System.exit(0);
}
}
public void start() {
Assert.notNull(connector, "connector is null");
thread = new Thread(new Runnable() {
public void run() {
process();
}
});
thread.setUncaughtExceptionHandler(handler);
thread.start();
running = true;
}
public void stop() {
if (!running) {
return;
}
running = false;
if (thread != null) {
try {
thread.join();
} catch (InterruptedException e) {
// ignore
}
}
}
private void process() {
while (!running) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
while (running) {
try {
connector.connect();
connector.subscribe();
while (running) {
try {
List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
if (messages == null) {
continue;
}
for (Message message : messages) {
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
// printSummary(message, batchId, size);
// printEntry(message.getEntries());
logger.info(message.toString());
}
}
connector.ack(); // 提交确认
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
try {
connector.unsubscribe();
} catch (WakeupException e) {
// No-op. Continue process
}
connector.disconnect();
}
}