Canal(增量数据订阅与消费 )快速配置
模拟mysql的主从备份读取bin-log文件的机制,阿里开源项目,实现对mysq的日志文件的解析;
搭建步骤:
1.开启mysql的binlog功能,并配置binlog模式为row
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
2.在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3. 下载:https://github.com/alibaba/canal/releases
4.解压,修改配置文件
mysql serverId,改成唯一的
vim conf/example/instance.properties
canal.instance.mysql.slaveId = 129
position info,需要改成自己的数据库信息
canal.instance.master.address=192.168.21.134:3306
username/password,需要改成自己的数据库信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=test 指定数据库
canal.instance.connectionCharset=UTF-8
table regex
canal.instance.filter.regex=.*\\..*
5、修改 conf/canal.properties 文件
canal.ip 改成canal所在机器的ip地址,避免无谓的问题 canal.id= 128
canal.ip=192.168.21.134:
canal.port= 11111
启动
bin 下的startup脚本
查看日志 example 下
运行canal-client实例:
添加pom依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
/**
* @ClassName Demo
* @Description TODO
* @Author liang
* @Date 2018\10\9 0009 10:25
* @Version 1.0
**/
public class Demo {
public static void main(String [] args)throws InterruptedException {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.26.134",11111),"example","","");
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
System.out.println(batchId);
if (batchId == -1 || message.getEntries().isEmpty()) {
Thread.sleep(3000);
}else {
System.out.println(message.getEntries());
connector.ack(batchId);
}
}
}
}