ZooKeeper实现leader选举

2019-06-14  本文已影响0人  0x70e8

为什么需要leader选举

在多节点的集群中,如分布式缓存、分布式队列等分布式应用,为了保证一致性,一般采用一主多从的的架构,leader负责写请求的处理并将写操作同步到从节点上去写,follow负责读请求的处理,follow接收到写请求也会转发给leader,交由leader来处理。

所以leader的选举和leader崩溃重选是集群服务的核心。zookeeper本身也是这种架构,它采用的是paxos算法的变种zab协议,达到CAP理论的CA两个要求,是一个可靠的分布式应用协调者。

此处不讨论zk自身是如何实现leader选举和集群管理的,它作为一个有用的协调者,如何帮助我们进行我们的application的leader选举呢。

zk提供的leader选举思路

zk有一类node支持序列号,我们可以通过序列号的大小来选择最小序列号的作为leader把ip等信息写入父节点的data中,供外界访问和寻找leader。

另外,如果leader挂了,借助临时node的特性,这个临时node就会被移除,应用可以通过watch事件,重新选择最小的作为leader。

数据的结构大致为:
/parent/
-- node-1
-- node-2
-- node-3

watch同样有两种策略,

  1. watch 父节点
  2. watch 比自己的序列号小1的zNode,避免惊群效应

leader election

public class Election extends Connect implements Watcher {
    private final static Logger LOGGER = LoggerFactory.getLogger(Election.class);
    private final static String nodePrefix = "/node-";
    /*有序节点名*/
    private String path;
    private String root;

    /*启动时进行选举,确认leader*/
    public Election(String address, String root, String name) {
        super(address);
        try {
            this.root = formatRoot(root);
            if (zooKeeper.exists(this.root, false) == null) {
                zooKeeper.create(this.root, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            // 监控父节点
            zooKeeper.getChildren(this.root, this);
            path = zooKeeper.create(this.root + nodePrefix, name.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            LOGGER.info("this node path is :{}", path);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    /*格式化根节点的name*/
    private String formatRoot(String root) {
        return root.startsWith("/") ? root : "/" + root;
    }
    
    /* 父节点变化时,重新进行leader选举*/
    @Override
    public void process(WatchedEvent event) {
        try {
            List<String> children = zooKeeper.getChildren(root, this);
            long miniestNum = findMinima(children);
            String path = root.concat(nodePrefix).concat(StringUtils.leftPad(String.valueOf(miniestNum), 10, "0"));
            Stat stat = zooKeeper.exists(path, false);
            byte[] data = zooKeeper.getData(path, false, stat);
            Stat statRoot = zooKeeper.exists(root, false);
            zooKeeper.setData(root, data, statRoot.getVersion());
            System.out.println(new String(zooKeeper.getData(root, false, statRoot)));
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /*找到最小的序列号*/
    private Long findMinima(List<String> children) {
        List<Long> seqs = new ArrayList<>();
        for (String child : children) {
            seqs.add(getNumber(child));
        }
        Collections.sort(seqs);
        return seqs.get(0);
    }

    /*取nodeName中的数值*/
    private long getNumber(String path) {
        String seq = path.substring(root.length() + nodePrefix.length());
        return Long.parseLong(seq);
    }

   
}

测试

 public static void main(String[] args) {
        Election election = new Election(StaticUtils.ZK_ADDRESS, "myapp", "d");
        // 模拟应用服务中
        LockSupport.park();
    }
上一篇 下一篇

猜你喜欢

热点阅读