canal 订阅mysql binlog

2018-10-31  本文已影响0人  c458a5378a5a

原理

canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,
mysql master收到dump请求,开始推送binary log给slave(也就是canal),canal解析binary log对象(原始为byte流)

安装mysql(ubuntu)安装mysql的时候会提示设置密码

# 安装
sudo apt-get install mysql-server
sudo apt install mysql-client
sudo apt install libmysqlclient-dev
# 设置远程登陆
重命名
sudo mv /etc/mysql/mysql.conf.d/mysqld.cnf /etc/mysql/mysql.conf.d/my.cnf
编辑
sudo vim /etc/mysql/mysql.conf.d/my.cnf
注释掉bind-address = 127.0.0.1:
开启binlog
log-bin=mysql-bin
binlog-format=ROW
server_id=11
# 登陆
mysql -uroot -p123456
授权
grant all on *.* to root@'%' identified by '123456' with grant option;
设置canal用户密码
GRANT ALL on canal.* to 'canal'@'%' identified by '123456';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'canal'@'%';
flush privileges;
查看是否开启binlog
show variables like 'log_bin';
show variables like 'binlog_format';
show variables like 'binlog_row_image';

重启mysql
service mysql restart

下载安装canal

wget https://github.com/alibaba/canal/releases/download/v1.0.23/canal.deployer-1.0.23.tar.gz
tar -zxvf canal.deployer-1.0.23.tar.gz

修改canal配置文件

cd conf/example
vim instance.properties 

## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info
canal.instance.master.address = 192.168.126.134:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = 123456
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =  

启动canal,关闭canal

./bin/startup.sh 
./bin/stop.sh 

查看启动日志,运行日志

cat ../canal/canal.log (start the canal server[172.17.0.1:11111])

cat ../example/example.log

java监控

pom文件

 <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.0.12</version>
        </dependency>

javafile

package com.rzj.jdq.back.credit.web.advice;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Minimal canal client example: connects to a canal server, subscribes to every table of the
 * {@code tianyan} schema and prints each received row change to standard output.
 *
 * <p>The polling loop ends after {@link #MAX_EMPTY_POLLS} consecutive empty polls, or as soon as
 * the polling thread is interrupted while waiting between polls.
 */
public class CanalTest {
    /** Maximum number of entries requested from the server per fetch. */
    private static final int BATCH_SIZE = 1000;
    /** Number of consecutive empty polls after which the client gives up and exits. */
    private static final int MAX_EMPTY_POLLS = 120;
    /** Pause between two polls when the previous one returned no data. */
    private static final long EMPTY_POLL_SLEEP_MILLIS = 1000L;

    /**
     * Connects to the canal server, then fetches, prints and acknowledges batches until the
     * stream has been idle for too long or the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Canal server address, e.g. 172.17.0.1:11111 or 192.168.126.134:11111;
        // destination "example", empty username and password.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("172.17.0.1", 11111), "example", "", "");
        int emptyCount = 0;
        try {
            connector.connect();
            // Filter regex: schema "tianyan", any table (the part after "\\." matches the table name).
            connector.subscribe("tianyan\\..*");
            // NOTE(review): per the canal client API this resets to the last acknowledged
            // position so un-acked batches of an earlier session are delivered again -- confirm.
            connector.rollback();
            while (emptyCount < MAX_EMPTY_POLLS) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for callers and stop polling instead of
                        // silently ignoring the request to stop.
                        Thread.currentThread().interrupt();
                        System.out.println("interrupted, exit");
                        return;
                    }
                } else {
                    emptyCount = 0;
                    try {
                        printEntry(message.getEntries());
                    } catch (RuntimeException e) {
                        // Processing failed: roll the batch back so it is not lost, then propagate.
                        connector.rollback(batchId);
                        throw e;
                    }
                }
                connector.ack(batchId); // confirm the batch
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the binlog position, schema/table name, event type and column values of every
     * row-change entry; transaction begin/end markers are skipped.
     *
     * @param entrys entries of one fetched batch
     * @throws RuntimeException if an entry's store value cannot be parsed as a row change
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error,data:"
                                + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            CanalEntry.Header header = entry.getHeader();
            System.out.println(String.format(
                    "================> binlog[%s:%s] ,name[%s,%s] , eventType : %s",
                    header.getLogfileName(), header.getLogfileOffset(),
                    header.getSchemaName(), header.getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A deleted row only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An inserted row only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one line per column: its name, its value and its "updated" flag.
     *
     * @param columns columns of a single row image
     */
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue()
                    + "    update=" + column.getUpdated());
        }
    }

}

update后运行结果

id : 1460    update=false
mobilephones : 15216731250    update=false
emails : wangys@rongzhijia.com    update=false
content : [passRate-你我贷-业务异常] 超出阈值 阈值:[0.800] 前七天平均值:[0] 30分钟内实际值:[0.50] 告警时间:[2018-05-02 15:34:04] 详情点击。    update=false
create_time : 2018-06-07 15:34:07    update=false
update_time : 2018-05-23 13:46:17    update=false
-------> after
id : 1460    update=false
mobilephones : 15216731250    update=false
emails : wangys@rongzhijia.com    update=false
content : [passRate-你我贷-业务异常] 超出阈值 阈值:[0.800] 前七天平均值:[0] 30分钟内实际值:[0.50] 告警时间:[2018-05-02 15:34:04] 详情点击。    update=false
create_time : 2018-06-07 15:34:07    update=false
update_time : 2018-05-09 13:46:17    update=true

canal HA 搭建

上一篇下一篇

猜你喜欢

热点阅读