canal 订阅mysql binlog
2018-10-31 本文已影响0人
c458a5378a5a
原理
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,
mysql master收到dump请求,开始推送binary log给slave(也就是canal),canal解析binary log对象(原始为byte流)
安装mysql(ubuntu)安装mysql的时候会提示设置密码
# 安装
sudo apt-get install mysql-server
sudo apt isntall mysql-client
sudo apt install libmysqlclient-dev
# 设置远程登陆
重命名
sudo vm /etc/mysql/mysql.conf.d/mysqld.cnf my.cnf
编辑
sudo vim /etc/mysql/mysql.conf.d/my.cnf
注释掉bind-address = 127.0.0.1:
开启binlog
log-bin=mysql-bin
binlog-format=ROW
server_id=11
# 登陆
mysql -uroot -p123456
授权
grant all on *.* to root@'%' identified by '123456' with grant option;
设置canal用户密码
GRANT ALL on canal.* to 'canal'@'%' identified by '123456';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'canal'@'%';
flush privileges;
查看是否开启binlog
show variables like 'log_bin';
show variables like 'binlog_format';
show variables like 'binlog_row_image';
重启mysql
service mysql restart
下载安装canal
wget https://github.com/alibaba/canal/releases/download/v1.0.23/canal.deployer-1.0.23.tar.gz
tar -zxvf canal.deployer-1.0.23.tar.gz
修改canal配置文件
cd conf/example
vim instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info
canal.instance.master.address = 192.168.126.134:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = 123456
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =
启动canal,关闭canal
./bin/startup.sh
./bin/stop.sh
查看启动日志,运行日志
cat ../canal/canal.log (start the canal server[172.17.0.1:11111])
cat ../example/example.log
java监控
pom文件
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
javafile
package com.rzj.jdq.back.credit.web.advice;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
/**
* Created by work01 on 5/23/18.
*/
public class CanalTest {
public static void main(String[] args) {
// 创建链接172.17.0.1:11111或者192.168.126.134:11111
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("172.17.0.1", 11111), "example", "",
"");// AddressUtils.getHostIp(),
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe("tianyan\\..*");// .*代表database,..*代表table
connector.rollback();//
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
// System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n",
// batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException(
"ERROR ## parser of eromanga-event has an error,data:"
+ entry.toString(), e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out
.println(String
.format("================> binlog[%s:%s] ,name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry
.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry
.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue()
+ " update=" + column.getUpdated());
}
}
}
update后运行结果
id : 1460 update=false
mobilephones : 15216731250 update=false
emails : wangys@rongzhijia.com update=false
content : [passRate-你我贷-业务异常] 超出阈值 阈值:[0.800] 前七天平均值:[0] 30分钟内实际值:[0.50] 告警时间:[2018-05-02 15:34:04] 详情点击。 update=false
create_time : 2018-06-07 15:34:07 update=false
update_time : 2018-05-23 13:46:17 update=false
-------> after
id : 1460 update=false
mobilephones : 15216731250 update=false
emails : wangys@rongzhijia.com update=false
content : [passRate-你我贷-业务异常] 超出阈值 阈值:[0.800] 前七天平均值:[0] 30分钟内实际值:[0.50] 告警时间:[2018-05-02 15:34:04] 详情点击。 update=false
create_time : 2018-06-07 15:34:07 update=false
update_time : 2018-05-09 13:46:17 update=true