ZooKeeper Source Code Analysis: The Curator Client
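Curator wraps ZooKeeper's native client and makes it much easier to use. This article summarizes how Curator works internally. It helps to know how the native ZooKeeper client is used, so I recommend reading "ZooKeeper Source Code Analysis: Demystifying the Client Source" first.
1. Overview of the Execution Flow
Here is a typical piece of code that uses Curator to connect to ZK and create a node: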
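//Initialize the client
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(3 * 1000)
//Base sleep of 1000 ms, at most 29 retries, each sleep capped at 60 s
.retryPolicy(new ExponentialBackoffRetry(1000, 29, 60 * 1000))
.build();
//Start connecting to the ZK ensemble
client.start();
//Block for up to 5 seconds waiting for the connection (java.util.concurrent.TimeUnit)
boolean success = client.blockUntilConnected(5, TimeUnit.SECONDS);
if (success){
//Create the ZK node. ZooKeeper paths must be absolute, so use "/test" rather than "test"
client.create().creatingParentsIfNeeded().forPath("/test", "test".getBytes(StandardCharsets.UTF_8));
}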
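The demo shows three steps when using Curator. First create an instance, then open the ZK connection with start(), and only after that perform create/read/update/delete operations. We will follow the same order through the source.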
2. Source Code Analysis
We begin with the source that initializes CuratorFramework:
//build() simply returns a CuratorFrameworkImpl object
public CuratorFramework build()
{
return new CuratorFrameworkImpl(this);
}
//The CuratorFrameworkImpl constructor
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
//ZooKeeper factory. The default is DefaultZookeeperFactory, which just calls the native client's new ZooKeeper(...)
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
//Note the type of client here
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
//When this watcher receives a native ZK event, it wraps the event in Curator's own CuratorEvent type and processes it
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly(),
builder.getConnectionHandlingPolicy()
);
//Manages the ZK connection state. The logic that calls back listeners on state changes lives here
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
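//The default node data is the local machine's IP address. The native create API always takes a data argument, but Curator lets you omit it (forPath(path)) and fills in this default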
byte[] builderDefaultData = builder.getDefaultData();
//Initialization of many other fields is omitted for clarity
}
//The CuratorZookeeperClient constructor called above
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
//Wraps the ZK client and passes the incoming watcher further down
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
//Again, initialization of many other fields is omitted
}
//The ConnectionState constructor
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
//ensembleProvider supplies the ZK addresses. A plain connection string is wrapped in a FixedEnsembleProvider
this.ensembleProvider = ensembleProvider;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
this.connectionHandlingPolicy = connectionHandlingPolicy;
//parentWatcher is the watcher passed in from the CuratorFrameworkImpl constructor
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
}
//Passing "this" shows that the class implements ZK's Watcher interface. In other words, ConnectionState is itself a watcher
zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
//The last constructor does nothing but assign fields
HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly)
{
this.zookeeperFactory = zookeeperFactory;
this.watcher = watcher;
this.ensembleProvider = ensembleProvider;
this.sessionTimeout = sessionTimeout;
this.canBeReadOnly = canBeReadOnly;
}
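The initialization chain above shows the following:
- CuratorFrameworkImpl exposes the API for operating on ZK (create, delete, update, read). It holds a client reference of type CuratorZookeeperClient.
- CuratorZookeeperClient offers a smaller API. It holds a client reference of type ConnectionState.
- ConnectionState implements ZooKeeper's Watcher interface and handles ZooKeeper events such as session expiration. It holds a client reference of type HandleHolder.
- HandleHolder contains the logic for opening and closing the ZK connection.
Once the fields are initialized, you call start() on CuratorFramework to actually connect to ZK. Here is the source, again with parts omitted: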
public void start()
{
//This state is CuratorFrameworkImpl's own lifecycle state. Don't confuse it with the other state fields
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
throw new IllegalStateException("Cannot be started more than once");
}
try
{
//As the class name suggests, this starts the connection-state management thread
connectionStateManager.start();
//This calls CuratorZookeeperClient.start(), which actually establishes the connection to ZK
client.start();
//Start a single background thread to run background operations
executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
backgroundOperationsLoop();
return null;
}
});
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
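CuratorFrameworkImpl's state field above, and CuratorZookeeperClient's started flag below, both use a compare-and-set "start once" guard. The following is a minimal plain-Java sketch of that pattern. It is not Curator's code. The StartOnceService class and the caller-supplied background task are hypothetical. The sketch keeps only the idea: one atomic transition from LATENT to STARTED, after which a loop is handed to a single background thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for CuratorFrameworkImpl's lifecycle guard; not Curator's API.
class StartOnceService {
    enum State { LATENT, STARTED, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private ExecutorService executor;

    void start(Runnable backgroundLoop) {
        // Only the first caller can move LATENT -> STARTED; every later call fails fast.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // One background thread, like the single-thread executor running backgroundOperationsLoop().
        executor = Executors.newSingleThreadExecutor();
        executor.submit(backgroundLoop);
    }

    void close() {
        // Only a started service can be stopped, and only once.
        if (state.compareAndSet(State.STARTED, State.STOPPED)) {
            executor.shutdownNow();
        }
    }

    State state() {
        return state.get();
    }
}
```

compareAndSet is atomic, so two threads that race to call start() cannot both succeed. The loser gets an IllegalStateException, just as a second call to client.start() does.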
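The native ZooKeeper source tells us that opening a ZK connection requires registering a watcher. That watcher handles the events ZK sends about connection-state changes and node changes. The code above starts connectionStateManager first, but we will look at CuratorZookeeperClient.start() before it: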
public void start() throws Exception
{
//Another similar started flag, this one owned by CuratorZookeeperClient. Don't confuse the two
if ( !started.compareAndSet(false, true) )
{
throw new IllegalStateException("Already started");
}
//This calls ConnectionState.start()
state.start();
}
//ConnectionState.start()
void start() throws Exception
{
//Hook that runs before the ZK address is fetched. It does nothing by default
ensembleProvider.start();
reset();
}
synchronized void reset() throws Exception
{
log.debug("reset");
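//Counts how many times a ZooKeeper instance has been built. The counter shows whether the connection changed even when the state did not, e.g. a connection that drops and reconnects immediately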
instanceIndex.incrementAndGet();
//Mark the client as not connected
isConnected.set(false);
//Record when the connection attempt started
connectionStartMs = System.currentTimeMillis();
//Close the old ZooKeeper instance and build a new helper
zooKeeper.closeAndReset();
//Connect to ZK
zooKeeper.getZooKeeper(); // initiate connection
}
//The HandleHolder methods called above
void closeAndReset() throws Exception
{
//Close the previous ZooKeeper instance if there is one, then rebuild the helper
internalClose(0);
//The helper object is initialized here
helper = new Helper()
{
private volatile ZooKeeper zooKeeperHandle = null;
private volatile String connectionString = null;
@Override
public ZooKeeper getZooKeeper() throws Exception
{
synchronized(this)
{
if ( zooKeeperHandle == null )
{
connectionString = ensembleProvider.getConnectionString();
//The watcher passed in here is the ConnectionState
zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
}
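//Once the ZooKeeper handle exists, replace the helper with one that returns it directly, so later reads need no lock
//An unusual approach, but a sensible one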
helper = new Helper()
{
@Override
public ZooKeeper getZooKeeper() throws Exception
{
return zooKeeperHandle;
}
@Override
public String getConnectionString()
{
return connectionString;
}
@Override
public int getNegotiatedSessionTimeoutMs()
{
return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
}
};
return zooKeeperHandle;
}
}
@Override
public String getConnectionString()
{
return connectionString;
}
@Override
public int getNegotiatedSessionTimeoutMs()
{
return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
}
};
}
//Given the helper implementation above, this is where the ZooKeeper handle is actually created
ZooKeeper getZooKeeper() throws Exception
{
return (helper != null) ? helper.getZooKeeper() : null;
}
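The Helper trick in closeAndReset() is a self-replacing lazy initializer. The first call creates the handle under a lock, then swaps in a helper that returns the handle without locking. Below is a minimal sketch of the pattern. It assumes a hypothetical generic SelfReplacingHolder, with a Supplier standing in for zookeeperFactory.newZooKeeper(...).

```java
import java.util.function.Supplier;

// Hypothetical, simplified model of HandleHolder's self-replacing Helper; not Curator code.
class SelfReplacingHolder<T> {
    private interface Helper<V> {
        V get();
    }

    // volatile so other threads see the swapped-in helper
    private volatile Helper<T> helper;

    SelfReplacingHolder(Supplier<T> factory) {
        helper = new Helper<T>() {
            private T handle;   // guarded by synchronized (this)

            @Override
            public T get() {
                synchronized (this) {
                    if (handle == null) {
                        handle = factory.get();   // the expensive creation happens at most once
                    }
                    final T created = handle;
                    // Replace ourselves with a lock-free helper; later calls skip this block.
                    helper = () -> created;
                    return created;
                }
            }
        };
    }

    T get() {
        return helper.get();
    }
}
```

Calling get() twice invokes the factory only once. A thread that read the old helper before the swap still blocks on the lock, finds the handle already set, and returns it, so at most one handle is ever created. The field is volatile in this sketch so the swap is safely published to other threads.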
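The code above creates the ZooKeeper handle. The native client then connects to the ensemble asynchronously. Next, the watcher has to receive and process the connection events that ZK sends back. That watcher is ConnectionState, so let's look at how it handles events, i.e. its implementation of the Watcher interface's process method: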
public void process(WatchedEvent event)
{
//Type None means this is a connection-state change event
if ( event.getType() == Watcher.Event.EventType.None )
{
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
//The connection state changed
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
//Record the connection start time
connectionStartMs = System.currentTimeMillis();
//If we are now connected, record the newly negotiated session timeout
if ( newIsConnected )
{
lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
}
}
}
//Call back Curator's own watchers. Note that one was added to parentWatchers during initialization
for ( Watcher parentWatcher : parentWatchers )
{
OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
parentWatcher.process(event);
trace.commit();
}
}
Now to the key part, connection-state handling, which is the checkState method called above:
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
case Disconnected:
{
isConnected = false;
break;
}
case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
case AuthFailed:
{
isConnected = false;
break;
}
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
//Handle session expiration, which essentially rebuilds the ZK connection
handleExpiredSession();
break;
}
case SaslAuthenticated:
{
break;
}
}
//For every state except session expiration, check whether the ZK address has changed
if ( checkNewConnectionString )
{
String newConnectionString = zooKeeper.getNewConnectionString();
if ( newConnectionString != null )
{
//Handle a changed ZK address
handleNewConnectionString(newConnectionString);
}
}
return isConnected;
}
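At its core, checkState() maps a KeeperState to "connected or not" and has two side effects. Below is a minimal sketch of that mapping. It assumes a local enum that copies the KeeperState constant names, plus two hypothetical Runnable hooks: one stands in for handleExpiredSession(), the other for the new-connection-string check.

```java
// Hypothetical model of ConnectionState.checkState(); the enum mirrors ZooKeeper's KeeperState names.
class ConnectionStateModel {
    enum KeeperState { Disconnected, SyncConnected, ConnectedReadOnly, AuthFailed, Expired, SaslAuthenticated }

    private final Runnable onExpired;               // stands in for handleExpiredSession()
    private final Runnable onCheckConnectionString; // stands in for the getNewConnectionString() check

    ConnectionStateModel(Runnable onExpired, Runnable onCheckConnectionString) {
        this.onExpired = onExpired;
        this.onCheckConnectionString = onCheckConnectionString;
    }

    boolean checkState(KeeperState state, boolean wasConnected) {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch (state) {
            case SyncConnected:
            case ConnectedReadOnly:
                isConnected = true;
                break;
            case Expired:
                isConnected = false;
                checkNewConnectionString = false;   // the reset() triggered here re-reads the address anyway
                onExpired.run();
                break;
            case SaslAuthenticated:
                break;                              // authentication alone does not change connectivity
            default:                                // Disconnected, AuthFailed, anything unknown
                isConnected = false;
                break;
        }
        if (checkNewConnectionString) {
            onCheckConnectionString.run();
        }
        return isConnected;
    }
}
```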
Session expiration is simple to handle: destroy the current ZK instance and build a new one:
private void handleExpiredSession()
{
try
{
//On session expiration, rebuild the ZK connection. reset() was shown above and is omitted here
reset();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
//Reconnecting to ZK failed, so queue the exception for background handling
queueBackgroundException(e);
}
}
Next is the case where the ZK address changes, handled by the handleNewConnectionString method above:
private void handleNewConnectionString(String newConnectionString)
{
try
{
ZooKeeper zooKeeper = this.zooKeeper.getZooKeeper();
if ( zooKeeper == null )
{
//The ZK connection has not been created yet, so do nothing
}
else
{
if ( ensembleProvider.updateServerListEnabled() )
{
//Update the server list through the native ZK API. The client may then reconnect to a different server to rebalance load
zooKeeper.updateServerList(newConnectionString);
}
else
{
//The ZK address changed, so reset the ZK connection
reset();
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
queueBackgroundException(e);
}
}
That covers connection-state handling. Back in the process method, the last step calls parentWatcher.process(event), and that watcher is the one we passed in during initialization. Here is how it handles events:
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
//Wrap the native WatchedEvent in Curator's own event type
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
}
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
//Broadcast the connection-state change
validateConnection(curatorEvent.getWatchedEvent().getState());
}
//Call back the CuratorListeners
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
OperationTrace trace = client.startAdvancedTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
return null;
}
});
}
//Convert ZK's native KeeperState into Curator's own ConnectionState event.
//Note that this ConnectionState is an enum. It is NOT the ConnectionState class that wraps the ZK connection above;
//the two just share a name
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
internalConnectionHandler.suspendConnection(this);
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
//Broadcast the event
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
//If the underlying ZooKeeper instance changed (i.e. a new session), broadcast LOST first
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
unSleepBackgroundOperations();
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
public synchronized boolean addStateChange(ConnectionState newConnectionState)
{
if ( state.get() != State.STARTED )
{
return false;
}
ConnectionState previousState = currentConnectionState;
if ( previousState == newConnectionState )
{
return false;
}
setCurrentConnectionState(newConnectionState);
ConnectionState localState = newConnectionState;
boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY));
//For the initial connection, report RECONNECTED as CONNECTED. After the check above, only CONNECTED and RECONNECTED can reach this point
if ( !isNegativeMessage && initialConnectMessageSent.compareAndSet(false, true) )
{
localState = ConnectionState.CONNECTED;
}
//Broadcast the event
postState(localState);
return true;
}
private void postState(ConnectionState state)
{
notifyAll();
//Put the ConnectionState into the queue. If the queue is full, drop the oldest entries to make room
while ( !eventQueue.offer(state) )
{
eventQueue.poll();
}
}
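Together, validateConnection() and addStateChange() form a small pipeline:
- translate the native state,
- drop duplicate transitions,
- report the first positive transition as CONNECTED,
- push the result into a bounded queue that discards its oldest entry when full.

The sketch below models that pipeline with hypothetical local enums. Two behaviors are assumptions rather than code shown above:
- suspendConnection() ends up posting SUSPENDED.
- checkNewConnection() posts LOST when the instanceIndex counter incremented in reset() has changed.

unSleepBackgroundOperations() and the STARTED check are left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of validateConnection() feeding addStateChange()/postState(); not Curator code.
class ConnectionEventPipeline {
    enum KeeperState { Disconnected, SyncConnected, ConnectedReadOnly, Expired }
    enum State { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    final BlockingQueue<State> eventQueue;
    private State current = null;
    private boolean initialConnectMessageSent = false;
    private long seenInstanceIndex = -1;   // -1: no ZooKeeper instance seen yet

    ConnectionEventPipeline(int queueCapacity) {
        eventQueue = new ArrayBlockingQueue<>(queueCapacity);
    }

    // Step 1: translate a native KeeperState into Curator-style states.
    void validateConnection(KeeperState state, long instanceIndex) {
        switch (state) {
            case Disconnected:
                addStateChange(State.SUSPENDED);   // assumption: what suspendConnection() ends up posting
                break;
            case Expired:
                addStateChange(State.LOST);
                break;
            case SyncConnected:
                checkNewConnection(instanceIndex);
                addStateChange(State.RECONNECTED);
                break;
            case ConnectedReadOnly:
                checkNewConnection(instanceIndex);
                addStateChange(State.READ_ONLY);
                break;
        }
    }

    // Assumption: a new instance index means reset() built a new handle, so the old session is gone.
    private void checkNewConnection(long instanceIndex) {
        long previous = seenInstanceIndex;
        seenInstanceIndex = instanceIndex;
        if (previous >= 0 && previous != instanceIndex) {
            addStateChange(State.LOST);
        }
    }

    // Step 2: drop duplicates and rename the first positive state to CONNECTED.
    synchronized boolean addStateChange(State newState) {
        if (current == newState) {
            return false;
        }
        current = newState;
        State localState = newState;
        boolean negative = newState == State.LOST || newState == State.SUSPENDED || newState == State.READ_ONLY;
        if (!negative && !initialConnectMessageSent) {
            initialConnectMessageSent = true;
            localState = State.CONNECTED;
        }
        postState(localState);
        return true;
    }

    // Step 3: bounded queue; when it is full, the oldest event is discarded.
    private void postState(State state) {
        notifyAll();   // wake threads waiting on this monitor (the caller holds the lock)
        while (!eventQueue.offer(state)) {
            eventQueue.poll();
        }
    }
}
```

A short sequence illustrates the pipeline. A reconnect on the same ZooKeeper instance is filtered out as a duplicate. A reconnect on a new instance produces LOST followed by RECONNECTED.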
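So the watcher passed in at the start only converts events. It updates the current connection state and adds the event to a queue.
Back where the ZK connection is started, we skipped connectionStateManager.start(). We can look at it now: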
public void start()
{
//This sets ConnectionStateManager's own state to STARTED. Don't confuse it with CuratorFrameworkImpl's state
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
//All this does is run processEvents asynchronously
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
private void processEvents()
{
//start() has already set this to STARTED
while ( state.get() == State.STARTED )
{
try
{
//Before the first ZK connection is established, this is just the session timeout the user specified
int useSessionTimeoutMs = getUseSessionTimeoutMs();
long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
long pollMaxMs = useSessionTimeoutMs - elapsedMs;
//This is the same queue the events were just added to
final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
if ( newState != null )
{
if ( listeners.size() == 0 )
{
log.warn("There are no ConnectionStateListeners registered.");
}
//This simply calls back the listeners
listeners.forEach
(
new Function<ConnectionStateListener, Void>()
{
@Override
public Void apply(ConnectionStateListener listener)
{
listener.stateChanged(client, newState);
return null;
}
}
);
}
//Defaults to 100. If no event arrives within the poll window, check whether the session has expired
else if ( sessionExpirationPercent > 0 )
{
synchronized(this)
{
checkSessionExpiration();
}
}
}
catch ( InterruptedException e )
{
//Swallowed on purpose: if the manager is closing, the while condition ends the loop
}
}
}
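Without the Curator specifics, processEvents() is a consumer loop. It polls the queue with a timeout, fans each event out to the listeners, and runs a periodic check when nothing arrives. Below is a minimal sketch under these assumptions:
- The names StateDispatcher and processOnce are hypothetical.
- An onIdle hook stands in for checkSessionExpiration().
- Events are plain strings instead of the ConnectionState enum.
- A fixed wait replaces the pollMaxMs derived from the session timeout.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of ConnectionStateManager.processEvents(); names are illustrative, not Curator's API.
class StateDispatcher {
    final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final Runnable onIdle;            // stands in for checkSessionExpiration()
    private volatile boolean running = true;

    StateDispatcher(Runnable onIdle) {
        this.onIdle = onIdle;
    }

    // One iteration: wait up to maxWaitMs for an event and fan it out, or run the idle check.
    // Returns false only if the waiting thread was interrupted.
    boolean processOnce(long maxWaitMs) {
        String newState;
        try {
            newState = eventQueue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // keep the interrupt visible to the caller
            return false;
        }
        if (newState != null) {
            for (Consumer<String> listener : listeners) {
                listener.accept(newState);
            }
        } else {
            onIdle.run();
        }
        return true;
    }

    // What a single background thread would run until stop() is called or it is interrupted.
    void processEvents(long maxWaitMs) {
        while (running && processOnce(maxWaitMs)) {
            // keep dispatching
        }
    }

    void stop() {
        running = false;
    }
}
```

processOnce() is split out so a single iteration can be exercised without a thread. In practice processEvents() would be submitted to a single-thread executor, as start() does above. Unlike the original's catch block, this sketch restores the interrupt flag and leaves the loop.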
Having seen how the ZK connection is established and the watcher registered, we turn to node creation, starting with the create method:
public CreateBuilder create()
{
checkState();
return new CreateBuilderImpl(this);
}
create() returns a CreateBuilderImpl object, which follows the builder pattern, so we can go straight to forPath:
public String forPath(String path) throws Exception
{
//Uses the default data, i.e. the client's IP address
return forPath(path, client.getDefaultData());
}
public String forPath(final String givenPath, byte[] data) throws Exception
{
//Compress the data if compression was requested
if ( compress )
{
data = client.getCompressionProvider().compress(givenPath, data);
}
//Build the full path, with the namespace applied
final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
//Resolve the ACLs and validate the create against any registered schema
List<ACL> aclList = acling.getAclList(adjustedPath);
client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);
String returnPath = null;
//Asynchronous execution
if ( backgrounding.inBackground() )
{
pathInBackground(adjustedPath, data, givenPath);
}
else
{
//Synchronous execution
String path = protectedPathInForeground(adjustedPath, data, aclList);
//Strip the namespace prefix (if a namespace was set when building the client) so the returned path matches what the caller passed in
returnPath = client.unfixForNamespace(path);
}
return returnPath;
}
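The namespace handling around forPath is just path arithmetic. fixForNamespace prepends the namespace before talking to ZK, and unfixForNamespace strips it from the result. Below is a minimal sketch with hypothetical static helpers named fix and unfix. They ignore the sequential-node flag that the real fixForNamespace also takes. This sketch rejects a path without a leading slash because ZooKeeper requires znode paths to be absolute, which is also why the demo needs "/test".

```java
// Hypothetical helpers showing what namespace fixing/unfixing does to paths; not Curator's ZKPaths code.
class NamespacePaths {
    static String fix(String namespace, String path) {
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("Path must start with / character");
        }
        if (namespace == null || namespace.isEmpty()) {
            return path;                               // no namespace configured
        }
        return path.equals("/") ? "/" + namespace : "/" + namespace + path;
    }

    static String unfix(String namespace, String path) {
        if (namespace == null || namespace.isEmpty()) {
            return path;
        }
        String prefix = "/" + namespace;
        if (path.equals(prefix)) {
            return "/";                                // the namespace root maps back to "/"
        }
        // Only strip a whole leading path segment, never a partial name match.
        return path.startsWith(prefix + "/") ? path.substring(prefix.length()) : path;
    }
}
```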
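Finally, the synchronous path. protectedPathInForeground delegates to pathInForeground (and adds cleanup for protected mode when it fails), so the core logic is in pathInForeground: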
private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception
{
final AtomicBoolean firstTime = new AtomicBoolean(true);
//The core logic is wrapped in the retry policy
String returnPath = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<String>()
{
@Override
public String call() throws Exception
{
boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
String createdPath = null;
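//doProtected defaults to false. When enabled, a retry first looks for a node this client already created and skips creation if one is found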
if ( !localFirstTime && doProtected )
{
debugForceFindProtectedNode = false;
createdPath = findProtectedNodeInForeground(path);
}
if ( createdPath == null )
{
try
{
//Create the node differently depending on the ZK version (3.4 compatibility mode)
if ( client.isZk34CompatibilityMode() )
{
createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
}
else
{
createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
}
}
catch ( KeeperException.NoNodeException e )
{
//If the parent node is missing and createParentsIfNeeded is set, create the parents one by one
if ( createParentsIfNeeded )
{
ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
if ( client.isZk34CompatibilityMode() )
{
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
}
else
{
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
}
}
else
{
throw e;
}
}
catch ( KeeperException.NodeExistsException e )
{
//Likewise, if setDataIfExists is set, turn the create into an update
if ( setDataIfExists )
{
Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
if(storingStat != null)
{
DataTree.copyStat(setStat, storingStat);
}
createdPath = path;
}
else
{
throw e;
}
}
}
return createdPath;
}
}
);
return returnPath;
}
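The two catch blocks give create() two optional fallbacks. The sketch below replays them against a hypothetical in-memory tree (a HashMap keyed by path) instead of a real ZooKeeper server. The exception classes are local stand-ins for KeeperException.NoNodeException and NodeExistsException, and the setData version check is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a ZooKeeper tree; it only illustrates the
// createParentsIfNeeded / setDataIfExists fallbacks in pathInForeground.
class CreateFallbacks {
    static class NoNodeException extends RuntimeException {}
    static class NodeExistsException extends RuntimeException {}

    final Map<String, byte[]> tree = new HashMap<>();

    CreateFallbacks() {
        tree.put("/", new byte[0]);   // the root always exists
    }

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i == 0 ? "/" : path.substring(0, i);
    }

    // Like ZooKeeper's create: the parent must exist and the node itself must not.
    void rawCreate(String path, byte[] data) {
        if (!tree.containsKey(parentOf(path))) throw new NoNodeException();
        if (tree.containsKey(path)) throw new NodeExistsException();
        tree.put(path, data);
    }

    String create(String path, byte[] data, boolean createParentsIfNeeded, boolean setDataIfExists) {
        try {
            rawCreate(path, data);
        } catch (NoNodeException e) {
            if (!createParentsIfNeeded) throw e;
            mkdirs(parentOf(path));   // create the missing ancestors, then try once more
            rawCreate(path, data);
        } catch (NodeExistsException e) {
            if (!setDataIfExists) throw e;
            tree.put(path, data);     // the create becomes an update (version check omitted)
        }
        return path;
    }

    // Create every missing ancestor, top-down, with empty data.
    private void mkdirs(String path) {
        if (tree.containsKey(path)) return;
        mkdirs(parentOf(path));
        tree.put(path, new byte[0]);
    }
}
```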
The retry policy deserves a closer look:
public static<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
{
return client.getConnectionHandlingPolicy().callWithRetry(client, proc);
}
public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
{
//Block until connected or until connectionTimeoutMs elapses (waiting in slices of at most 1 s)
client.internalBlockUntilConnectedOrTimedOut();
T result = null;
//Create a retry loop backed by the retry policy
RetryLoop retryLoop = client.newRetryLoop();
//Loop until the operation completes or an exception is rethrown
while ( retryLoop.shouldContinue() )
{
try
{
//Run the actual operation
result = proc.call();
//Mark success
retryLoop.markComplete();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
//Handle the exception
retryLoop.takeException(e);
}
}
return result;
}
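The retry loop is small enough to reproduce in plain Java. Below is a minimal sketch under two assumptions. RetryPolicy is a local interface, not Curator's (whose allowRetry also receives a RetrySleeper). RetriableException is a marker class standing in for the KeeperException codes that count as retryable.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified RetryLoop; RetryPolicy here is a local interface, not Curator's.
class SimpleRetryLoop {
    interface RetryPolicy {
        boolean allowRetry(int retryCount, long elapsedMs);
    }

    // Marker for retry-worthy failures, standing in for CONNECTIONLOSS, OPERATIONTIMEOUT, etc.
    static class RetriableException extends Exception {
        RetriableException(String msg) {
            super(msg);
        }
    }

    private final RetryPolicy policy;
    private final long startTimeMs = System.currentTimeMillis();
    private boolean isDone = false;
    private int retryCount = 0;

    SimpleRetryLoop(RetryPolicy policy) {
        this.policy = policy;
    }

    boolean shouldContinue() { return !isDone; }

    void markComplete() { isDone = true; }

    void takeException(Exception e) throws Exception {
        boolean rethrow = true;
        // Only retry-worthy exceptions are offered to the policy.
        if (e instanceof RetriableException
                && policy.allowRetry(retryCount++, System.currentTimeMillis() - startTimeMs)) {
            rethrow = false;   // swallow and go around the loop again
        }
        if (rethrow) {
            throw e;
        }
    }

    static <T> T callWithRetry(RetryPolicy policy, Callable<T> proc) throws Exception {
        SimpleRetryLoop loop = new SimpleRetryLoop(policy);
        T result = null;
        while (loop.shouldContinue()) {
            try {
                result = proc.call();
                loop.markComplete();
            } catch (Exception e) {
                loop.takeException(e);
            }
        }
        return result;
    }
}
```

Take a policy that allows three retries. An operation that fails twice with a retriable error and then succeeds returns normally. A non-retriable exception escapes on the first attempt.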
Now for the key points of the retry logic, starting with retryLoop.shouldContinue():
//Simple: keep going until the operation is done
public boolean shouldContinue()
{
return !isDone;
}
From this we can infer what retryLoop.markComplete() does:
public void markComplete()
{
isDone = true;
}
So the crucial part is the exception handling in retryLoop.takeException(e):
public void takeException(Exception exception) throws Exception
{
boolean rethrow = true;
//Only certain exceptions are retried
if ( isRetryException(exception) )
{
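//Ask the (user-configurable) retry policy whether another attempt is allowed. The policy may sleep via sleeper before answering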
if ( retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startTimeMs, sleeper) )
{
//If a retry is allowed, swallow the exception
rethrow = false;
}
}
//Otherwise rethrow it
if ( rethrow )
{
throw exception;
}
}
public static boolean isRetryException(Throwable exception)
{
if ( exception instanceof KeeperException )
{
KeeperException keeperException = (KeeperException)exception;
return shouldRetry(keeperException.code().intValue());
}
return false;
}
//Only connection-related errors are retried
public static boolean shouldRetry(int rc)
{
return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
(rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
(rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
(rc == KeeperException.Code.SESSIONEXPIRED.intValue()) ||
(rc == -13); //-13 is NEWCONFIGNOQUORUM (ZK 3.5+), hard-coded for compatibility with ZK 3.4
}
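The demo configures new ExponentialBackoffRetry(1000, 29, 60 * 1000), and takeException() consults that policy through allowRetry(). The sketch below shows how such an exponential backoff can work. It is modeled on my understanding of Curator's ExponentialBackoffRetry:
- each sleep is the base sleep time multiplied by a random factor whose upper bound doubles with each retry,
- the sleep is capped at maxSleepMs,
- maxRetries is capped at 29.

Treat the formula and the cap as assumptions. The Sleeper interface here is hypothetical.

```java
import java.util.Random;

// Hypothetical exponential-backoff policy in the spirit of ExponentialBackoffRetry; not Curator code.
class BackoffPolicy {
    interface Sleeper {
        void sleepFor(long ms) throws InterruptedException;
    }

    // Assumed cap: keeps 1 << (retryCount + 1) positive for every allowed retry.
    static final int MAX_RETRIES_LIMIT = 29;

    private final int baseSleepTimeMs;
    private final int maxRetries;
    private final int maxSleepMs;
    private final Random random;

    BackoffPolicy(int baseSleepTimeMs, int maxRetries, int maxSleepMs, Random random) {
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxRetries = Math.min(maxRetries, MAX_RETRIES_LIMIT);
        this.maxSleepMs = maxSleepMs;
        this.random = random;
    }

    // Multiplier is drawn from [1, 2^(retryCount + 1) - 1], so the upper bound doubles per retry.
    // Valid for retryCount < MAX_RETRIES_LIMIT.
    long sleepTimeMs(int retryCount) {
        long sleepMs = (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(sleepMs, maxSleepMs);
    }

    boolean allowRetry(int retryCount, Sleeper sleeper) {
        if (retryCount >= maxRetries) {
            return false;   // out of retries: the caller rethrows
        }
        try {
            sleeper.sleepFor(sleepTimeMs(retryCount));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The Sleeper is injectable, so the policy can be exercised without really sleeping. Retry 0 always sleeps exactly the base time because the multiplier is 1. Later retries draw from a window that widens with each attempt, and maxSleepMs caps the sleep.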
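This logic leads to an important conclusion:
Curator wraps every operation it performs against ZK in the retry policy we configured!
That may run against intuition. I myself long assumed that retries only happened after the connection dropped.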
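3. Summary
This article focused on how Curator interacts with ZK, using only the create API as an example. Beyond basic CRUD, many people also use higher-level recipes such as NodeCache and TreeCache. Curator builds these on the same low-level interaction APIs, so once you understand the interaction logic covered here, that code becomes much easier to read.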