原创-zookeeper集群优先选择连接节点改造

2019-05-28  本文已影响0人  无色的叶

背景

zookeeper集群跨机房部署,采用observer部署模式,达到目标如下:

问题

zk集群是跨机房部署,会存在部署在A机房的应用连接到B机房的zk节点,如果网络出现抖动会对应用出现较大影响,所以希望部署在A机房的应用连接到A机房的zk节点,且当A机房所有zk节点挂掉后,能够连接B机房zk节点

实现zk节点优先连接思路

查看zk创建连接源码过程分析,创建zookeeper对象


image.png

zookeeper对象的构造函数中,会创建StaticHostProvider对象

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout
                + " watcher=" + watcher
                + " sessionId=" + Long.toHexString(sessionId)
                + " sessionPasswd="
                + (sessionPasswd == null ? "<null>" : "<hidden>"));

        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
        cnxn.seenRwServerBefore = true; // since user has provided sessionId
        cnxn.start();
    }

进一步查看StaticHostProvider构造函数,发现会调用init方法

/**
     * Constructs a SimpleHostSet.
     *
     * @param serverAddresses
     *            possibly unresolved ZooKeeper server addresses
     * @throws IllegalArgumentException
     *             if serverAddresses is empty or resolves to an empty list
     */
    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
        this.resolver = new Resolver() {
            @Override
            public InetAddress[] getAllByName(String name) throws UnknownHostException {
                return InetAddress.getAllByName(name);
            }
        };
        init(serverAddresses);
    }

    /**
     * Introduced for testing purposes. getAllByName() is a static method of InetAddress, therefore cannot be easily mocked.
     * By abstraction of Resolver interface we can easily inject a mocked implementation in tests.
     *
     * @param serverAddresses
     *            possibly unresolved ZooKeeper server addresses
     * @param resolver
     *            custom resolver implementation
     * @throws IllegalArgumentException
     *             if serverAddresses is empty or resolves to an empty list
     */
    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, Resolver resolver) {
        this.resolver = resolver;
        init(serverAddresses);
    }

 /**
     * Common init method for all constructors.
     * Resolve all unresolved server addresses, put them in a list and shuffle.
     */
    private void init(Collection<InetSocketAddress> serverAddresses) {
        if (serverAddresses.isEmpty()) {
            throw new IllegalArgumentException(
                    "A HostProvider may not be empty!");
        }

        this.serverAddresses.addAll(serverAddresses);
        Collections.shuffle(this.serverAddresses);
    }

init方法比较简单,就是把传入的zookeeper连接串集合加入serverAddresses集合中,并进行随机打乱
回到zookeeper的构造函数中,创建ClientCnxn对象,并调用start方法

cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
        cnxn.seenRwServerBefore = true; // since user has provided sessionId
        cnxn.start();

进一步深入ClientCnxn类start方法,调用了sendThread.start();方法

 public void start() {
        sendThread.start();
        eventThread.start();
    }

进而调用了SendThread的run方法,该方法中存在一个从serverAddresses集合中获取zk连接地址,并创建连接,见方法 hostProvider.next(1000);

 @Override
        public void run() {
            clientCnxnSocket.introduce(this,sessionId);
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            InetSocketAddress serverAddress = null;
            while (state.isAlive()) {
                try {
                    if (!clientCnxnSocket.isConnected()) {
                        if(!isFirstConnect){
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            serverAddress = hostProvider.next(1000);
                        }
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

这时可看到最终是调用StaticHostProvider类的next方法获取zk连接串,先获取serverAddresses集合第一个元素,如连接失败或超时,再获取第二个元素,以此类推,而serverAddresses集合是在init方法中进行的初始化且进行随机打散

public InetSocketAddress next(long spinDelay) {
        currentIndex = ++currentIndex % serverAddresses.size();
        if (currentIndex == lastIndex && spinDelay > 0) {
            try {
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        } else if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }

        InetSocketAddress curAddr = serverAddresses.get(currentIndex);
        try {
            String curHostString = getHostString(curAddr);
            List<InetAddress> resolvedAddresses = new ArrayList<InetAddress>(Arrays.asList(this.resolver.getAllByName(curHostString)));
            if (resolvedAddresses.isEmpty()) {
                return curAddr;
            }
            Collections.shuffle(resolvedAddresses);
            return new InetSocketAddress(resolvedAddresses.get(0), curAddr.getPort());
        } catch (UnknownHostException e) {
            return curAddr;
        }
    }

根据以上分析,如要达到zk优先机房连接节点目的,可对StaticHostProvider的init方法进行改造,以优先连接A机房zk节点为例,思路如下:

即先把A机房zk节点ip加入到list集合中并进行随机打乱,再把B机房zk节点ip随机打乱追加到list集合中,从而得到A机房zk节点在前,B机房zk节点在后的一个有序集合,从而达到优先连接zk机房zk节点目的,代码如下:

 /**
     * Common init method for all constructors.
     * Resolve all unresolved server addresses, put them in a list and shuffle.
     */
    private void init(Collection<InetSocketAddress> serverAddresses) {
        //初始化observer zk ip
        String zkHome = System.getenv(ZOOKEEPER_HOME);
        Properties properties = new Properties();
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new FileReader(zkHome + "/conf/zoo.cfg"));
            properties.load(bufferedReader);

            String lowPriorityConnectIPs = properties.getProperty(LOW_PRIORITY_CONNECT_IPS);
            if (lowPriorityConnectIPs != null && lowPriorityConnectIPs != "") {
                String[] split = lowPriorityConnectIPs.split(":");
                observerZks.addAll(Arrays.asList(split));
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (serverAddresses.isEmpty()) {
            throw new IllegalArgumentException(
                    "A HostProvider may not be empty!");
        }
        Iterator<InetSocketAddress> iterator = serverAddresses.iterator();
        while (iterator.hasNext()) {
            InetSocketAddress inetSocketAddress = iterator.next();
            String hostString = getHostString(inetSocketAddress);
            String hostIp = null;
            try {
                hostIp = InetAddress.getByName(hostString).getHostAddress();
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }

            if (hostIp != null) {
                boolean contains = observerZks.contains(hostIp);
                if (contains) {
                    observerZkInetSockets.add(inetSocketAddress);
                    iterator.remove();
                }
            }
        }

        this.serverAddresses.addAll(serverAddresses);
        Collections.shuffle(this.serverAddresses);
        Collections.shuffle(this.observerZkInetSockets);
        this.serverAddresses.addAll(observerZkInetSockets);
        if (bufferedReader != null) {
            try {
                bufferedReader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

上一篇 下一篇

猜你喜欢

热点阅读