互联网、电商、产品、运营、文案策划、生活

canal源码解析之canal.meta

2016-12-17  本文已影响0人  holly_wang_王小飞

canal的模块canal.meta 增量订阅&消费信息管理器 说法很官方口吻,我们来看一下 meta模块是怎么做到的。
首先看一下代码组成部分

canal.meta源码部分
包含测试类在内都是非常少的。
上一篇canal源码解析之canal.deployer看到instance实例启动的时候第一个启动的就是 CanalMetaManager
protected CanalMetaManager                       metaManager;
 @Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }
…………………………
    }

所以先看CanalMetaManager这个类

CanalMetaManager类图

这是个接口,实现了接口CanalLifeCycle接口,CanalLifeCycle是生命管理的接口定义,非常简单的三个接口定义

public interface CanalLifeCycle {

    void start();

    void stop();

    boolean isStart();
}

CanalMetaManager接口定义了几个接口方法,比较重要的入口方法是

 /**
     * 增加一个 client订阅 <br/>
     * 如果 client已经存在,则不做任何修改
     */
    void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException;

所有客户端连接到canal的时候都会调用这个方法(这个在另一个模块,后续再说)看下实现类

CanalMetaManager接口的实现类

还是比较简单。
主要说一下 canal server为每个连接到canal server client实例分配一个 CanalMetaMange,用于记录 客户端上次同步到事件的位置,连接的哪个instance等。主要记录方式无外乎内存,文件,zookeeper,和混合模式,所以就有了这几个实现类
MemoryMetaManager是记录到内存
FileMixedMetaManager是内存和文件的混合模式,策略是: 1. 先写内存,然后定时刷新数据到File 2. 数据采取overwrite模式(只保留最后一次),通过logger实施append模式(记录历史版本)
ZooKeeperMetaManager 基于zk树形节点的记录方式


  zk 版本的 canal manager, 存储结构:

  /otter
     canal
       destinations
         dest1 
           client1
             filter
             batch_mark
               1
               2
               3

MixedMetaManager 是组合memory + zookeeper的使用模式
PeriodMixedMetaManager 是基于定时刷新的策略的mixed实现,做了几个优化2个优化:

  1. 去除batch数据刷新到zk中,切换时batch数据可忽略,重新从头开始获取
  2. cursor的更新,启用定时刷新,合并多次请求。如果最近没有变化则不更新

然后我们来看下每种方式是怎么做到的。具体debug 对应的test类即可
两个抽象的测试类
AbstractZkTest 和AbstractMetaManagerTest,AbstractMetaManagerTest继承AbstractZkTest
AbstractZkTest 定义了destion和zk的cluster
AbstractMetaManagerTest 定义了mysql的地址和抽象了几个测试类都会用到
doSubscribeTest、doBatchTest和doCursorTest

先看内存版实现的测试类
MemoryMetaManagerTest
我们可以看出内存版的实现 其实就是存贮到了对应的map中

 protected Map<String, List<ClientIdentity>>              destinations;
 protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
 protected Map<ClientIdentity, Position>                  cursors;

MemoryMetaManager的start方法 初始化了 对应的map

public void start() {
       super.start();

       batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {

           public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
               return MemoryClientIdentityBatch.create(clientIdentity);
           }

       });

       cursors = new MapMaker().makeMap();

       destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {

           public List<ClientIdentity> apply(String destination) {
               return Lists.newArrayList();
           }
       });
   }

然后模拟了 客户端client的subscribe

public void doSubscribeTest(CanalMetaManager metaManager) {
        ClientIdentity client1 = new ClientIdentity(destination, (short) 1);
        metaManager.subscribe(client1);
        metaManager.subscribe(client1); // 重复调用
        ClientIdentity client2 = new ClientIdentity(destination, (short) 2);
        metaManager.subscribe(client2);

        List<ClientIdentity> clients = metaManager.listAllSubscribeInfo(destination);
        Assert.assertEquals(Arrays.asList(client1, client2), clients);

        metaManager.unsubscribe(client2);
        ClientIdentity client3 = new ClientIdentity(destination, (short) 3);
        metaManager.subscribe(client3);

        clients = metaManager.listAllSubscribeInfo(destination);
        Assert.assertEquals(Arrays.asList(client1, client3), clients);

    }

具体是测试多次subscribe不会出现重复的client记录到内存,还有就是 unscrbe的时候是否从内存移除掉,比较简单。
testBatchAll是为了测试为 client 产生一个唯一、递增的id是否会重复之类的问题。
testCursorAll是为了更新 cursor 游标等
其他的类比较类似,主要是存贮的方式不一样,再主要看个文件的和zk的
文件版的subscribe

public void subscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
        super.subscribe(clientIdentity);

        // 订阅信息频率发生比较低,不需要做定时merge处理
        executor.submit(new Runnable() {

            public void run() {
                flushDataToFile(clientIdentity.getDestination());
            }
        });
    }

看到调用了flushDataToFile方法

private void flushDataToFile(String destination, File dataFile) {
        FileMetaInstanceData data = new FileMetaInstanceData();
        if (destinations.containsKey(destination)) {
            synchronized (destination.intern()) { // 基于destination控制一下并发更新
                data.setDestination(destination);

                List<FileMetaClientIdentityData> clientDatas = Lists.newArrayList();
                List<ClientIdentity> clientIdentitys = destinations.get(destination);
                for (ClientIdentity clientIdentity : clientIdentitys) {
                    FileMetaClientIdentityData clientData = new FileMetaClientIdentityData();
                    clientData.setClientIdentity(clientIdentity);
                    Position position = cursors.get(clientIdentity);
                    if (position != null && position != nullCursor) {
                        clientData.setCursor((LogPosition) position);
                    }

                    clientDatas.add(clientData);
                }

                data.setClientDatas(clientDatas);
            }

            String json = JsonUtils.marshalToString(data);
            try {
                FileUtils.writeStringToFile(dataFile, json);
            } catch (IOException e) {
                throw new CanalMetaManagerException(e);
            }
        }
    }

看到最后是写到了文件 FileUtils.writeStringToFile(dataFile, json);
看一个比较完整的meta.dat的文件格式

{
    "clientDatas": [
        {
            "clientIdentity": {
                "clientId": 1001,
                "destination": "example",
                "filter": ".*\\..*"
            },
            "cursor": {
                "identity": {
                    "slaveId": -1,
                    "sourceAddress": {
                        "address": "192.168.1.4",
                        "port": 3306
                    }
                },
                "postion": {
                    "included": false,
                    "journalName": "mysql-bin.000002",
                    "position": 10670,
                    "serverId": 1,
                    "timestamp": 1481773274000
                }
            }
        }
    ],
    "destination": "example"
}

可以看到其实就是序列化的描述整个canal instance对应数据对象。

在看下zk版本的实现

public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
            clientIdentity.getClientId());

        try {
            zkClientx.createPersistent(path, true);
        } catch (ZkNodeExistsException e) {
            // ignore
        }
        if (clientIdentity.hasFilter()) {
            String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
                clientIdentity.getClientId());

            byte[] bytes = null;
            try {
                bytes = clientIdentity.getFilter().getBytes(ENCODE);
            } catch (UnsupportedEncodingException e) {
                throw new CanalMetaManagerException(e);
            }

            try {
                zkClientx.createPersistent(filterPath, bytes);
            } catch (ZkNodeExistsException e) {
                // ignore
                zkClientx.writeData(filterPath, bytes);
            }
        }
    }

也是非常简单调用了zk客户端代码进行了树形目录的创建和数据写入

zk树形目录创建

注意这里创建的树的类型。

写入数据到zk

这是调用zkclient代码写入数据。我这里安装了zkui 所以可以看一下数据

zkui web形式查看写入的节点数据

这里会有人经常遇到这样的问题就是
canal配置canal.instance.filter.regex无效,这里说一下这个问题
其实可以查看对应的存贮文件meta.dat或者zk上的节点数据

有{"clientIdentity":{"clientId":1001,"destination":"example","filter":".\.."}

所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".\.."),不然等于没设置canal.instance.filter.regex。
如果一定要调用CanalConnector.subscribe(".\.."),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。

总结,其实这个涉及到了设计模式,策略,根据你的配置,是否是基于zk等选择存贮方式,实现对client的管理。

上一篇下一篇

猜你喜欢

热点阅读