注册中心

2021-08-05  本文已影响0人  理想是一盏灯

类结构图

类结构图

RegistryCenter 为操作注册中心的顶层接口。
CoordinatorRegistryCenter 继承RegistryCenter 接口,并多加了一些操作节点的方法,比如创建临时节点/持久化顺序节点/临时顺序节点的方法,同时加了本地缓存注册中心节点数据的相关方法。
ZookeeperRegistryCenter 实现CoordinatorRegistryCenter 的所有方法,基于curator实现。
ZookeeperConfiguration为zk注册中心配置类,供ZookeeperRegistryCenter 连接zk时设置相关连接参数时使用。

RegistryCenter

RegistryCenter 接口定义了对注册中心进行初始化和增删改查相关操作的方法,其他子类实现都必须实现该接口


RegistryCenter方法

每个方法的功能如下表

方法 功能
RegistryCenter init 初始化注册中心
RegistryCenter close 关闭注册中心
RegistryCenter get 获取注册数据
RegistryCenter isExisted 获取数据是否存在
RegistryCenter persist 持久化注册数据
RegistryCenter update 更新注册数据
RegistryCenter remove 删除注册数据
RegistryCenter getRegistryCenterTime 获取注册中心当前时间
RegistryCenter getRawClient 直接获取操作注册中心的原生客户端

源代码如下:

public interface RegistryCenter {
    
    /**
     * 初始化注册中心.
     */
    void init();
    /**
     * 关闭注册中心.
     */
    void close();
    /**
     * 获取注册数据.
     * 
     * @param key 键
     * @return 值
     */
    String get(String key);
    /**
     * 获取数据是否存在.
     * 
     * @param key 键
     * @return 数据是否存在
     */
    boolean isExisted(String key);
    /**
     * 持久化注册数据.
     * 
     * @param key 键
     * @param value 值
     */
    void persist(String key, String value);
    /**
     * 更新注册数据.
     * 
     * @param key 键
     * @param value 值
     */
    void update(String key, String value);
    
    /**
     * 删除注册数据.
     * 
     * @param key 键
     */
    void remove(String key);
    
    /**
     * 获取注册中心当前时间.
     * 
     * @param key 用于获取时间的键
     * @return 注册中心当前时间
     */
    long getRegistryCenterTime(String key);
    
    /**
     * 直接获取操作注册中心的原生客户端.
     * 如:Zookeeper或Redis等原生客户端.
     * 
     * @return 注册中心的原生客户端
     */
    Object getRawClient();
}

CoordinatorRegistryCenter

CoordinatorRegistryCenter接口继承了RegistryCenter,并加了一些方法,比如对注册中心临时/顺序节点进行操作、本地缓存相关操作、子节点相关操作


CoordinatorRegistryCenter方法

每个方法的功能如下表

方法 功能
CoordinatorRegistryCenter getDirectly 直接从注册中心而非本地缓存获取数据.
CoordinatorRegistryCenter getChildrenKeys 获取子节点名称集合
CoordinatorRegistryCenter getNumChildren 获取子节点数量
CoordinatorRegistryCenter persistEphemeral 持久化临时注册数据
CoordinatorRegistryCenter persistSequential 持久化顺序注册数据
CoordinatorRegistryCenter persistEphemeralSequential 持久化临时顺序注册数据
CoordinatorRegistryCenter addCacheData 添加本地缓存
CoordinatorRegistryCenter evictCacheData 释放本地缓存
CoordinatorRegistryCenter getRawCache 获取注册中心数据缓存对象

代码如下

import java.util.List;

/**
 * 用于协调分布式服务的注册中心.
 * 
 * @author zhangliang
 */
public interface CoordinatorRegistryCenter extends RegistryCenter {
    
    /**
     * 直接从注册中心而非本地缓存获取数据.
     * 
     * @param key 键
     * @return 值
     */
    String getDirectly(String key);
    
    /**
     * 获取子节点名称集合.
     * 
     * @param key 键
     * @return 子节点名称集合
     */
    List<String> getChildrenKeys(String key);
    
    /**
     * 获取子节点数量.
     *
     * @param key 键
     * @return 子节点数量
     */
    int getNumChildren(String key);
    
    /**
     * 持久化临时注册数据.
     * 
     * @param key 键
     * @param value 值
     */
    void persistEphemeral(String key, String value);
    
    /**
     * 持久化顺序注册数据.
     *
     * @param key 键
     * @param value 值
     * @return 包含10位顺序数字的znode名称
     */
    String persistSequential(String key, String value);
    
    /**
     * 持久化临时顺序注册数据.
     * 
     * @param key 键
     */
    void persistEphemeralSequential(String key);
    
    /**
     * 添加本地缓存.
     * 
     * @param cachePath 需加入缓存的路径
     */
    void addCacheData(String cachePath);
    
    /**
     * 释放本地缓存.
     *
     * @param cachePath 需释放缓存的路径
     */
    void evictCacheData(String cachePath);
    
    /**
     * 获取注册中心数据缓存对象.
     * 
     * @param cachePath 缓存的节点路径
     * @return 注册中心数据缓存对象
     */
    Object getRawCache(String cachePath);
}

ZookeeperRegistryCenter

ZookeeperRegistryCenter 基于Curator实现了CoordinatorRegistryCenter接口的所有方法,Curator是一个基于zk原生api封装的高水平的客户端jar包,不了解的同学这个框架的可自行搜索相关资料

初始化依赖的配置类-ZookeeperConfiguration

public final class ZookeeperConfiguration {
    
    /**
     * 连接Zookeeper服务器的列表.
     * 包括IP地址和端口号.
     * 多个地址用逗号分隔.
     * 如: host1:2181,host2:2181
     */
    private final String serverLists;
    
    /**
     * 命名空间.
     */
    private final String namespace;
    
    /**
     * 等待重试的间隔时间的初始值.
     * 单位毫秒.
     */
    private int baseSleepTimeMilliseconds = 1000;
    
    /**
     * 等待重试的间隔时间的最大值.
     * 单位毫秒.
     */
    private int maxSleepTimeMilliseconds = 3000;
    
    /**
     * 最大重试次数.
     */
    private int maxRetries = 3;
    
    /**
     * 会话超时时间.
     * 单位毫秒.
     */
    private int sessionTimeoutMilliseconds;
    
    /**
     * 连接超时时间.
     * 单位毫秒.
     */
    private int connectionTimeoutMilliseconds;
    
    /**
     * 连接Zookeeper的权限令牌.
     * 缺省为不需要权限验证.
     */
    private String digest;
}

ZookeeperRegistryCenter 成员属性和构造方法

  1. 属性
  1. 构造方法:设置属性zkConfig的值

代码如下

/**
 * 基于Zookeeper的注册中心.
 * 
 * @author zhangliang
 */
@Slf4j
public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter {
    
    @Getter(AccessLevel.PROTECTED)
    private ZookeeperConfiguration zkConfig;
    
    private final Map<String, TreeCache> caches = new HashMap<>();
    
    @Getter
    private CuratorFramework client;
    
    public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) {
        this.zkConfig = zkConfig;
    }

初始化

 @Override
 public void init() {
     log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
     // 通过工厂+builder创建Curator client实例
     CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
             // 服务器列表,格式host1:port1,host2:port2,...
             .connectString(zkConfig.getServerLists())
             // 重试策略
             .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
             //命名空间
             .namespace(zkConfig.getNamespace());
     // 会话超时时间
     if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
         builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
     }
     //连接创建超时时间
     if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
         builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
     }
     //ACL相关
     if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
         builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
                 .aclProvider(new ACLProvider() {
                 
                     @Override
                     public List<ACL> getDefaultAcl() {
                         return ZooDefs.Ids.CREATOR_ALL_ACL;
                     }
                 
                     @Override
                     public List<ACL> getAclForPath(final String path) {
                         return ZooDefs.Ids.CREATOR_ALL_ACL;
                     }
                 });
     }
     // 构建得到Curator client实例
     client = builder.build();
     //启动Curator client实例
     client.start();
     try {
         if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
             client.close();
             throw new KeeperException.OperationTimeoutException();
         }
         //CHECKSTYLE:OFF
     } catch (final Exception ex) {
         //CHECKSTYLE:ON
         RegExceptionHandler.handleException(ex);
     }
 }
sleepMs = this.baseSleepTimeMs * Math.max(1, this.random.nextInt(1 << retryCount + 1))

注册中心异常处理类RegExceptionHandler

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class RegExceptionHandler {
    
    /**
     * 处理异常.
     * 
     * <p>处理掉中断和连接失效异常并继续抛注册中心.</p>
     * 
     * @param cause 待处理异常.
     */
    public static void handleException(final Exception cause) {
        if (null == cause) {
            return;
        }
        if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
            log.debug("Elastic job: ignored exception for: {}", cause.getMessage());
        } else if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        } else {
            throw new RegException(cause);
        }
    }
    
    private static boolean isIgnoredException(final Throwable cause) {
        return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
    }
}

缓存

通过 Curator 的TreeCache 实现缓存指定目录的数据,内部有zk的watcher监听该目录的变更事件,该目录及该目录下任何节点的变更都会实时更新到缓存,不熟悉Curator 缓存机制的同学可以自行搜索了解

添加目录缓存数据

    @Override
    public void addCacheData(final String cachePath) {
        // 缓存指定路径下的数据
        TreeCache cache = new TreeCache(client, cachePath);
        try {
            cache.start();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
       // 将当前作业的目录缓存数据加到注册中心caches
        caches.put(cachePath + "/", cache);
    }

    public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
        schedulerMap.put(jobName, jobScheduleController);
        regCenterMap.put(jobName, regCenter);
        regCenter.addCacheData("/" + jobName);
    }

订阅缓存目录变更事件

可以订阅TreeCache的缓存目录,具体就是通过增加监听器来监听缓存目录的状态变更事件,当收到该缓存目录下任何节点的变更事件后将会回调监听器的childEvent方法。
不熟悉Curator 缓存机制的同学可以自行搜索了解
后面文章讲的注册中心监听器,都会订阅缓存目录的事件实现其功能逻辑。
com.dangdang.ddframe.job.lite.internal.storage.JobNodeStorage#addDataListener

   /**
     * 注册数据监听器.
     * 
     * @param listener 数据监听器

     */
    public void addDataListener(final TreeCacheListener listener) {
        TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
        cache.getListenable().addListener(listener);
    }
    

订阅发生的时机在作业启动初始时的如下类中,作业启动初始化后面讲解


订阅缓存目录变更事件类

释放指定目录下缓存数据

    //释放指定路径下的缓存数据
    @Override
    public void evictCacheData(final String cachePath) {
        TreeCache cache = caches.remove(cachePath + "/");
        if (null != cache) {
            cache.close();
        }
    }   

获取指定路径下的缓存数据

  @Override
    //获取指定路径的缓存数据
    public Object getRawCache(final String cachePath) {
        return caches.get(cachePath + "/");
    }

获取数据

优先从缓存获取数据

    @Override
    public String get(final String key) {
        //先优先从本地缓存获取数据
        TreeCache cache = findTreeCache(key);
        if (null == cache) {
            return getDirectly(key);
        }
        //根据key从缓存获取数据
        ChildData resultInCache = cache.getCurrentData(key);
        if (null != resultInCache) {
            return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
        }
        // 本地缓存获取不到,直接从注册中心获取
        return getDirectly(key);
    }

    // 从缓存获取数据
    private TreeCache findTreeCache(final String key) {
        for (Entry<String, TreeCache> entry : caches.entrySet()) {
            if (key.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

从注册中心获取数据

    //从注册中心获取数据
    @Override
    public String getDirectly(final String key) {
        try {
            return new String(client.getData().forPath(key), Charsets.UTF_8);
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
            return null;
        }
    }

子节点相关

获取子节点方法

    @Override
    public List<String> getChildrenKeys(final String key) {
        try {
            //获取节点的子节点
            List<String> result = client.getChildren().forPath(key);
            //节点倒序
            Collections.sort(result, new Comparator<String>() {
                
                @Override
                public int compare(final String o1, final String o2) {
                    return o2.compareTo(o1);
                }
            });
            return result;
         //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
            return Collections.emptyList();
        }
    }

获取子节点数量方法

    @Override
    public int getNumChildren(final String key) {
        try {
            // 获取节点的子节点数量
            Stat stat = client.checkExists().forPath(key);
            if (null != stat) {
                return stat.getNumChildren();
            }
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
        return 0;
    }

节点操作相关

判断节点是否存在

    @Override
    public boolean isExisted(final String key) {
        try {
            //判断节点是否存在
            return null != client.checkExists().forPath(key);
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
            return false;
        }
    }

创建持久化节点

    @Override
    public void persist(final String key, final String value) {
        try {
            //不存在则创建持久化节点
            if (!isExisted(key)) {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
            } else {
                //存在则更新
                update(key, value);
            }
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }
   

更新节点方法

    @Override
    public void update(final String key, final String value) {
        try {
            // 更新数据
            client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }
    

创建临时节点方法

    @Override
    public void persistEphemeral(final String key, final String value) {
        try {
            //存在则删除节点
            if (isExisted(key)) {
                client.delete().deletingChildrenIfNeeded().forPath(key);
            }
            //创建临时节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8));
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }
   

创建持久化顺序节点

该方法在当前版本未使用,可以不用关注

    @Override
    public String persistSequential(final String key, final String value) {
        try {
            //创建持久化的顺序节点
            return client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(key, value.getBytes(Charsets.UTF_8));
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
        return null;
    }
    

创建临时顺序节点方法

    @Override
    public void persistEphemeralSequential(final String key) {
        try {
            //创建临时顺序节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key);
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }

移除节点方法

    @Override
    public void remove(final String key) {
        try {
            //移除节点
            client.delete().deletingChildrenIfNeeded().forPath(key);
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }    

获取注册中心当前时间

    @Override
    public long getRegistryCenterTime(final String key) {
        long result = 0L;
        try {
            persist(key, "");
            //获取指定节点的注册中心时间
            result = client.checkExists().forPath(key).getMtime();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
        Preconditions.checkState(0L != result, "Cannot get registry center time.");
        return result;
    }

获取注册中心原生客户端

    @Override
    public Object getRawClient() {
        //获取curator的client
        return client;
    }

关闭注册中心连接

    @Override
    public void close() {
        // 先关闭缓存
        for (Entry<String, TreeCache> each : caches.entrySet()) {
            each.getValue().close();
        }
        waitForCacheClose();
        //再关闭连接
        CloseableUtils.closeQuietly(client);
    }
    
    /* TODO 等待500ms, cache先关闭再关闭client, 否则会抛异常
     * 因为异步处理, 可能会导致client先关闭而cache还未关闭结束.
     * 等待Curator新版本解决这个bug.
     * BUG地址:https://issues.apache.org/jira/browse/CURATOR-157
     */
    private void waitForCacheClose() {
        try {
            Thread.sleep(500L);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
上一篇下一篇

猜你喜欢

热点阅读