Jetcd 实现主从选举(示例代码)

2024-02-27  本文已影响0人  听歌闭麦开始自闭
@Slf4j
@Component
public class JetcdElectionService implements ApplicationListener<ContextRefreshedEvent> {
    private final Client jetcdClient;
    private final String electionNameText = "/testElection";
    private final String firstNonLoopbackAddress;

    private final AtomicReference<Watch.Watcher> watcher = new AtomicReference<>();
    private final AtomicBoolean connectionExceptionFlag = new AtomicBoolean();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public JetcdElectionService(Client jetcdClient, @Value("${server.port}") int port) {
        this.jetcdClient = jetcdClient;
        this.firstNonLoopbackAddress = "127.0.0.1:" + port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        new Thread(this::run).start();
    }

    private void run() {
        ByteSequence electionName = this.getElectionName(this.electionNameText);
        ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
        log.info("[Election] 正在执行选举 [electionName: {}, proposal: {}]...", electionName.toString(), proposal.toString());

        Election electionClient = this.jetcdClient.getElectionClient();
        electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));

        LeaderResponse leader = null;
        try {
            leader = electionClient.leader(electionName).get(); // etcd中没有leader会报错
        } catch (Exception ignored) {
        }

        if (leader == null) {
            log.info("[Election] 检测到leader不存在,当前实例正在尝试参选...");
            this.doElect(electionName, proposal);
        }
    }

    private boolean doElect(ByteSequence electionName, ByteSequence proposal) {
        Lease leaseClient = this.jetcdClient.getLeaseClient();
        Election electionClient = this.jetcdClient.getElectionClient();

        LeaseGrantResponse lease;
        try {
            lease = leaseClient.grant(15).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("[Election] 选举时发生异常,无法获得租约!", e);
            return false;
        }
        long leaseID = lease.getID();

        try {
            electionClient.campaign(electionName, leaseID, proposal).get(5, TimeUnit.SECONDS);

            // leader选举后进行自动续约,请求发送周期为: lease.ttl / 3
            leaseClient.keepAlive(leaseID, new LeaseKeepaliveObserver());
            log.info("[Election] 选举完成,当前实例当选成功!");
            return true;
        } catch (InterruptedException | ExecutionException e) {
            log.error("[Election] 选举时发生异常,无法获得租约!", e);
            return false;
        } catch (TimeoutException e) {
            log.info("[Election] 当前实例未能选中,停止参选中.");
            return false;
        }
    }

    private void handleLeaderResponse(LeaderResponse response) {
        Watch watchClient = this.jetcdClient.getWatchClient();

        KeyValue kv = response.getKv();
        ByteSequence proposalKey = kv.getKey(); // ${electionName}/${随机字符串}
        log.info("[Election] 选举完成,当选实例信息[key: {}, value: {}]", proposalKey.toString(), kv.getValue().toString());

        Watch.Watcher oldWatcher = this.watcher.get();
        if (oldWatcher != null) {
            oldWatcher.close();
        }

        Watch.Watcher newWatcher = watchClient.watch(proposalKey, new LeaderWatchListener(this::handleLeaderChange, this::handleWatchError, this.executor));
        this.watcher.compareAndSet(oldWatcher, newWatcher);
    }

    private void handleLeaderChange(WatchResponse response) {
        ByteSequence electionName = this.getElectionName(this.electionNameText);
        ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
        Election electionClient = this.jetcdClient.getElectionClient();

        List<WatchEvent> events = response.getEvents();
        WatchEvent watchEvent = null;
        if (events != null && !events.isEmpty()) {
            watchEvent = events.get(0);
        }
        WatchEvent.EventType eventType = Optional.ofNullable(watchEvent).map(WatchEvent::getEventType).orElse(null);

        if (this.connectionExceptionFlag.get()) {
            log.info("[Election] [{}] 检测到与etcd连接异常,重新注册observe服务.", eventType);

            electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
            this.connectionExceptionFlag.compareAndSet(true, false);
        } else {
            log.info("[Election] [{}] 检测到leader变动事件,当前实例正在尝试参选...", eventType);
        }
        this.doElect(electionName, proposal); 
    }

    private void handleWatchError(Throwable throwable) {
        // 发现和etcd连接出现了异常
        this.connectionExceptionFlag.compareAndSet(false, true);
    }

    public ByteSequence getElectionName(String electionNameText) {
        return ByteSequence.from(electionNameText, StandardCharsets.UTF_8);
    }

    public ByteSequence getProposal(String firstNonLoopbackAddress) {
        return ByteSequence.from(firstNonLoopbackAddress, StandardCharsets.UTF_8);
    }


    private static class LeaderElectionListener implements Election.Listener {
        private final Consumer<LeaderResponse> leaderResponseConsumer;
        private final ExecutorService executor;

        public LeaderElectionListener(Consumer<LeaderResponse> leaderResponseConsumer, ExecutorService executor) {
            this.leaderResponseConsumer = leaderResponseConsumer;
            this.executor = executor;
        }

        @Override
        public void onNext(LeaderResponse response) {
            CompletableFuture.runAsync(() -> this.leaderResponseConsumer.accept(response), this.executor).exceptionally((e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
                return null;
            });
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), new RuntimeException(throwable));
        }

        @Override
        public void onCompleted() {
        }
    }

    private static class LeaderWatchListener implements Watch.Listener {
        private final Consumer<WatchResponse> leaderChangeConsumer;
        private final Consumer<Throwable> onErrorConsumer;
        private final ExecutorService executor;

        public LeaderWatchListener(Consumer<WatchResponse> leaderChangeConsumer, Consumer<Throwable> onErrorConsumer, ExecutorService executor) {
            this.leaderChangeConsumer = leaderChangeConsumer;
            this.onErrorConsumer = onErrorConsumer;
            this.executor = executor;
        }

        @Override
        public void onNext(WatchResponse response) {
            CompletableFuture.runAsync(() -> this.leaderChangeConsumer.accept(response), this.executor).exceptionally((e) -> {
                if (e != null) {
                    log.error(e.getMessage(), e);
                }
                return null;
            });
        }

        @Override
        public void onError(Throwable throwable) {
            RuntimeException t = new RuntimeException(throwable);
            log.error(throwable.getMessage(), t);
            this.onErrorConsumer.accept(t);
        }

        @Override
        public void onCompleted() {
        }
    }

    private static class LeaseKeepaliveObserver implements StreamObserver<LeaseKeepAliveResponse> {

        @Override
        public void onNext(LeaseKeepAliveResponse value) {
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), new RuntimeException(throwable));
        }

        @Override
        public void onCompleted() {
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读