Original: Modifying the ZooKeeper client to prefer connecting to specific cluster nodes
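Background
The ZooKeeper cluster is deployed across two data centers using observers, with the following goals:
- ZooKeeper nodes in data center A take part in leader election
- ZooKeeper nodes in data center B only replicate data and do not take part in election
- If every ZooKeeper node in data center A goes down, clients switch to the ZooKeeper nodes in data center B and no ZooKeeper data is lost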
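Problem
Because the cluster spans data centers, an application deployed in data center A may end up connected to a ZooKeeper node in data center B. Network jitter between the data centers can then have a significant impact on the application. We therefore want applications in data center A to connect to ZooKeeper nodes in data center A, and to fall back to nodes in data center B only when every node in data center A is down.
Approach: making the client prefer certain ZooKeeper nodes
Start by tracing the client source code that creates a connection, beginning with the creation of the ZooKeeper object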
The ZooKeeper constructor creates a StaticHostProvider object
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
    throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout
            + " watcher=" + watcher
            + " sessionId=" + Long.toHexString(sessionId)
            + " sessionPasswd="
            + (sessionPasswd == null ? "<null>" : "<hidden>"));
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
    cnxn.seenRwServerBefore = true; // since user has provided sessionId
    cnxn.start();
}
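To make the input of StaticHostProvider concrete, the following is a simplified sketch of how a connect string such as "host1:2181,host2:2181/app" can be split into unresolved addresses and a chroot path. It is an illustration only and not ZooKeeper's actual ConnectStringParser; the class name ConnectStringSketch, its methods, and the example IPs are all hypothetical.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Simplified illustration of connect-string parsing; not ZooKeeper's real parser. */
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181;

    /** Returns the chroot path (for example "/app"), or null when there is none. */
    static String chrootOf(String connectString) {
        int slash = connectString.indexOf('/');
        if (slash < 0) {
            return null;
        }
        String path = connectString.substring(slash);
        return path.length() == 1 ? null : path;
    }

    /** Splits the host part on ',' and builds one unresolved address per entry. */
    static List<InetSocketAddress> addressesOf(String connectString) {
        int slash = connectString.indexOf('/');
        String hosts = slash < 0 ? connectString : connectString.substring(0, slash);
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String host = entry.trim();
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            // Unresolved on purpose: no DNS lookup happens while parsing.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        String connectString = "10.0.1.1:2181,10.0.1.2:2181,10.0.2.1:2181/app";
        System.out.println(addressesOf(connectString));
        System.out.println(chrootOf(connectString));
    }
}
```

The order of the returned list is simply the order written in the connect string; any reordering happens later, inside the host provider.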
Looking further at the StaticHostProvider constructors, both of them call the init method
/**
 * Constructs a SimpleHostSet.
 *
 * @param serverAddresses
 *            possibly unresolved ZooKeeper server addresses
 * @throws IllegalArgumentException
 *             if serverAddresses is empty or resolves to an empty list
 */
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
    this.resolver = new Resolver() {
        @Override
        public InetAddress[] getAllByName(String name) throws UnknownHostException {
            return InetAddress.getAllByName(name);
        }
    };
    init(serverAddresses);
}

/**
 * Introduced for testing purposes. getAllByName() is a static method of InetAddress, therefore cannot be easily mocked.
 * By abstraction of Resolver interface we can easily inject a mocked implementation in tests.
 *
 * @param serverAddresses
 *            possibly unresolved ZooKeeper server addresses
 * @param resolver
 *            custom resolver implementation
 * @throws IllegalArgumentException
 *             if serverAddresses is empty or resolves to an empty list
 */
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, Resolver resolver) {
    this.resolver = resolver;
    init(serverAddresses);
}

/**
 * Common init method for all constructors.
 * Resolve all unresolved server addresses, put them in a list and shuffle.
 */
private void init(Collection<InetSocketAddress> serverAddresses) {
    if (serverAddresses.isEmpty()) {
        throw new IllegalArgumentException(
                "A HostProvider may not be empty!");
    }
    this.serverAddresses.addAll(serverAddresses);
    Collections.shuffle(this.serverAddresses);
}
The init method is simple: it adds the addresses parsed from the connect string to the serverAddresses list and then shuffles the list randomly
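The effect of that shuffle can be seen in a small stand-alone sketch. It assumes two nodes in data center A and one in data center B (the IPs are made up) and counts how often the data center B node ends up first in the list. The class ShuffleOrderSketch and its methods are hypothetical, and a seeded Random is passed in only to make runs repeatable, whereas init calls Collections.shuffle without one.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Illustrates that a plain shuffle gives every node an equal chance of being first. */
final class ShuffleOrderSketch {

    /** Copies the addresses and shuffles the copy, as init does with serverAddresses. */
    static List<InetSocketAddress> shuffled(List<InetSocketAddress> addresses, Random random) {
        List<InetSocketAddress> copy = new ArrayList<>(addresses);
        Collections.shuffle(copy, random);
        return copy;
    }

    /** Counts in how many of the given trials the target address ends up first. */
    static int timesFirst(List<InetSocketAddress> addresses, InetSocketAddress target,
                          int trials, Random random) {
        int count = 0;
        for (int i = 0; i < trials; i++) {
            if (shuffled(addresses, random).get(0).equals(target)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        InetSocketAddress a1 = InetSocketAddress.createUnresolved("10.0.1.1", 2181);
        InetSocketAddress a2 = InetSocketAddress.createUnresolved("10.0.1.2", 2181);
        InetSocketAddress b1 = InetSocketAddress.createUnresolved("10.0.2.1", 2181);
        List<InetSocketAddress> all = new ArrayList<>();
        all.add(a1);
        all.add(a2);
        all.add(b1);
        int trials = 3000;
        int bFirst = timesFirst(all, b1, trials, new Random(42));
        System.out.println("data center B node first in " + bFirst + " of " + trials + " shuffles");
    }
}
```

With three nodes, roughly one client in three would start with the data center B node, which is exactly the situation described in the Problem section.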
Back in the ZooKeeper constructor, a ClientCnxn object is created and its start method is called
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
        hostProvider, sessionTimeout, this, watchManager,
        getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.start();
Inside ClientCnxn, the start method calls sendThread.start();
public void start() {
    sendThread.start();
    eventThread.start();
}
This in turn runs the run method of SendThread, which obtains a server address from the host provider and opens a connection to it; see the call hostProvider.next(1000);
@Override
public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                if (closing || !state.isAlive()) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    serverAddress = hostProvider.next(1000);
                }
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }
            // remainder of run() omitted in this excerpt
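So the address ultimately comes from the next method of StaticHostProvider. The first call returns the first element of serverAddresses; if that connection fails or times out, the following call returns the second element, and so on, wrapping around at the end of the list. The serverAddresses list itself was populated and randomly shuffled in init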
public InetSocketAddress next(long spinDelay) {
    currentIndex = ++currentIndex % serverAddresses.size();
    if (currentIndex == lastIndex && spinDelay > 0) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    } else if (lastIndex == -1) {
        // We don't want to sleep on the first ever connect attempt.
        lastIndex = 0;
    }
    InetSocketAddress curAddr = serverAddresses.get(currentIndex);
    try {
        String curHostString = getHostString(curAddr);
        List<InetAddress> resolvedAddresses = new ArrayList<InetAddress>(Arrays.asList(this.resolver.getAllByName(curHostString)));
        if (resolvedAddresses.isEmpty()) {
            return curAddr;
        }
        Collections.shuffle(resolvedAddresses);
        return new InetSocketAddress(resolvedAddresses.get(0), curAddr.getPort());
    } catch (UnknownHostException e) {
        return curAddr;
    }
}
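The index arithmetic in next can be isolated in a minimal sketch. It keeps only the round-robin part and deliberately leaves out the spinDelay sleep and the DNS resolution; the class RoundRobinSketch and the server names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Round-robin iteration over a fixed list, modelled on the index handling in next(). */
final class RoundRobinSketch {
    private final List<String> servers;
    private int currentIndex = -1; // -1 means no server has been handed out yet

    RoundRobinSketch(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("server list may not be empty");
        }
        this.servers = new ArrayList<>(servers);
    }

    /** Advances to the next server, wrapping to the start after the last one. */
    String next() {
        currentIndex = (currentIndex + 1) % servers.size();
        return servers.get(currentIndex);
    }

    public static void main(String[] args) {
        RoundRobinSketch provider = new RoundRobinSketch(Arrays.asList("a1", "a2", "b1"));
        // Each failed connection attempt would trigger one more call to next().
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + " -> " + provider.next());
        }
    }
}
```

Because the traversal always starts at index 0 and moves forward, the order of the list fully determines which nodes a client tries first. That is why reordering the list in init is enough to express a connection preference.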
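Based on this analysis, preferring the nodes of one data center can be achieved by modifying the init method of StaticHostProvider. Taking data center A as the preferred one, the idea is:
- Add a property low.priority.connect.ips to the zoo.cfg configuration file, listing the ZooKeeper nodes in data center B
- Remove the IPs listed in low.priority.connect.ips from the addresses passed in by the client, and record them in a separate list
- Add the remaining addresses to the serverAddresses list of StaticHostProvider
- Run Collections.shuffle(this.serverAddresses);
- Append the separately recorded list to the end of serverAddresses
In other words, the IPs of data center A are added to the list and shuffled first, and the IPs of data center B are then shuffled and appended. The result is a list with data center A's nodes in front and data center B's nodes at the back, so the client tries the nodes in data center A first. The code is as follows: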
/**
 * Common init method for all constructors.
 * Puts the preferred addresses first (shuffled) and appends the low-priority
 * addresses read from zoo.cfg (shuffled) after them.
 * ZOOKEEPER_HOME, LOW_PRIORITY_CONNECT_IPS, observerZks and observerZkInetSockets
 * are members of this class that are declared outside this excerpt.
 */
private void init(Collection<InetSocketAddress> serverAddresses) {
    // Load the low-priority (observer) ZooKeeper IPs from zoo.cfg
    String zkHome = System.getenv(ZOOKEEPER_HOME);
    Properties properties = new Properties();
    BufferedReader bufferedReader = null;
    try {
        bufferedReader = new BufferedReader(new FileReader(zkHome + "/conf/zoo.cfg"));
        properties.load(bufferedReader);
        String lowPriorityConnectIPs = properties.getProperty(LOW_PRIORITY_CONNECT_IPS);
        // isEmpty() rather than != "", which compares references, not contents
        if (lowPriorityConnectIPs != null && !lowPriorityConnectIPs.isEmpty()) {
            // The IPs in the property value are separated by ':'
            String[] split = lowPriorityConnectIPs.split(":");
            observerZks.addAll(Arrays.asList(split));
        }
    } catch (Exception e) {
        // Missing or unreadable zoo.cfg: keep the default, fully shuffled order
        LOG.warn("Unable to read " + LOW_PRIORITY_CONNECT_IPS + " from zoo.cfg", e);
    } finally {
        // Close the reader on every path, including the failure paths
        if (bufferedReader != null) {
            try {
                bufferedReader.close();
            } catch (IOException e) {
                LOG.warn("Unable to close zoo.cfg", e);
            }
        }
    }
    if (serverAddresses.isEmpty()) {
        throw new IllegalArgumentException(
                "A HostProvider may not be empty!");
    }
    // Move every low-priority address out of serverAddresses.
    // Note: iterator.remove() modifies the collection passed in by the caller.
    Iterator<InetSocketAddress> iterator = serverAddresses.iterator();
    while (iterator.hasNext()) {
        InetSocketAddress inetSocketAddress = iterator.next();
        String hostString = getHostString(inetSocketAddress);
        String hostIp = null;
        try {
            hostIp = InetAddress.getByName(hostString).getHostAddress();
        } catch (UnknownHostException e) {
            LOG.warn("Unable to resolve " + hostString, e);
        }
        if (hostIp != null && observerZks.contains(hostIp)) {
            observerZkInetSockets.add(inetSocketAddress);
            iterator.remove();
        }
    }
    // Preferred nodes first (shuffled), low-priority nodes last (shuffled)
    this.serverAddresses.addAll(serverAddresses);
    Collections.shuffle(this.serverAddresses);
    Collections.shuffle(this.observerZkInetSockets);
    this.serverAddresses.addAll(observerZkInetSockets);
}
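The ordering rule used in this init method can also be tried out in isolation. The following is a minimal sketch under a few assumptions: it matches on the host string instead of a resolved IP so that it needs no DNS, it takes the low-priority hosts as a parameter instead of reading zoo.cfg, and it leaves the input collection untouched. The class PriorityOrderSketch, its order method, and the example IPs are hypothetical.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Partition, shuffle each group, then concatenate: preferred nodes first. */
final class PriorityOrderSketch {

    static List<InetSocketAddress> order(Collection<InetSocketAddress> all,
                                         Set<String> lowPriorityHosts,
                                         Random random) {
        List<InetSocketAddress> preferred = new ArrayList<>();
        List<InetSocketAddress> lowPriority = new ArrayList<>();
        for (InetSocketAddress address : all) {
            // Assumption: compare the literal host string, no DNS resolution.
            if (lowPriorityHosts.contains(address.getHostString())) {
                lowPriority.add(address);
            } else {
                preferred.add(address);
            }
        }
        // Shuffle inside each group so load still spreads within a data center.
        Collections.shuffle(preferred, random);
        Collections.shuffle(lowPriority, random);
        List<InetSocketAddress> ordered = new ArrayList<>(preferred);
        ordered.addAll(lowPriority);
        return ordered;
    }

    public static void main(String[] args) {
        List<InetSocketAddress> all = Arrays.asList(
                InetSocketAddress.createUnresolved("10.0.2.1", 2181),
                InetSocketAddress.createUnresolved("10.0.1.1", 2181),
                InetSocketAddress.createUnresolved("10.0.2.2", 2181),
                InetSocketAddress.createUnresolved("10.0.1.2", 2181));
        Set<String> dataCenterB = new HashSet<>(Arrays.asList("10.0.2.1", "10.0.2.2"));
        System.out.println(order(all, dataCenterB, new Random()));
    }
}
```

Whatever the shuffle produces, the two 10.0.1.x addresses come before the two 10.0.2.x addresses, so a round-robin traversal only reaches data center B after every node in data center A has been tried.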