ZooKeeper Source Code Analysis: The Curator Client

2018-09-16  1d96ba4c1912

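Curator is a wrapper around ZooKeeper's native client that makes it more convenient to use. This article summarizes how it works. Some familiarity with the native client makes the material easier to follow, so I recommend first reading the earlier article on the native client source (zookeeper源码分析之客户端源码解密).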

I. Execution Overview

Here is a typical snippet that uses Curator to connect to ZK and create a node:

// Initialize the client
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("localhost:2181")
        .sessionTimeoutMs(60 * 1000)
        .connectionTimeoutMs(3 * 1000)
        .retryPolicy(new ExponentialBackoffRetry(1000, 29, 60 * 1000))
        .build();
// Start connecting to the ZK ensemble
client.start();
// Block until connected to the ZK ensemble, or until 5 seconds have passed
boolean success = client.blockUntilConnected(5, TimeUnit.SECONDS);
if (success) {
    // Create the ZK node; a ZK path must start with "/"
    client.create().creatingParentsIfNeeded().forPath("/test", "test".getBytes("UTF-8"));
}

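As the demo shows, using Curator means first creating an instance, then calling start to open the ZK connection, and only then performing create, read, update, and delete operations. The source walkthrough below follows the same order.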

II. Source Code Analysis

Start with the code that initializes CuratorFramework:

// The final build() call simply returns a CuratorFrameworkImpl object
public CuratorFramework build()
{
    return new CuratorFrameworkImpl(this);
}

// The CuratorFrameworkImpl constructor
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
    // ZooKeeper factory; the default is DefaultZookeeperFactory, which just calls the native client's new ZooKeeper(...)
    ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
    // Note the type of client here
    this.client = new CuratorZookeeperClient
        (
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            // This watcher wraps each native ZK event in Curator's own CuratorEvent type and then processes it
            new Watcher()
            {
                @Override
                public void process(WatchedEvent watchedEvent)
                {
                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                    processEvent(event);
                }
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly(),
            builder.getConnectionHandlingPolicy()
        );
    // Manages the ZK connection state; the logic that calls back listeners on a state change lives in here
    connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
    // Default node data is the local IP address. The native create() always takes a data argument; Curator lets the caller omit it and fills in this default
    byte[] builderDefaultData = builder.getDefaultData();
    // Many other field initializations are omitted for readability
}
// The CuratorZookeeperClient constructor called above
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
        int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
        RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
    // Wraps the ZK client; the watcher that was passed in is handed further down
    state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
    // Initialization of many other fields is omitted here as well
}
// The ConnectionState constructor
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
    // ensembleProvider supplies the ZK address; a plain connection string is wrapped in a FixedEnsembleProvider
    this.ensembleProvider = ensembleProvider;
    this.sessionTimeoutMs = sessionTimeoutMs;
    this.connectionTimeoutMs = connectionTimeoutMs;
    this.tracer = tracer;
    this.connectionHandlingPolicy = connectionHandlingPolicy;
    // parentWatcher is the watcher passed in from the CuratorFrameworkImpl constructor
    if ( parentWatcher != null )
    {
        parentWatchers.offer(parentWatcher);
    }

    // Passing this shows that ConnectionState implements ZK's Watcher interface, so it is itself a watcher
    zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
// The last constructor does nothing but assign fields
HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly)
{
    this.zookeeperFactory = zookeeperFactory;
    this.watcher = watcher;
    this.ensembleProvider = ensembleProvider;
    this.sessionTimeout = sessionTimeout;
    this.canBeReadOnly = canBeReadOnly;
}

The initialization chain above tells us:

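  1. CuratorFrameworkImpl exposes the API for operating on ZK (create, read, update, delete). The client reference it holds is a CuratorZookeeperClient.
  2. CuratorZookeeperClient offers a smaller API. The client reference it holds is a ConnectionState.
  3. ConnectionState implements ZooKeeper's Watcher interface and handles ZooKeeper events such as session expiration. The client reference it holds is a HandleHolder.
  4. HandleHolder encapsulates the logic for opening and closing the ZK connection.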

Once the fields are initialized, actually connecting to ZK requires calling the start method on CuratorFramework. Here is the source, again with some code omitted:

public void start()
{
    // This state is CuratorFrameworkImpl's own lifecycle state; do not confuse it with the others
    if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
    {
        throw new IllegalStateException("Cannot be started more than once");
    }

    try
    {
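        // As the class name suggests, this starts the thread that manages connection state
        connectionStateManager.start(); 
        // This calls start on CuratorZookeeperClient, which actually opens the connection to ZK
        client.start();
        // Start an asynchronous thread that runs background operations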
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        executorService.submit(new Callable<Object>()
        {
            @Override
            public Object call() throws Exception
            {
                backgroundOperationsLoop();
                return null;
            }
        });
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        handleBackgroundOperationException(null, e);
    }
}
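
The compareAndSet guard at the top of start() is worth isolating, because the other start methods in this walkthrough use the same idea. Below is a minimal sketch of that one-shot guard. OneShotStarter and its State enum are hypothetical names for illustration, not Curator classes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a component with a LATENT -> STARTED lifecycle.
class OneShotStarter {
    enum State { LATENT, STARTED, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // Only the first caller wins the compare-and-set; every later call fails fast.
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        OneShotStarter starter = new OneShotStarter();
        starter.start();
        try {
            starter.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

Because the check and the transition happen in one atomic step, two threads racing to call start() cannot both succeed, and no lock is needed.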

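From the earlier walkthrough of the native ZooKeeper API we know that opening a ZK connection requires registering a watcher to handle the connection-state and node-change events that ZK sends back. Although the start() method of CuratorFrameworkImpl starts connectionStateManager first, let's look at CuratorZookeeperClient's start method before that: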

public void start() throws Exception
{
    // Another similar flag: started belongs to CuratorZookeeperClient itself; do not confuse it with the others
    if ( !started.compareAndSet(false, true) )
    {
        throw new IllegalStateException("Already started");
    }
    // This calls ConnectionState's start method
    state.start();
}
// ConnectionState's start method
void start() throws Exception
{
    // Hook for work to do before the ZK address is fetched; a no-op by default
    ensembleProvider.start();
    reset();
}

synchronized void reset() throws Exception
{
    log.debug("reset");

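    // Counts how many times the ZooKeeper instance has been built. The counter reveals whether the connection
    // was replaced even when the observed state looks unchanged, e.g. a disconnect followed at once by a reconnect.
    instanceIndex.incrementAndGet();
    // Mark the connection as not connected
    isConnected.set(false);
    // Record when this connection attempt started
    connectionStartMs = System.currentTimeMillis();
    // Close the old ZooKeeper instance and build a fresh helper
    zooKeeper.closeAndReset();
    // Connect to ZK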
    zooKeeper.getZooKeeper();   // initiate connection
}
// The HandleHolder methods called above
void closeAndReset() throws Exception
{
    // Close the previous ZooKeeper instance, if there is one, then rebuild the helper
    internalClose(0);
    // The helper object is initialized here
    helper = new Helper()
    {
        private volatile ZooKeeper zooKeeperHandle = null;
        private volatile String connectionString = null;

        @Override
        public ZooKeeper getZooKeeper() throws Exception
        {
            synchronized(this)
            {
                if ( zooKeeperHandle == null )
                {
                    connectionString = ensembleProvider.getConnectionString();
                    // The watcher passed in here is the ConnectionState
                    zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                }

                // Once the handle exists, swap in a helper implementation so later reads need no lock
                // An unusual approach, but a reasonable one
                helper = new Helper()
                {
                    @Override
                    public ZooKeeper getZooKeeper() throws Exception
                    {
                        return zooKeeperHandle;
                    }

                    @Override
                    public String getConnectionString()
                    {
                        return connectionString;
                    }

                    @Override
                    public int getNegotiatedSessionTimeoutMs()
                    {
                        return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
                    }
                };

                return zooKeeperHandle;
            }
        }

        @Override
        public String getConnectionString()
        {
            return connectionString;
        }

        @Override
        public int getNegotiatedSessionTimeoutMs()
        {
            return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
        }
    };
}
// Given the helper implementation above, this is where the connection to ZK is really opened
ZooKeeper getZooKeeper() throws Exception
{
    return (helper != null) ? helper.getZooKeeper() : null;
}
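
The helper swap is easier to see in isolation. The sketch below reproduces only the pattern: a first helper that creates the handle under a lock, then replaces itself with a helper that returns the handle directly. LazyHandleHolder and its nested Helper interface are hypothetical names, and a Supplier stands in for the ZooKeeper factory.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical miniature of the helper swap; not the Curator class.
class LazyHandleHolder<T> {
    private interface Helper<T> {
        T get();
    }

    private final Supplier<T> factory;
    private volatile Helper<T> helper;

    LazyHandleHolder(Supplier<T> factory) {
        this.factory = factory;
        reset();
    }

    // Installs the locking helper; the handle is created lazily on the first get().
    final void reset() {
        helper = new Helper<T>() {
            private volatile T handle;

            @Override
            public synchronized T get() {
                if (handle == null) {
                    handle = factory.get();
                }
                final T created = handle;
                // Swap in a helper that returns the handle without taking the lock.
                helper = () -> created;
                return created;
            }
        };
    }

    T get() {
        return helper.get();
    }

    public static void main(String[] args) {
        AtomicInteger builds = new AtomicInteger();
        LazyHandleHolder<String> holder =
            new LazyHandleHolder<>(() -> "handle-" + builds.incrementAndGet());
        System.out.println(holder.get()); // handle-1
        System.out.println(holder.get()); // handle-1 (no second build)
        holder.reset();
        System.out.println(holder.get()); // handle-2
    }
}
```

The lock is only paid on the first call after each reset. Every later call goes through the volatile helper reference straight to the cached handle.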

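The getZooKeeper() call in reset() kicks off the connection to the ZK ensemble (the native client completes it asynchronously). Following the flow, the next step is to receive the connection events that ZK sends back through the watcher and handle them. The watcher here is the ConnectionState class, so look directly at its implementation of the Watcher interface's process method: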

public void process(WatchedEvent event)
{
    // A type of None means the event reports a change in the ZK connection state
    if ( event.getType() == Watcher.Event.EventType.None )
    {
        boolean wasConnected = isConnected.get();
        boolean newIsConnected = checkState(event.getState(), wasConnected);
        // The connection state has changed
        if ( newIsConnected != wasConnected )
        {
            isConnected.set(newIsConnected);
            // Record the connection start time
            connectionStartMs = System.currentTimeMillis();
            // If the state changed to connected, record the newly negotiated session timeout
            if ( newIsConnected )
            {
                lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
            }
        }
    }

    // Call back Curator's own watchers; note that one parent watcher was already passed in at initialization
    for ( Watcher parentWatcher : parentWatchers )
    {
        OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        parentWatcher.process(event);
        trace.commit();
    }
}
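
Stripped of tracing and timing, process() does two things: it updates a cached connected flag, then forwards the event to every parent watcher. Here is a minimal sketch of that shape. ForwardingStateWatcher and its KeeperState enum are hypothetical stand-ins, and a plain Consumer plays the role of ZooKeeper's Watcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical model: these types are stand-ins, not ZooKeeper or Curator classes.
class ForwardingStateWatcher implements Consumer<ForwardingStateWatcher.KeeperState> {
    enum KeeperState { DISCONNECTED, SYNC_CONNECTED, CONNECTED_READ_ONLY, EXPIRED }

    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final Queue<Consumer<KeeperState>> parentWatchers = new ConcurrentLinkedQueue<>();

    void addParentWatcher(Consumer<KeeperState> watcher) {
        parentWatchers.offer(watcher);
    }

    boolean isConnected() {
        return isConnected.get();
    }

    @Override
    public void accept(KeeperState state) {
        // Step 1: update the locally cached connection flag.
        boolean nowConnected =
            state == KeeperState.SYNC_CONNECTED || state == KeeperState.CONNECTED_READ_ONLY;
        isConnected.set(nowConnected);
        // Step 2: forward the untouched event to every parent watcher.
        for (Consumer<KeeperState> parent : parentWatchers) {
            parent.accept(state);
        }
    }

    public static void main(String[] args) {
        ForwardingStateWatcher watcher = new ForwardingStateWatcher();
        List<KeeperState> seen = new ArrayList<>();
        watcher.addParentWatcher(seen::add);
        watcher.accept(KeeperState.SYNC_CONNECTED);
        watcher.accept(KeeperState.DISCONNECTED);
        System.out.println(seen + " connected=" + watcher.isConnected());
    }
}
```

The inner watcher owns the connection bookkeeping, while the parent watcher only ever sees events and is free to translate them into its own event type.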

First, the key part: how the connection state is handled. This is the checkState method that process() calls:

private boolean checkState(Event.KeeperState state, boolean wasConnected)
    {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state )
        {
                default:
                case Disconnected:
                {
                    isConnected = false;
                    break;
                }
                case SyncConnected:
                case ConnectedReadOnly:
                {
                    isConnected = true;
                    break;
                }
                case AuthFailed:
                {
                    isConnected = false;
                    break;
                }
                case Expired:
                {
                    isConnected = false;
                    checkNewConnectionString = false;
                    // Handle session expiration; internally this just rebuilds the ZK connection
                    handleExpiredSession();
                    break;
                }
                case SaslAuthenticated:
                {
                    break;
                }
        }
        // For every state event except session expiration, check whether the ZK address has changed
        if ( checkNewConnectionString )
        {
            String newConnectionString = zooKeeper.getNewConnectionString();
            if ( newConnectionString != null )
            {
                //处理ZK地址发生变化
                handleNewConnectionString(newConnectionString);
            }
        }

        return isConnected;
    }

Handling an expired session is simple: destroy the current ZK instance and build a new one, as follows:

private void handleExpiredSession()
{
    try
    {
        // An expired session triggers a rebuild of the ZK connection; reset() was shown earlier and is omitted here
        reset();
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        // Reconnecting to ZK failed; put the exception on the background exception queue
        queueBackgroundException(e);
    }
}

Next, the case where the ZK address changes, which is the handleNewConnectionString method called from checkState:

private void handleNewConnectionString(String newConnectionString)
{
    try
    {
        ZooKeeper zooKeeper = this.zooKeeper.getZooKeeper();
        if ( zooKeeper == null )
        {
            // The ZK connection has not been created yet, so do nothing
        }
        else
        {
            if ( ensembleProvider.updateServerListEnabled() )
            {
                // Call the native ZK API to update the server list, which may make this client reconnect to a different server
                zooKeeper.updateServerList(newConnectionString);
            }
            else
            {
                // The ZK address changed, so reset the ZK connection
                reset();
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        queueBackgroundException(e);
    }
}

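With connection-state handling covered, go back to the process method. At the end it calls parentWatcher.process(event), and that watcher is the one passed in during initialization at the very beginning. Here is how it handles the event: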

new Watcher()
{
    @Override
    public void process(WatchedEvent watchedEvent)
    {
        // Wrap the native WatchedEvent in Curator's own event type
        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
        processEvent(event);
    }
}

private void processEvent(final CuratorEvent curatorEvent)
{
    if ( curatorEvent.getType() == CuratorEventType.WATCHED )
    {
        // Broadcast the connection-state change
        validateConnection(curatorEvent.getWatchedEvent().getState());
    }

    // Call back the listeners
    listeners.forEach(new Function<CuratorListener, Void>()
    {
        @Override
        public Void apply(CuratorListener listener)
        {
            try
            {
                OperationTrace trace = client.startAdvancedTracer("EventListener");
                listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                trace.commit();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                logError("Event listener threw exception", e);
            }
            return null;
        }
    });
}
// Converts ZK's native KeeperState into Curator's own ConnectionState event.
// Note that this ConnectionState is an enum, not the ConnectionState class that wraps the ZK connection;
// the two only share a name
void validateConnection(Watcher.Event.KeeperState state)
{
    if ( state == Watcher.Event.KeeperState.Disconnected )
    {
        internalConnectionHandler.suspendConnection(this);
    }
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        // Broadcast the event
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        // If the underlying connection instance has changed, this broadcasts a LOST event
        internalConnectionHandler.checkNewConnection(this);
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
        unSleepBackgroundOperations();
    }
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        internalConnectionHandler.checkNewConnection(this);
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}

public synchronized boolean addStateChange(ConnectionState newConnectionState)
{
    if ( state.get() != State.STARTED )
    {
        return false;
    }

    ConnectionState previousState = currentConnectionState;
    if ( previousState == newConnectionState )
    {
        return false;
    }
    setCurrentConnectionState(newConnectionState);

    ConnectionState localState = newConnectionState;
    boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY));
    // On the very first connection, RECONNECTED is rewritten to CONNECTED; after the check above, only CONNECTED and RECONNECTED can get past !isNegativeMessage
    if ( !isNegativeMessage && initialConnectMessageSent.compareAndSet(false, true) )
    {
        localState = ConnectionState.CONNECTED;
    }
    // Broadcast the event
    postState(localState);
    return true;
}

private void postState(ConnectionState state)
{
    notifyAll();
    // Put the ConnectionState on the queue, dropping the oldest entry while the queue is full
    while ( !eventQueue.offer(state) )
    {
        eventQueue.poll();
    }
}

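From this chain of code we can see that the watcher passed in at the start converts each event, updates the current connection state, and puts the new state on a queue.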
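
To make the three rules in addStateChange and postState concrete (ignore duplicates, report the first positive state as CONNECTED, drop the oldest entry when the queue is full), here is a small self-contained sketch. StateChangeQueue and ConnState are hypothetical names, and the queue capacity is an arbitrary choice for the demo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature of the state-change bookkeeping; not the Curator class.
class StateChangeQueue {
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    private final BlockingQueue<ConnState> eventQueue;
    private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
    private ConnState current;

    StateChangeQueue(int capacity) {
        eventQueue = new ArrayBlockingQueue<>(capacity);
    }

    synchronized boolean addStateChange(ConnState newState) {
        // Rule 1: a state equal to the current one is not a change.
        if (current == newState) {
            return false;
        }
        current = newState;

        boolean negative = newState == ConnState.LOST
            || newState == ConnState.SUSPENDED
            || newState == ConnState.READ_ONLY;
        ConnState toPost = newState;
        // Rule 2: the first positive state is reported as CONNECTED.
        if (!negative && initialConnectMessageSent.compareAndSet(false, true)) {
            toPost = ConnState.CONNECTED;
        }
        // Rule 3: when the queue is full, drop the oldest entry to make room.
        while (!eventQueue.offer(toPost)) {
            eventQueue.poll();
        }
        return true;
    }

    ConnState poll() {
        return eventQueue.poll();
    }

    public static void main(String[] args) {
        StateChangeQueue queue = new StateChangeQueue(2);
        queue.addStateChange(ConnState.RECONNECTED); // queued as CONNECTED
        queue.addStateChange(ConnState.SUSPENDED);
        queue.addStateChange(ConnState.RECONNECTED); // queue full: CONNECTED is dropped
        System.out.println(queue.poll() + ", " + queue.poll()); // SUSPENDED, RECONNECTED
    }
}
```

Dropping the oldest entry means a slow consumer loses history but always ends up seeing the most recent state, which is the one that matters for connection handling.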

Back where the ZK connection is started, we skipped over the connectionStateManager.start() call. Now is the time to look at it:

public void start()
{
    // Sets state to STARTED; do not confuse it with the state in CuratorFrameworkImpl
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    // All this does is run processEvents asynchronously
    service.submit
        (
            new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    processEvents();
                    return null;
                }
            }
        );
}

private void processEvents()
{
    // start() has already set this value to STARTED
    while ( state.get() == State.STARTED )
    {
        try
        {
            // On the first pass ZK is not connected yet, so this is the session timeout the user configured
            int useSessionTimeoutMs = getUseSessionTimeoutMs();
            long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
            long pollMaxMs = useSessionTimeoutMs - elapsedMs;

            // This is the queue the state changes were just put on
            final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
            if ( newState != null )
            {
                if ( listeners.size() == 0 )
                {
                    log.warn("There are no ConnectionStateListeners registered.");
                }
                // This does nothing more than call back the listeners
                listeners.forEach
                    (
                        new Function<ConnectionStateListener, Void>()
                        {
                            @Override
                            public Void apply(ConnectionStateListener listener)
                            {
                                listener.stateChanged(client, newState);
                                return null;
                            }
                        }
                    );
            }
            // The default is 100; if no state change has arrived for a long time, check whether the session has expired
            else if ( sessionExpirationPercent > 0 )
            {
                synchronized(this)
                {
                    checkSessionExpiration();
                }
            }
        }
        catch ( InterruptedException e )
        {
        }
    }
}
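
The wait time computed at the top of the loop is easier to follow with numbers. The sketch below copies only that arithmetic into a pure function. PollTimeout is a hypothetical name, and the current time is passed in as a parameter so the result is deterministic.

```java
// Hypothetical helper that isolates the wait-time arithmetic from processEvents.
class PollTimeout {
    // startOfSuspendedEpoch == 0 means the connection is not currently suspended.
    static long pollMaxMs(int useSessionTimeoutMs, long startOfSuspendedEpoch, long nowMs) {
        long elapsedMs = startOfSuspendedEpoch == 0
            ? useSessionTimeoutMs / 2
            : nowMs - startOfSuspendedEpoch;
        return useSessionTimeoutMs - elapsedMs;
    }

    public static void main(String[] args) {
        // Not suspended: wait half of the 60 s session timeout.
        System.out.println(pollMaxMs(60_000, 0, 123_456));   // 30000
        // Suspended 45 s ago: only 15 s of the session timeout remain.
        System.out.println(pollMaxMs(60_000, 1_000, 46_000)); // 15000
        // Suspended longer than the session timeout: the result goes negative.
        System.out.println(pollMaxMs(60_000, 1_000, 71_000)); // -10000
    }
}
```

A non-positive timeout makes BlockingQueue.poll return immediately, so once the suspension has outlasted the session timeout the loop falls straight through to the expiration check.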

That covers connecting to ZK and registering the watcher. Next comes the logic for creating a ZK node, starting with the create method:

public CreateBuilder create()
{
    checkState();
    return new CreateBuilderImpl(this);
}

The create method returns a CreateBuilderImpl object, following the builder pattern, so go straight to the forPath code:

public String forPath(String path) throws Exception
{
    // Uses the default data, i.e. the client's IP address
    return forPath(path, client.getDefaultData());
}

public String forPath(final String givenPath, byte[] data) throws Exception
{
    // Compress the data if compression was requested
    if ( compress )
    {
        data = client.getCompressionProvider().compress(givenPath, data);
    }

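    // Build the full path, including the namespace
    final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
    // Fetch the ACL list, then validate the create against the schema if one is configured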
    List<ACL> aclList = acling.getAclList(adjustedPath);
    client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

    String returnPath = null;
    // Asynchronous execution
    if ( backgrounding.inBackground() )
    {
        pathInBackground(adjustedPath, data, givenPath);
    }
    else
    {
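        // Synchronous execution
        String path = protectedPathInForeground(adjustedPath, data, aclList);
        // Return the path as the user passed it: if a namespace was set when the client was built, it is prepended on create and stripped again here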
        returnPath = client.unfixForNamespace(path);
    }
    return returnPath;
}
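
The namespace round trip in forPath can be illustrated with two small string functions. This is a simplified sketch of the idea only: NamespacePaths, fix, and unfix are hypothetical names, and Curator's real fixForNamespace and unfixForNamespace handle more cases than this.

```java
// Hypothetical, simplified model of prepending and stripping a namespace.
class NamespacePaths {
    // Prepend "/<namespace>" to a user-supplied absolute path.
    static String fix(String namespace, String path) {
        if (namespace == null || namespace.isEmpty()) {
            return path;
        }
        return "/" + namespace + (path.equals("/") ? "" : path);
    }

    // Strip "/<namespace>" again so the caller sees the path it originally passed in.
    static String unfix(String namespace, String path) {
        if (namespace == null || namespace.isEmpty() || path == null) {
            return path;
        }
        String prefix = "/" + namespace;
        if (path.equals(prefix)) {
            return "/";
        }
        if (path.startsWith(prefix + "/")) {
            return path.substring(prefix.length());
        }
        return path;
    }

    public static void main(String[] args) {
        String stored = fix("app", "/test");
        System.out.println(stored);               // /app/test
        System.out.println(unfix("app", stored)); // /test
    }
}
```

Checking for the prefix followed by a slash avoids stripping part of an unrelated path such as /apple/x when the namespace is app.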

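Finally, the synchronous path. protectedPathInForeground ends up calling pathInForeground, which holds the actual logic: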

private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception
{
    final AtomicBoolean firstTime = new AtomicBoolean(true);
    // The core logic is wrapped in the retry loop
    String returnPath = RetryLoop.callWithRetry
        (
            client.getZookeeperClient(),
            new Callable<String>()
            {
                @Override
                public String call() throws Exception
                {
                    boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;

                    String createdPath = null;
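                    // doProtected defaults to false; when it is on, a retry first looks for a node left by an earlier attempt and skips creation if one is found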
                    if ( !localFirstTime && doProtected )
                    {
                        debugForceFindProtectedNode = false;
                        createdPath = findProtectedNodeInForeground(path);
                    }

                    if ( createdPath == null )
                    {
                        try
                        {
                            // Create the node differently depending on the ZK version
                            if ( client.isZk34CompatibilityMode() )
                            {
                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
                            }
                            else
                            {
                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // If the parent does not exist and createParentsIfNeeded is set, create the parents one by one
                            if ( createParentsIfNeeded )
                            {
                                ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
                                if ( client.isZk34CompatibilityMode() )
                                {
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                                }
                                else
                                {
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
                                }
                            }
                            else
                            {
                                throw e;
                            }
                        }
                        catch ( KeeperException.NodeExistsException e )
                        {
                            // Likewise, if setDataIfExists is set, turn the create into an update
                            if ( setDataIfExists )
                            {
                                Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
                                if(storingStat != null)
                                {
                                    DataTree.copyStat(setStat, storingStat);
                                }
                                createdPath = path;
                            }
                            else
                            {
                                throw e;
                            }
                        }
                    }
                    return createdPath;
                }
            }
        );
    return returnPath;
}
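
The NoNodeException branch follows a try, repair, try again shape: attempt the create, and only if the parent is missing build the ancestors and create once more. The sketch below models that against an in-memory set of paths. ParentCreatingTree and its nested NoNodeException are hypothetical stand-ins, not ZooKeeper or Curator classes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory tree used to illustrate "create parents on NoNode".
class ParentCreatingTree {
    static class NoNodeException extends Exception {
        NoNodeException(String path) {
            super("No node: " + path);
        }
    }

    private final Set<String> nodes = new HashSet<>();

    ParentCreatingTree() {
        nodes.add("/");
    }

    private static String parentOf(String path) {
        int idx = path.lastIndexOf('/');
        return idx == 0 ? "/" : path.substring(0, idx);
    }

    // Like a raw create: fails when the parent node is missing.
    void create(String path) throws NoNodeException {
        String parent = parentOf(path);
        if (!nodes.contains(parent)) {
            throw new NoNodeException(parent);
        }
        nodes.add(path);
    }

    // Creates every ancestor of path, but not path itself.
    void mkdirs(String path) {
        int pos = path.indexOf('/', 1);
        while (pos != -1) {
            nodes.add(path.substring(0, pos));
            pos = path.indexOf('/', pos + 1);
        }
    }

    // Optimistic create first; build the parents only after a NoNode failure.
    void createWithParents(String path) throws NoNodeException {
        try {
            create(path);
        } catch (NoNodeException e) {
            mkdirs(path);
            create(path);
        }
    }

    boolean exists(String path) {
        return nodes.contains(path);
    }

    public static void main(String[] args) throws Exception {
        ParentCreatingTree tree = new ParentCreatingTree();
        tree.createWithParents("/a/b/c");
        System.out.println(tree.exists("/a") + " " + tree.exists("/a/b") + " " + tree.exists("/a/b/c"));
    }
}
```

Trying the create first keeps the common case, where the parents already exist, to a single call. The extra work happens only on the failure path.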

Now for the important part, how the retry policy is used:

public static<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
{
    return client.getConnectionHandlingPolicy().callWithRetry(client, proc);
}

public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
{
    // Block until ZK is connected or the connection timeout elapses
    client.internalBlockUntilConnectedOrTimedOut();

    T result = null;
    // Create a retry loop backed by the configured retry policy
    RetryLoop retryLoop = client.newRetryLoop();
    // Keep looping while the operation has not completed
    while ( retryLoop.shouldContinue() )
    {
        try
        {
            // Run the actual operation
            result = proc.call();
            // Mark it as complete
            retryLoop.markComplete();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            // Handle the exception
            retryLoop.takeException(e);
        }
    }

    return result;
}

Now for the key points of the retry, beginning with retryLoop.shouldContinue():

// Simple: as long as the operation has not completed, keep trying
public boolean shouldContinue()
{
    return !isDone;
}

From that, the logic of retryLoop.markComplete() is easy to guess:

public void markComplete()
{
    isDone = true;
}

So the most important piece must be the exception handling in retryLoop.takeException(e):

public void takeException(Exception exception) throws Exception
{
    boolean rethrow = true;
    // Only certain exceptions are worth retrying
    if ( isRetryException(exception) )
    {
        // The retry policy decides here; allowRetry is the hook left for the user-supplied policy
        if ( retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startTimeMs, sleeper) )
        {
            // If a retry is allowed, swallow the exception
            rethrow = false;
        }
    }
    // If there will be no retry, rethrow the exception
    if ( rethrow )
    {
        throw exception;
    }
}

public static boolean isRetryException(Throwable exception)
{
    if ( exception instanceof KeeperException )
    {
        KeeperException keeperException = (KeeperException)exception;
        return shouldRetry(keeperException.code().intValue());
    }
    return false;
}

// Only exceptions caused by connection problems are retried
public static boolean shouldRetry(int rc)
{
    return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
        (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
        (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
        (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) ||
        (rc == -13); // NEWCONFIGNOQUORUM, hard-coded for compatibility with older ZK versions
}

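One important conclusion follows from the logic above:
Curator wraps every ZK operation in the retry policy we configure!
That may not match intuition. I, for one, had long assumed that retries only came into play after a disconnect.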
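
Putting shouldContinue, markComplete, and takeException together, the whole retry loop fits in a few lines. The sketch below is a simplified model rather than Curator's code: RetryLoopSketch, RetryableException, the two-argument RetryPolicy interface, and boundedBackoff are all hypothetical, and a plain doubling sleep stands in for a real backoff policy.

```java
import java.util.concurrent.Callable;

// Hypothetical miniature of a policy-driven retry loop; not the Curator classes.
class RetryLoopSketch {
    // Stand-in for the connection-type failures that are worth retrying.
    static class RetryableException extends Exception {
        RetryableException(String message) {
            super(message);
        }
    }

    interface RetryPolicy {
        boolean allowRetry(int retryCount, long elapsedMs) throws InterruptedException;
    }

    // Allows up to maxRetries retries, doubling the sleep before each one.
    static RetryPolicy boundedBackoff(int maxRetries, long baseSleepMs) {
        return (retryCount, elapsedMs) -> {
            if (retryCount >= maxRetries) {
                return false;
            }
            Thread.sleep(baseSleepMs * (1L << retryCount));
            return true;
        };
    }

    static <T> T callWithRetry(RetryPolicy policy, Callable<T> proc) throws Exception {
        long startMs = System.currentTimeMillis();
        int retryCount = 0;
        boolean isDone = false;
        T result = null;
        while (!isDone) {
            try {
                result = proc.call();
                isDone = true; // the equivalent of markComplete()
            } catch (Exception e) {
                // Rethrow unless the failure is retryable AND the policy allows another try.
                boolean retryable = e instanceof RetryableException;
                if (!retryable
                    || !policy.allowRetry(retryCount++, System.currentTimeMillis() - startMs)) {
                    throw e;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String value = callWithRetry(boundedBackoff(3, 1), () -> {
            if (++attempts[0] < 3) {
                throw new RetryableException("connection loss");
            }
            return "ok";
        });
        System.out.println(value + " after " + attempts[0] + " attempts"); // ok after 3 attempts
    }
}
```

Every operation passes through the same loop, but the policy is consulted only after a retryable failure. A call that succeeds the first time pays nothing beyond the loop itself.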

III. Summary

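This article focused on how Curator interacts with ZK, using create as the only API example. Beyond basic create, read, update, and delete, many people also rely on higher-level building blocks such as NodeCache and TreeCache. These are all Curator wrappers over the same low-level interaction API, so once that interaction logic is understood, their code becomes much easier to read.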
