canal study notes

2022-03-07  笨手笨脚越

[toc]

What is canal

canal is middleware that masquerades as a MySQL slave, subscribes to the MySQL binlog, and uses it to synchronize data to other stores.

Chinese documentation: https://www.wenjiangs.com/doc/canal-introduction

Official repository: https://github.com/alibaba/canal

How it works


1. canal emulates the MySQL slave interaction protocol, masquerades as a MySQL slave, and sends the dump protocol to the MySQL master
2. The MySQL master receives the dump request and starts pushing binary logs to the slave (i.e., canal)
3. canal parses the binary log objects (raw byte streams)

架构


Notes:

- server: one running canal instance, corresponding to one JVM
- instance: one data queue (one server hosts 1..n instances)

instance module:

- eventParser: data source access; emulates the slave protocol to interact with the master and parses the protocol
- eventSink: the link between parser and store; filters, transforms, and distributes data
- eventStore: stores the data
- metaManager: manages incremental subscription & consumption metadata

Installation

1. Download the package

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

tar -xvf canal.deployer-1.1.5.tar.gz 

2. Enable binlog in MySQL

Edit my.cnf

[mysqld]
pid-file        = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir         = /var/lib/mysql
secure-file-priv= NULL
log-bin=mysql-bin 
binlog-format=ROW 
server_id=1 

The binlog must be in ROW mode.

After restarting, run the SQL statement show variables like '%log_bin%' to verify.
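The output should look roughly like the following (paths and values vary per installation; this is illustrative, not captured from the setup above):

mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+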


3. Create a canal user in MySQL (the example grants ALL PRIVILEGES for simplicity; canal only needs SELECT, REPLICATION SLAVE, and REPLICATION CLIENT, so prefer a narrower grant in production)

mysql> CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'localhost' WITH GRANT OPTION;
Query OK, 0 rows affected (0.01 sec)
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION;
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)

4. Edit the canal configuration file

The directory /root/canal/conf/ contains an example folder; each such folder represents one instance.
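After extraction, the relevant layout looks roughly like this (other files omitted):

/root/canal/conf/
├── canal.properties          # server-level configuration
└── example/
    └── instance.properties   # per-instance configuration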

vi /root/canal/conf/example/instance.properties
#################################################
# id canal uses when posing as a mysql slave
canal.instance.mysql.slaveId=1234
# database ip:port
canal.instance.master.address=192.168.10.27:3306
# database username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#################################################

5. Start, stop, and restart canal

cd /root/canal/bin
sh startup.sh 
sh stop.sh
sh restart.sh  

6. Log files

/root/canal/logs/canal/canal.log
/root/canal/logs/example/example.log

Reading the binlog in Java and syncing it to Redis

1. Add the dependency (note: the client here is 1.1.0 while the server deployed above is 1.1.5; matching the two versions is generally safer)

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

2. RedisUtil

package com.wangyue.study.canal;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

    // Redis server IP
    private static String ADDR = "192.168.10.27";

    // Redis server port
    private static int PORT = 6379;

    // Access password
    private static String AUTH = "hxcx123!@#";

    // Maximum number of connections in the pool (default 8);
    // -1 means unlimited. Once maxTotal jedis instances are handed out, the pool is exhausted.
    private static int MAX_ACTIVE = 1024;

    // Maximum number of idle jedis instances in the pool (default 8).
    private static int MAX_IDLE = 200;

    // Maximum time to wait for an available connection, in milliseconds (default -1: never time out).
    // When exceeded, a JedisConnectionException is thrown.
    private static int MAX_WAIT = 10000;

    // Key expiration time: one day, in seconds
    protected static int expireTime = 60 * 60 * 24;

    // Connection pool
    protected static JedisPool pool;

    /**
     * Static initializer; runs once when the class is first loaded
     */
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // maximum total connections
        config.setMaxTotal(MAX_ACTIVE);
        // maximum idle connections
        config.setMaxIdle(MAX_IDLE);
        // maximum wait time for a connection
        config.setMaxWaitMillis(MAX_WAIT);
        // skip connection validation on borrow
        config.setTestOnBorrow(false);
        // 1000 ms connection timeout, authenticated, using database 3
        pool = new JedisPool(config, ADDR, PORT, 1000, AUTH, 3);
    }

    /**
     * Get a jedis instance from the pool
     */
    protected static synchronized Jedis getJedis() {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return jedis;
    }

    /**
     * Release a jedis resource
     *
     * @param jedis
     * @param isBroken
     */
    protected static void closeResource(Jedis jedis, boolean isBroken) {
        if (jedis != null) {
            // close() returns the connection to the pool, or destroys it if it is broken;
            // the original body was empty, which leaked connections
            jedis.close();
        }
    }

    /**
     * Check whether a key exists
     *
     * @param key
     */
    public static boolean existKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            jedis.select(0);
            return jedis.exists(key);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return false;
    }

    /**
     * Delete a key
     *
     * @param key
     */
    public static void delKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            jedis.select(0);
            jedis.del(key);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

    /**
     * Get the value of a key (also refreshes its expiration)
     *
     * @param key
     */
    public static String stringGet(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.get(key);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Set a string value
     *
     * @param key
     * @param value
     */
    public static String stringSet(String key, String value) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.set(key, value);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Set a hash field
     *
     * @param key
     * @param field
     * @param value
     */
    public static void hashSet(String key, String field, String value) {
        boolean isBroken = false;
        Jedis jedis = null;
        try {
            jedis = getJedis();
            if (jedis != null) {
                jedis.select(0);
                jedis.hset(key, field, value);
                jedis.expire(key, expireTime);
            }
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

}
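A quick way to smoke-test the util class before wiring it to canal (the key and value here are illustrative, not part of the original sync code):

package com.wangyue.study.canal;

public class RedisUtilSmokeTest {

    public static void main(String[] args) {
        // write a throwaway key, read it back, then clean up
        RedisUtil.stringSet("user:test", "{\"id\":\"test\"}");
        System.out.println(RedisUtil.stringGet("user:test")); // expect the JSON just written
        RedisUtil.delKey("user:test");
    }
}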

3. CanalTest

package com.wangyue.study.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;


public class CanalTest {


    public static void main(String[] args) {
        // create the connection to the canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.27", 11111),
                "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll back
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

    private static void redisInsert( List<Column> columns){
        JSONObject json=new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if(columns.size()>0){
            RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
        }
    }

    private static  void redisUpdate( List<Column> columns){
        JSONObject json=new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if(columns.size()>0){
            RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
        }
    }

    private static void redisDelete(List<Column> columns) {
        if (columns.size() > 0) {
            // only the primary key (the first column) is needed to build the key to delete
            RedisUtil.delKey("user:" + columns.get(0).getValue());
        }
    }

}


After running it, modify and save some data in the MySQL database.

Console output:
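Per the format string in printEntry, each row change produces lines like these (illustrative values, not a captured log):

empty count : 1
================> binlog[mysql-bin.000001:4567] , name[tcm,t_patient] , eventType : UPDATE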



Setting up a canal cluster

Installing ZooKeeper

!! Note: do not use a recent ZooKeeper release here; newer versions may fail to start with "Starting zookeeper ... FAILED TO START".

1. Download and extract

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz

tar -xvf zookeeper-3.4.9.tar.gz 

2. Edit the configuration

cd zookeeper-3.4.9/conf

cp zoo_sample.cfg zoo.cfg

vi zoo.cfg
############################
# set the data directory
dataDir=/root/zookeeper/data
############################
3. Start / restart / stop
./zkServer.sh start

./zkServer.sh restart

./zkServer.sh stop

4. Check the status

./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /root/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: standalone

5. Connect with the client

# 2181 is the default zk port
./zkCli.sh -server localhost:2181

Cluster deployment

Currently canal clusters only support an HA deployment: ZooKeeper is used for preemptive HA, with one active node and several standbys.

Edit the canal configuration file

vi canal/conf/canal.properties

# register ip to zookeeper
canal.register.ip = 192.168.10.27
# zk address; separate multiple zk servers with commas and no spaces, e.g. 10.105.10.123:2181,10.105.10.124:2181,10.105.10.125:2181
canal.zkServers = 192.168.10.27:2181

# use the zk-backed instance management so positions and HA state are stored in zookeeper
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

Deploy the standby canal node

Copy the primary node's canal directory to another machine and edit the instance configuration:

vi /root/canal/conf/example/instance.properties

# set the slaveId; it just needs to differ from the primary node's
canal.instance.mysql.slaveId=1235

Edit the canal configuration:

vi /root/canal/conf/canal.properties
# register ip to zookeeper
canal.register.ip = 192.168.10.26

All other settings are identical to the primary node's; then start canal on both nodes.

Check canal's state in zk

./zkCli.sh -server localhost:2181

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.10.26:11111"}
cZxid = 0x86
ctime = Fri Mar 04 01:14:55 EST 2022
mZxid = 0x86
mtime = Fri Mar 04 01:14:55 EST 2022
pZxid = 0x86
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17f53863740000a
dataLength = 47
numChildren = 0

Deployment status:

[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.10.27:11111"}
cZxid = 0x9f
ctime = Fri Mar 04 03:06:18 EST 2022
mZxid = 0x9f
mtime = Fri Mar 04 03:06:18 EST 2022
pZxid = 0x9f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17f53863740000b
dataLength = 47
numChildren = 0

Java client code

Only the CanalConnector needs to change: use newClusterConnector and point the connection address at ZooKeeper's ip:port.

CanalConnector connector = CanalConnectors.newClusterConnector("192.168.10.27:2181","example", "canal", "canal");

Canal Admin

canal-admin provides operations-facing features for canal, such as centralized configuration management and node maintenance, through a relatively friendly WebUI, so that more users can operate canal quickly and safely.

Deployment

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

mkdir /tmp/canal-admin

tar zxvf canal.admin-1.1.4.tar.gz -C /tmp/canal-admin

Edit the configuration file conf/application.yml and set the MySQL address.
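For reference, the stock canal-admin 1.1.4 application.yml looks roughly like this (verify against the file shipped in your bundle); point spring.datasource.address at your MySQL:

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin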


Run conf/canal_manager.sql against MySQL to initialize the schema.

Start

sh bin/startup.sh

Once started, the UI is reachable at http://127.0.0.1:8089/; the default credentials are admin/123456.


Database synchronization with canal + Kafka

For high availability and higher throughput, we run multiple canal clients as a cluster to parse the binlog and sync it onward. This raises an important question: how do we guarantee that the canal-client cluster consumes binlog events in order?

We use ROW-mode binlog, so every write operation produces a binlog entry. A simple example: insert record a, then immediately update it. That sends two messages to the canal client; if, due to the network or other causes, the update message is processed before the insert, the record does not exist yet and the update is effectively lost.

canal can be combined with a message queue (Kafka, RabbitMQ, and RocketMQ are all supported) and enforce message ordering at the MQ layer, as sketched below.
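When Kafka is the sink, one way to keep per-row ordering while still using multiple partitions is canal's partition-hash routing in instance.properties. The property names below come from canal's MQ documentation, and the table and key are illustrative, so verify both against your version:

# spread messages across 3 partitions
canal.mq.partitionsNum=3
# hash rows of tcm.t_patient by primary key id, so all changes to a given row
# land in the same partition and keep their relative order
canal.mq.partitionHash=tcm.t_patient:id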


Installing Kafka

Deployment

wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz

Edit the configuration file

vim /usr/local/kafka/kafka_2.11-1.1.1/config/server.properties and change these parameters:

# zookeeper address
zookeeper.connect=192.168.10.27:2181
listeners=PLAINTEXT://:9092
# address advertised to clients
advertised.listeners=PLAINTEXT://192.168.10.27:9092

Start the server

Start script:

bin/kafka-server-start.sh -daemon config/server.properties &

List all topics:

bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181

Consume a topic from the beginning:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.117:9092 --from-beginning --topic example_t
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

Edit the canal configuration

Edit canal.properties:

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# kafka address
kafka.bootstrap.servers = 192.168.10.27:9092
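The destination topic is set per instance. Assuming the stock example instance, the relevant line in conf/example/instance.properties is the following, which is why a topic named example shows up after the restart below:

canal.mq.topic=example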

Then restart canal; the topic now exists:

[root@bogon kafka_2.11-1.1.1]# sh bin/kafka-topics.sh --list --zookeeper 192.168.10.27:2181
example

Java code

pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
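The consumer below also uses fastjson and commons-lang3 (spring-kafka pulls in kafka-clients transitively). If they are not already on the classpath, add something like the following; the versions are illustrative:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.12.0</version>
</dependency>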
package com.wangyue.study.canal;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CanalKafkaConsumer {


    public static void main(String[] args) {
        /* Three consumer properties must be set: the broker address list and the key and value deserializers */
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.10.27:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        // A group id is not strictly required. Key point: for the same topic, within one groupId consumer group, each message is delivered to only one consumer.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group1");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            // subscribe to topics (more than one is allowed)
            consumer.subscribe(Collections.singletonList("example"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s", record.topic(), record.partition(),
                            record.offset(), record.key(), record.value()));

                    JSONObject valueJson = JSONObject.parseObject(record.value());
                    JSONArray data = valueJson.getJSONArray("data");
                    String type = valueJson.getString("type");
                    String table = valueJson.getString("table");

                    if (StringUtils.equalsIgnoreCase(type, "delete")) {
                        redisDelete(data, table);
                    } else if (StringUtils.equalsIgnoreCase(type, "insert")) {
                        redisInsert(data, table);
                    } else if (StringUtils.equalsIgnoreCase(type, "update")) {
                        redisUpdate(data, table);
                    }

                }
            }

            // to break out of the loop, call consumer.wakeup() from another thread
        } finally {
            consumer.close();
        }


    }


    private static void redisInsert(JSONArray data, String tableName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            String key = tableName + ":" + rowData.getString("id");
            RedisUtil.stringSet(key, rowData.toJSONString());
        }
    }

    private static void redisUpdate(JSONArray data, String tableName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            String key = tableName + ":" + rowData.getString("id");
            RedisUtil.stringSet(key, rowData.toJSONString());
        }
    }

    private static void redisDelete(JSONArray data, String tableName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            String key = tableName + ":" + rowData.getString("id");
            RedisUtil.delKey(key);
        }
    }
}

An example of the message canal writes to Kafka. data holds the row image, old holds the pre-update values of the changed columns, es is the binlog event timestamp and ts the canal processing timestamp (both in milliseconds), and sqlType holds JDBC type codes:

{
    "data": [{
        "id": "17",
        "doctorId": "15",
        "name": "来二楼3",
        "birthday": "2013-02-28",
        "sex": "0",
        "telephone": "15632554566",
        "province": "重庆市",
        "city": "重庆城区",
        "area": "万州区",
        "address": "AP库珀热热蓉蓉",
        "createTime": "2022-02-28 17:29:43",
        "updateTime": "2022-03-03 14:48:31",
        "createBy": null,
        "updateBy": "18960862122",
        "isRemove": "1"
    }],
    "database": "tcm",
    "es": 1646631848000,
    "id": 9,
    "isDdl": false,
    "mysqlType": {
        "id": "int",
        "doctorId": "int",
        "name": "varchar(200)",
        "birthday": "varchar(100)",
        "sex": "int",
        "telephone": "varchar(20)",
        "province": "varchar(50)",
        "city": "varchar(50)",
        "area": "varchar(50)",
        "address": "varchar(255)",
        "createTime": "varchar(50)",
        "updateTime": "varchar(50)",
        "createBy": "varchar(50)",
        "updateBy": "varchar(50)",
        "isRemove": "bit(1)"
    },
    "old": [{
        "name": "来二楼"
    }],
    "pkNames": ["id"],
    "sql": "",
    "sqlType": {
        "id": 4,
        "doctorId": 4,
        "name": 12,
        "birthday": 12,
        "sex": 4,
        "telephone": 12,
        "province": 12,
        "city": 12,
        "area": 12,
        "address": 12,
        "createTime": 12,
        "updateTime": 12,
        "createBy": 12,
        "updateBy": 12,
        "isRemove": -7
    },
    "table": "t_patient",
    "ts": 1646631848288,
    "type": "UPDATE"
}

Result as stored in Redis:
