程序员

轻量级mysql binlog同步工具包binlogportal

2020-09-20  本文已影响0人  dothetrick

使用binlog的原因

近期需要重构一个老系统,需要从几个服务中实时同步订单的修改到重构表里。
这里就面临两个选择,

  1. 在每个服务的mysql操作前埋点,发送修改信息到队列或服务上。这种方案需要修改多个服务的代码并且测试对原系统的影响,有额外开发和测试成本。
  2. 同步mysql的binlog,根据表的insert和update更新新表。但是需要维护一个binlog同步的服务

本次选择了binlog同步的方式。搭建的binlog服务也可以用在之后新系统的缓存更新和同步ES索引上,相对于埋点这种只有一次性作用的方式,性价比较高。

工具调研:canal和mysql-binlog-connector-java

1.canal

要实现binlog同步服务,使用较多的开源方案是canal,运行比较稳定,而且功能也很丰富。

但是在实际部署服务的时候,遇到了一些问题:

2.mysql-binlog-connector-java

调研同时也发现了另一个binlog同步工具,mysql-binlog-connector-java

这是一个开源的binlog同步工具,功能很简单,就是接收binlog信息。作为一个依赖jar可以很容易在springboot中使用。

但是没有对binlog的内容做格式化处理,使用很不方便。当然更没有保存信息和分布式部署功能。

自研工具包binlogportal

基于这些问题,我们需要一个具有以下特性的binlog同步工具:

为了满足这些条件,通过对mysql-binlog-connector-java封装后,实现了自研的工具binlogportal。

使用说明

Mysql配置

CREATE USER binlogportal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlogportal'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'binlogportal'@'%';
FLUSH PRIVILEGES;

通过spring boot构建项目

<dependency>
  <groupId>com.insistingon.binlogportal</groupId>
  <artifactId>binlogportal-spring-boot-starter</artifactId>
  <version>1.0.5</version>
</dependency>
binlogportal:
  enable: true # 是否启用autoconfig
  distributed-enable: true # 是否启用分布式部署
  distributed-redis: # distributed-enable为true时,要提供一个redis作为分布式协调器
    host: 127.0.0.1
    port: 6379
    auth:
  position-redis: # 保存binlog position的redis,必须配置
    host: 127.0.0.1
    port: 6379
    auth:
  db-config: # 数据库配置,可以有多个,key自定义即可
    d1:
      host: 0.0.0.0
      port: 3306
      user-name: binlogportal
      password: 123456
      handler-list: [logEventHandler] # 该数据库使用的事件处理器,名称为spring的bean name
  http-handler: # 启用自带的http事件处理器,可发送请求
    url-list: [http://127.0.0.1:8988/testit] # 要发送的url列表,http参数为统一的格式
    result-callback: httpCallBack # 配置自定义的结果处理器,需要实现IHttpCallback接口,值为bean name
@Slf4j
@Component
public class BinlogSync implements CommandLineRunner {
    @Resource
    BinlogPortalStarter binlogPortalStarter;

    public void run(String... args) throws Exception {
        try {
            binlogPortalStarter.start();
        } catch (BinlogPortalException e) {
            log.error(e.getMessage(), e);
        }
    }
}

非spring boot项目

<dependency>
  <groupId>com.insistingon.binlogportal</groupId>
  <artifactId>binlogportal</artifactId>
  <version>1.0.5</version>
</dependency>
public class TestClass{
 public static void main(String[] args) {
        SyncConfig syncConfig = new SyncConfig();
        syncConfig.setHost("0.0.0.0");
        syncConfig.setPort(3306);
        syncConfig.setUserName("binlogportal");
        syncConfig.setPassword("123456");

        BinlogPortalConfig binlogPortalConfig = new BinlogPortalConfig();
        binlogPortalConfig.addSyncConfig(syncConfig);

        RedisConfig redisConfig = new RedisConfig("127.0.0.1", 6379);
        RedisPositionHandler redisPositionHandler = new RedisPositionHandler(redisConfig);
        binlogPortalConfig.setPositionHandler(redisPositionHandler);

        binlogPortalConfig.setDistributedHandler(new RedisDistributedHandler(redisConfig));

        BinlogPortalStarter binlogPortalStarter = new BinlogPortalStarter();
        binlogPortalStarter.setBinlogPortalConfig(binlogPortalConfig);
        try {
            binlogPortalStarter.start();
        } catch (BinlogPortalException e) {
            e.printStackTrace();
        }
    }
}

2.分布式部署实现

项目中高可用实现是基于redis的分布式锁。

每个实例都会加载全部数据库的配置,在创建binlog连接之前,先要获取redis锁,获取锁后会定时刷新锁的过期时间。所有实例会定时重新抢锁。

同一个mysql库的binlog文件和position会保存在redis里,如果一个实例宕机。新抢到锁的实例在初始化时,会使用上个实例已保存的binlog信息继续获取。

项目源码可在Git上查看。
项目的Git地址:https://github.com/dothetrick/binlogportal

以上内容属个人学习总结,如有不当之处,欢迎在评论中指正

上一篇下一篇

猜你喜欢

热点阅读