Canal——原理架构及应用场景

2021-08-14  本文已影响0人  小波同学

Canal简介

Canal是阿里开源的一款基于Mysql数据库binlog的增量订阅和消费组件,通过它可以订阅数据库的binlog日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等。相对于消息队列,通过这种机制可以实现数据的有序化和一致性。

github地址:https://github.com/alibaba/canal

完整wiki地址:https://github.com/alibaba/canal/wiki

Canal工作原理

原理相对比较简单:


Mysql主从同步原理

canal工作原理其实也是基于mysql主从同步原理的,所以理解mysql主从同步原理是第一步

同步原理:

启用Binlog注意以下几点:

Mysql的binlog

什么是中继日志?

Canal架构

说明:

instance模块:

Canal-HA机制

canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA类似。

canal的ha分为两部分,canal server和canal client分别有对应的ha实现:

server ha的架构图如下:


大致步骤:

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制

Canal应用场景

1、同步缓存redis/全文搜索ES

canal一个常见应用场景是同步缓存/全文搜索,当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新出现问题时,应该回退binlog到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法,如下图所示:


2、下发任务

另一种常见应用场景是下发任务,当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入MQ/kafka进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等相关系统。这种方式可以保证数据下发的精确性,通过MQ发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发MQ的代码,从而实现了下发归集,如下图所示。

3、数据异构

在大型网站架构中,DB都会采用分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。

所谓的数据异构,那就是将需要join查询的多表按照某一个维度又聚合在一个DB中。让你去查询。canal就是实现数据异构的手段之一。

canal的HA集群模式部署

1、canal下载地址

tar -zxf canal.deployer-1.1.5.tar.gz

解压出来的目录结构是这样的:


2、mysql开启binlog

MySQL,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

3、mysql授权账号权限

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限(repication权限),如果已有账户可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

备注

可以通过在 mysql 终端中执行以下命令判断配置是否生效:

show variables like 'log_bin';
show variables like 'binlog_format';

4、修改配置

canal服务的部署其实特别简单,解压之后只需修改canal.properties、instance.properties这两个配置两个文件即可。

修改canal.properties中配置

//指定注册的zk集群地址
canal.zkServers = 192.168.135.27:2181,192.168.135.28:2181,192.168.135.29:2181

//HA模式必须使用该xml,需要将相关数据写入zookeeper,保证数据集群共享
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

// 这个demo就是conf目录里的实例,如果要建别的实例'test'就建个test目录,
// 把example里面的instance.properties文件拷贝到test的实例目录下就好了,
// 然后在这里的配置就是canal.destinations = demo,test
canal.destinations = demo

修改配置文件instance.properties

## mysql serverId , v1.0.26+ will autoGen
# canal伪装的mysql slave的编号,不能与mysql数据库和其他的slave重复
canal.instance.mysql.slaveId=1003
#  按需修改成自己的数据库信息
# position info
canal.instance.master.address=10.200.*.109:3306

# username/password 数据库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName = test

5、详细步骤

canal 的HA集群模式部署详细步骤,可以访问:https://blog.csdn.net/XDSXHDYY/article/details/97825508

然后cd到bin目录 启动和停止canal-server
启动

sh  startup.sh

停止

sh  stop.sh

验证启动状态,查看log文件

vim canal/log/canal/canal.log  

2021-07-11 16:17:38.224 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2021-07-11 16:17:38.273 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.19.0.1:11111]
2021-07-11 16:17:38.569 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

上述日志信息显示启动canal成功

SpringBoot整合Canal

两种方式,官方提供的demo和springboot starter

1、官方提供的

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

具体参考:https://blog.csdn.net/leilei1366615/article/details/108819651

2、springboot starter方式

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
canal:
  server: 127.0.0.1:11111
  destination: demo
@Component
public class ADHandler implements EntryHandler<SeckillGoods> {

    @Override
    public void insert(SeckillGoods seckillGoods) {
        // CanalModel可以得到当前这次的库名和表名
        CanalModel canal = CanalContext.getModel();

        //业务操作
        //新增缓存操作等
    }

    @Override
    public void update(SeckillGoods before, SeckillGoods after) {
        //业务操作
        //更新缓存操作等
    }

    @Override
    public void delete(SeckillGoods seckillGoods) {
        //业务操作
        //删除缓存操作等
    }
}

参考:
https://www.cnblogs.com/caoweixiong/p/11824423.html

https://www.cnblogs.com/huangxincheng/p/7456397.html

https://segmentfault.com/a/1190000023297973

https://blog.csdn.net/XDSXHDYY/article/details/97825508

https://blog.csdn.net/qq_46893497/article/details/111026996

上一篇 下一篇

猜你喜欢

热点阅读