Java

一次数据同步的模型分享

2023-01-06  本文已影响0人  互联网高级架构师

一、背景

如今的软件架构中,数据同步无论是对于微服务代码的解耦、高并发、还是数据分析,都是非常常见且必须的需求场景。数据同步分为异构数据同步和同类型数据存储的同步。异构数据同步可以让数据源多样性,比如从mysql数据同步到es或者redis,数据到redis可以防止极端场景的缓存击穿也可以解耦代码层面的依赖、提高代码性能以更好的支持高并发,又比如es对于大数据的查询较友好,一般也会将mysql的数据下沉到es做查询来减轻mysql的压力,可以同步Hbase做数据分析产出表报等等。

同类型的数据存储数据同步方案,比如mysql数据同步到mysql,可以用来做数据迁移、应用数据源在线切换,如果在数据同步的基础上加入数据冲突检测和VIP机制就可以作为mysql的异地多活架构,也就是常说的shard nothing分布式架构。mysql的数据同步方案,一般都是通过binlog是实现数据回放,在目前存在的数据同步方案中, 被大家熟知的应该是阿里云开源的canal, 但是canal虽然比较优秀,但在中小规模场景中过重,资源消耗巨大且部署配置又比较复杂。因此,我们基于现实需求,结合实际的应用场景产出了本文所述的解决方案。

二、解决方案

我们先是找到了go-mysql-transfer这个开源项目,它可以支持mysql到异构存储的方案,但是这个方案在mysql到mysql的数据同步方面, 无法满足需求, 因此我们基于go-mysql-transfer进行了二次开发,最终代码可见:github.com/j262965682/goMysqlSync, 实现mysql到mysql数据同步问题,同时运用多线程来提高同步速度,保证数据行级有序的最终一致性。

三、逻辑架构

整体的结构如下图所示:

在源端去验证和解析配置文件,通过配置文件规则来界定数据同步范围和一系列参数配置,启动一个go-mysql的canal服务去拉取源端的binlog,用定制handler去接收binlog包装成对象传给一个DDL chan线程 或者 多个DML chan线程,这里有个细节是一旦 DDL chan 里有对象就会去阻塞DML的所有chan,因为DDL可能会带来表结构变更,如果不是有序,则会带来问题。在目标端执行的时候,还会有一个sql合并的过程来增加执行性能。

四、代码分析

我们从源码解析的角度来分析一下具体的实现。

1、程序入口

下面开始对main做分析,省略掉次要信息,加了一些注释

首先做参数解析,起一个prometheus的web接口,接着根据参数做服务的初始化,上传监控项值,然后启动同步服务,等待进程信号结束

func main() {
  // 解析参数
  if helpFlag {
    flag.Usage()
    return
  }
    ......
    //监控web prometheus 端口
    //-----------------------------------
  go http.ListenAndServe(":9999", nil)
  //-----------------------------------
    ......
    // 根据参数做初始化服务,
  err := service.InitApplication(cfgPath)
  if err != nil {
    println(
      errors.WithStack(err))
    return
  }
    ......
  //开始上传 promethus 监控数据
  global.StartMonitor()
    // 进程信号
  signalChan := make(chan os.Signal, 1)
  signal.Notify(signalChan,
    os.Kill,
    os.Interrupt,
    syscall.SIGINT,
    syscall.SIGTERM,
    syscall.SIGQUIT)

  if global.Cfg().NotCluster() {
    // 开始同步
    service.StartApplication()
  } else {
    service.BootCluster()
  }
    // 等待进程结束信号,退出进程
  select {
  case sig := <-signalChan:
    log.Printf("Application Stop,Signal: %s \n", sig.String())
  case <-service.CtxDone():
    log.Printf("context is done with %v, closing", service.CtxErr())
  }
    // 关闭服务
  service.CloseApplication()
}

2、服务初始化

其中 InitApplication 函数中做了对配置文件和日志的解析,存储的初始化storage.InitStorage,还有全量同步服务的初始化transferService.initialize。存储使用bolt库来存储已执行过的binlog序列化后的对象和对应的position,之所以记录是因为如果程序异常退出可以重启重新应用这些对象来重新建立无损的数据同步

func InitApplication(cfgPath string) error {
  //解析配置文件
  cfg, err := global.NewConfigWithFile(cfgPath)
  if err != nil {
    return err
  }
  //配置日志
  err = logutil.InitGlobalLogger(cfg.LoggerConfig)
  if err != nil {
    return err
  }
  ......
  err = storage.InitStorage(cfg)
  if err != nil {
    return err
  }
    ......
  transferService := &TransferService{
    config: cfg,
  }
  err = transferService.initialize()
  if err != nil {
    return err
  }
  return nil
}

接着往下看 transferService.initialize,这个方法中对canal做初始化,接着通过配置文件中的库表规则来解析哪些库表需要同步,然后在目标端做表结构同步在_endpoint.Start()中,新建一个多线程的hashMap对象来对canal的 event handler接收binlog,hashMap内部是 类型为binlog请求类型的指针chan的list和一个互斥锁

hashMap = &HashMap{
    Array:   make([]chan *RowRequest, len),
    ChanLen: len,
    Lock:    sync.Mutex{},
  }
func (s *TransferService) initialize() error {
  if err := s.initCanal(); err != nil {
    return errors.WithStack(err)
  }
  if err := s.initRules(); err != nil {
    return errors.WithStack(err)
  }
  // 初始化 endpoint   Start 里面会根据标识是否同步表结构
  _endpoint := endpoint.NewEndpoint(s.config, s.canal)
  if err := _endpoint.Start(); err != nil {
    return errors.WithStack(err)
  }
  global.SetDestinationState(global.MetricsStateOK)
  s.endpoint = _endpoint

    // start()中已经检测和同步了表结构 接下来 全量同步数据,全量数据按表的行数来分批,每批的量由 参数 控制
  stockService := NewStockService(s)
  if err := stockService.Run(); err != nil {
    return errors.Trace(err)
  }

    // 初始化canal的mysqldump,和使用库表规则来限定canal的接收范围
  s.initDumper()
    ......
  s.ctx, s.cancelFunc = context.WithCancel(context.Background())

  //初始化hashMap 来对应分配线程使用
  hashMap := global.NewHashMap(s.config.Threads, 4096)

  s.handler = &handler{
    requestQueue:    make(chan interface{}, 4096),
    ddLRequestQueue: make(chan interface{}, 4096),
    transfer:        s,
    hashMap:         hashMap,
  }
  s.canal.SetEventHandler(s.handler)

  return nil
}

3、服务启动

初始化完成后,看TransferService的run方法

func (s *TransferService) run() error {
  s.wg.Add(1)
    // 起一个线程去监听canal的binlog请求
  s.handler.startRequestQueueListener()
    ......
    // 按 position 启动canal,拉取binlog传递给RequestQueueListener 
  if err = s.canal.RunFrom(current); err != nil {
    log.Println(fmt.Sprintf("start transfer : %v", err))
    logutil.Errorf("start transfer : %v", err)
    s.cancelFunc()
    return errors.Trace(err)
  }
    ......
  s.running.Store(false)
  logutil.Info("Canal is Closed")
  return nil
}

主要的逻辑在s.handler.startRequestQueueListener()内部

func (h *handler) startRequestQueueListener() {
  go func() {
        ......
    //DDL事件需求队列
    var DDLMessage = make(chan *global.RowRequest, bulkSize)
    //初始化控制器
    global.GlobalChangeChan = global.ChangeChan{
      DdlControl:     false,
      DdlControlChan: make(chan struct{}),
    }
    //创建 消费binlog事件 线程
    for i := 0; i < h.transfer.config.Threads; i++ {
      go h.transfer.endpoint.Consume(i, h.hashMap.Array[i], global.GlobalChangeChan)
    }
    //创建 DDL专门处理线程
    go h.transfer.endpoint.Consume(100, DDLMessage, global.GlobalChangeChan)

    for {
      //DDLSavePos := false
      //监听线程循环一次
      //判断是否是DDL
      if global.GlobalChangeChan.DdlControl {
        //进入处理就关闭标识
        //fmt.Println("通过控制字符 进入循环 准备从等待队列中取请求")
        global.GlobalChangeChan.Mutex.Lock()
        global.GlobalChangeChan.DdlControl = false
        global.GlobalChangeChan.Mutex.Unlock()
        //监听线程 into ddl
        select {
        case ddlRequest := <-h.ddLRequestQueue:
          switch ddlRequest := ddlRequest.(type) {
          case *global.RowRequest:
            if isMysql {
              //从等待队列中获取请求 放入执行队列
              DDLMessage <- ddlRequest
            }
          }
        //时间间隔 不然上面会堵死
        case <-ticker.C:
          //等待DDL执行完成
        }
      } else {
                //监听线程 into dml
        //needFlush := false
        needSavePos := false
        select {
        //非ddl线程
        case v := <-h.requestQueue:
          switch v := v.(type) {
          case global.PosRequest:
            now := time.Now()
            //三分钟一次打印 position
            if v.Force || now.Sub(lastSavedTime) > 2*time.Minute {
              lastSavedTime = now
              //needFlush = true
              needSavePos = true
              current = global.PosRequest{
                Name:      v.Name,
                Pos:       v.Pos,
                Timestamp: v.Timestamp,
                Force:     false,
              }
            }
          case *global.RowRequest:
            if isMysql {
              //搜集binlog 每次搜集满 BulkSize 就主动刷binlog 不等刷新间隔
              h.hashMap.Array[v.Hash] <- v
            }
          }
        case <-h.transfer.ctx.Done():
          return
        //时间间隔 保证一秒一循环  不然无法进入ddl判断
        case <-ticker.C:
          //没有需要执行的dml
          //没有需要执行的sql,上报当前时间戳
          global.ExecuteSQLTimestamp = uint32(time.Now().Unix())
        }
        //三分钟记录一次 position
        if needSavePos {
          //判断 时间戳 是否正常,正常则保存到存储且输出到日志,异常则不保存丢弃
          if current.Timestamp > 0 {
            if err := h.transfer.positionStorage.RecordPosition(current); err != nil { //报错则打错误日志 退出  不报错则打正常日志
              logutil.Errorf("row event save sync position %s err %v, close sync", current, err)
              h.transfer.cancelFunc()
              return
            } else {
              logutil.Info("Timestamp:" + strconv.Itoa(int(current.Timestamp)) + ",Datetime:" + util.TimestampToDatetime(int64(current.Timestamp)) + ",PosName:" + current.Name + ",Pos:" + strconv.Itoa(int(current.Pos)))
            }
          }
        }
      }
    }
  }()
}

内容讲解:

新建DDL处理线程和多个DML处理线程,每个线程都一个chan,在一个死循环中接收binlog请求,通过global.GlobalChangeChan.DdlControl 来判断是否有接收到 DDL请求,因为是由多个线程来设置global.GlobalChangeChan.DdlControl ,所以由互斥锁的保护来设置值。 后续将DDL和DML请求都放进各自的chan,其中DML根据不同v.Hash来选择放入不同的chan,v.Hash取值是主键的hash,由此来保证行级别有序。

接下来由 go h.transfer.endpoint.Consume(i, h.hashMap.Array[i], global.GlobalChangeChan)来消费请求。 了解完 整体的代码框架后,再来了解一番 数据同步的思路逻辑

五、总体数据同步逻辑

整体数据同步大体分为三步: 开始之前 通过对配置文件的解析,拿到需要同步库和表

1、库表结构同步

操作步骤
show create table table_name;

当在目的库回放表结构的时候,会对建库sql做 if noe exists 改造,已存在库则不会新建,库完成之后再执行收集到的建表sql,执行前后会 临时关闭外键 处理外键问题

SET FOREIGN_KEY_CHECKS = 0;
SET FOREIGN_KEY_CHECKS = 1;

2、全量数据同步

操作步骤
数据怎么拉取?有没有其他方式?

原先这里的全量同步 是采用 mysqldump 来处理,mysqldump的优点是方便不用源库导出和目的库导入的处理逻辑,缺点有几个点 1.数据导出时,不受控制,一把梭,会引起源库的高CPU,当源库的数据量很大时,由于mysqldump运行时间超长,会引起线程超时 ;2.数据导入时,不能运用多线程导入,导入的速度也会受影响,也不可控。 所以在后续的改造中,把全量同步做成了数据拉取达到可控和允许多线程的目的,可配置线程数和批量大小

dump_threads: 40              #全量同步线程数
dump_record_rows: 1000        #全量同步每批次大小

拉取数据的时候,首先查整表数据量多大,根据一下sql做条数循环。

select * from (select id from table_name order by id limit 0,1000) a left join table_name b on a.id=b.id

当取到值时,根据当前表的列,转化 insert into 的sql,当拉取多条时,把多条insert sql做合并处理,提高插入速度。 以上是一个线程的操作流程,项目中使用 golang中较高效的连接池库 github.com/panjf2000/ants/v2 来做多线程导出导入处理,线程的数量由 dump_threads 参数控制

3、增量同步

操作步骤
互斥冲突算法

我也了解过阿里云DTS在这块的处理逻辑(我没记错的话,有一个好听的名字叫做 蜂窝冲突算法) ,大概原理是多端导入DDL和DML后,经过一系列算法得出下一条应该执行的sql,无论是 DDL 还是 DML。

为什么需要有序性和互斥性?

因为数据同步模拟的源库操作在目的表回放。 例如:当主库删除一个A字段的时马上插入一行数据,那么我们回放的操作也必须是先执行删除A字段的DDL再执行插入一行数据的DML。 如果不保证有序,先执行插入的DML,当A字段是非空属性字段是就会报错; 如果不保证互斥性,两条sql一直执行,也可能会发生同样的报错。

DML的并发问题,为什么要并发,怎么并发,不并发行不行?

首先要说的是,不并发行不行,我理解是行的,mysql也是5.7后面一些版本才支持多线程的从库同步(并行复制),只是说不并发显得程序同步数据会比较慢,另外golang以多线程著称就显得有点呆,再好的机器也发挥不出性能,应快尽快。 带着并发的思路再看增量同步,问题就来了,所谓并发无非是用多线程去取sql,拿到sql就去执行,比如某行数据先插入再修改,而当并发执行时修改的sql 线程后拿到sql,然而因为执行速度较快 先执行了,那就发生报错了,因为插入的sql没有执行,这条要修改的数据还不存在,这里又涉及到了有序性,前面讲的是DDL 和 DML之间的有序性,现在遇到的是 DML之前的有序性。 为什么并发会带来有序性校验的问题,原因很直接,因为单线程执行本身就解决了有序执行的问题。 既然 MYSQL 已经解决了多线程执行的问题,那就借鉴它。


mysql从5.7开始支持基于逻辑时钟的并行复制,对于logical_clock多线程复制,允许并行回放的粒度为事务级别的,理论只要事务之间不冲突都可以并行回放 二进制日志中新增了 last_committed 和 sequence_number。

last_committed 表示事务在每个二进制日志文件中的 binlog group 编号,sequence_number 为每个二进制日志文件中的事务编号。last_committed 会有重复值,值相同表示事务在同一个 binlog group 中表示这些事务并行提交时没有冲突,随意在回放时具有相同的 last_committed 事务是可以并行回放的。

last_committed 值是主库事务在进入 prepare 阶段时获取已经提交事务的最大的 sequence_number

一言以蔽之:一个组提交的事务都是可以并行回放。 事实上,这个解决方案也是一定缺陷的,这个方案用 白话 说就是找同一组的事务,那么问题也跟着来了,找多久算找一次呢,实际上由两个参数来控制 binlog_group_commit_sync_no_delay_count(找几个) 、 binlog_group_commit_sync_delay(找几秒) ,当主库事务频繁的时候,这个方案可以起到较好的并发作用,因为很容易满足找多久的问题,当主库事务本就不多的情况下,这两个参数就变成累赘了。

官方也意识到这个问题,出了一个优化解决方案:WriteSet。

writeset = hash(index_name,db_name,db_name_length,table_name,table_name_length,value,value_length)

大概的意思:通过计算每行记录的哈希值来确定是否是相同记录判断是否冲突,只要不冲突都可以放在一个提交组里面一起提交,这样就把 本该在下一组才能提交的、没有逻辑关系的数据 在一起提交了,极大的加速了并发效果,在这种情况下 那两个控制参数就显得不那么重要。


理解了 MYSQL 的处理方案之后,那么直接“学习”。 先开多线程(由参数 threads 控制),对多线程进行编号,比如 0-20,接收到DML后,直接对 binlog 做 hash,hash成 0-20 放到对应的线程里,实现 WriteSet 的逻辑,也就是 行级有序。 另外在每个线程里收集到的sql,还可以对同类型并且可以合并的sql做合并处理,进一步增加回放速度。

DML 幂等性改造

这也是第一步中为什么要求检查存在主键id的原因。 幂等性改造是为了让 DML 不管在什么情况下执行都能达到相同而准确的目的,提高容错能力。 如何改造: INSERT 原 sql

INSERT INTO table_name(ID,A,B,C) VALUES (1,"A","B","C");

INSERT 改造后 sql,当数据已存在时忽略插入。

INSERT ignore INTO table_name(ID,A,B,C) VALUES (1,"A","B","C");

DELETE 原 sql

DELETE FROM table_name WHERE ID=1 AND A="A" AND B="B" AND C="C";

DELETE 改造后 sql,通过 id 来识别行。

DELETE FROM table_name WHERE ID=1;

UPDATE 原 sql

UPDATE TABLE table_name SET ID=1 , A="A+" , B="B" , C="C" WEHRE ID=1 AND A="A" AND B="B" AND C="C";

UPDATE 改造后 sql,通过 id 来识别行。

UPDATE TABLE table_name SET ID=1 , A="A+" , B="B" , C="C" WEHRE ID=1;

举个例子

  1. 当目的表的数据不一致时,原sql(更新操作)执行会报错,改造后sql执行后会把数据纠正过来,增加容错性。
  2. 当 DML (更新操作) 重复执行时(超时重试),第二次执行时也会报错,改造后sql可以正常执行。

一整套逻辑下来 模拟MYSQL的多线程复制,最终实现多并发、行级有序、具有容错能力的数据同步。

五、总结

本文介绍了一个开源数据同步的实现方案,提供了一些对于项目的了解思路。很多 大家耳熟能详的数据同步工具的底层思路也都类似,他们的架构可能会比较大,把 收集端 、处理端、执行端 分开 以获得更大的性能提升,本文 抛砖引玉 ,希望大家有更好的数据同步处理和优化的实现方案。

作者:政采云技术团队
链接:https://juejin.cn/post/7184951547543945253
来源:稀土掘金

上一篇 下一篇

猜你喜欢

热点阅读