Canal(增量数据订阅与消费 )快速配置

2018-10-09  本文已影响0人  匿名的我呀

模拟mysql的主从备份读取bin-log文件的机制,阿里开源项目,实现对mysq的日志文件的解析;

搭建步骤:

1.开启mysql的binlog功能,并配置binlog模式为row

[mysqld]

log-bin=mysql-bin #添加这一行就ok

binlog-format=ROW #选择row模式

server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

2.在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)

CREATE USER canal IDENTIFIED BY 'canal'; 

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

3. 下载:https://github.com/alibaba/canal/releases

4.解压,修改配置文件

mysql serverId,改成唯一的

vim conf/example/instance.properties

canal.instance.mysql.slaveId = 129

position info,需要改成自己的数据库信息

canal.instance.master.address=192.168.21.134:3306

username/password,需要改成自己的数据库信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

canal.instance.defaultDatabaseName=test  指定数据库

canal.instance.connectionCharset=UTF-8

table regex

canal.instance.filter.regex=.*\\..*

5、修改 conf/canal.properties 文件

canal.ip 改成canal所在机器的ip地址,避免无谓的问题 canal.id= 128

canal.ip=192.168.21.134:

canal.port= 11111


启动

bin 下的startup脚本

查看日志 example 下

运行canal-client实例:

添加pom依赖:

<dependency>

    <groupId>com.alibaba.otter</groupId>

    <artifactId>canal.client</artifactId>

    <version>1.0.12</version>

</dependency>


import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

/**

* @ClassName Demo

* @Description TODO

* @Author liang

* @Date 2018\10\9 0009 10:25

* @Version 1.0

**/

public class Demo {

public static void main(String [] args)throws InterruptedException {

CanalConnector connector = CanalConnectors.newSingleConnector(

new InetSocketAddress("192.168.26.134",11111),"example","","");

connector.connect();

connector.subscribe(".*\\..*");

while (true) {

Message message = connector.getWithoutAck(100);

long batchId = message.getId();

System.out.println(batchId);

if (batchId == -1 || message.getEntries().isEmpty()) {

Thread.sleep(3000);

}else {

System.out.println(message.getEntries());

connector.ack(batchId);

}

}

}

}

上一篇下一篇

猜你喜欢

热点阅读