ElasticJob故障转移机制

2020-10-06  本文已影响0人  圣村的希望

    在ElasticJob中,会把一个任务分成多个分片,然后再把分片分配给集群中不同的节点实例进行作业任务的执行。但是如果集群中的某几台机器宕机,这些分片任务的执行就需要转移到其它正常节点机器进行继续执行分片任务,这就是任务分片的故障转移。在ElasticJob中有对应的节点故障转移的功能,我们在任务配置的时候配置failover参数即可。下面看下故障转移功能在ElasticJob中的实现流程:

FailoverListenerManager:这是故障转移功能开始的地方
@Override
    public void start() {
        addDataListener(new JobCrashedJobListener());
        addDataListener(new FailoverSettingsChangedJobListener());
    }

     FailoverListenerManager是故障转移监听管理器,在ElasticJob启动的时候,他启动了2个监听器JobCrashedJobListener(集群节点宕机监听器)和FailoverSettingsChangedJobListener(故障转移修改监听器),在集群中的节点发生变化的时候能立即监听到,进而能及时做故障转移操作。

class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            //TODO 当前job运行实例没有关闭、并且config配置开启了故障转移功能、并且是节点删除事件、并且path路径是{jobName}/instances开头的
            if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.NODE_DELETED == eventType && instanceNode.isInstancePath(path)) {
                //TODO 获取当前被删除的job节点实例id
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                //TODO 被删除节点实例id是否为当前节点
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                //TODO 获取故障转移到当前节点的分片信息,初始肯定为空
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    //TODO 获取当前删除节点对应的初始分片信息
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        //TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
                        failoverService.setCrashedFailoverFlag(each);
                        //TODO 进行故障转移
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }
//TODO 获取故障转移到当前节点的分片信息,初始肯定为空
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);

public List<Integer> getFailoverItems(final String jobInstanceId) {
        //TODO 获取当前{nameSpace}/sharding分片信息
        List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
        List<Integer> result = new ArrayList<>(items.size());
        for (String each : items) {
            int item = Integer.parseInt(each);
            //TODO node=[sharding/{item}/failover]
            String node = FailoverNode.getExecutionFailoverNode(item);
            //TODO 查看node节点是否存在,并且job实例id是否为转移到当前节点
            if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
                result.add(item);
            }
        }
        //TODO 分片做下排序
        Collections.sort(result);
        return result;
    }
//TODO 获取当前删除节点对应的初始分片信息
for (int each : shardingService.getShardingItems(jobInstanceId)) {
    //TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
    failoverService.setCrashedFailoverFlag(each);
    //TODO 进行故障转移
    failoverService.failoverIfNecessary();
}
//TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
failoverService.setCrashedFailoverFlag(each);

public void setCrashedFailoverFlag(final int item) {
        //TODO 查看当前分片是否执行了故障转移 /sharding/{item}/failover
        if (!isFailoverAssigned(item)) {
            //TODO 创建需要故障转移节点标记 /leader/failover/items/{item}
            jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
        }
    }

private boolean isFailoverAssigned(final Integer item) {
        //TODO 判断/sharding/{item}/failover节点是否存在
        return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
    }
public void failoverIfNecessary() {
        //TODO 是否进行故障转移 /leader/failover/items节点存在并且下面存在子节点信息、并且当前job非运行状态
        if (needFailover()) {
            //TODO 获取故障转移分布式锁/leader/failover/latch,在主节点中进行故障转移
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
                return;
            }
            //TODO 获取节点/leader/failover/items/下的一个子节点信息
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
            //TODO 创建临时节点/sharding/{item}/failover ,value为jobInstanceId
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            //TODO 删除/leader/failover/items/{item}节点
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
            // TODO Instead of using triggerJob, use executor for unified scheduling
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
            if (null != jobScheduleController) {
                //TODO 触发下任务调度
                jobScheduleController.triggerJob();
            }
        }
    }

1、首先判断当前分片是否能进行故障转移。判断/leader/failover/items节点存在并且存在子节点信息,并且当前job非运行状态。这里解释了一个疑惑,就是为什么作业执行完之后需要在手动调用一下故障转移操作,就是因为在前一次执行故障转移的时候有分片任务正在执行,导致故障转移操作没有执行,所有在分片任务执行完之后再手动执行下故障转移。
2、获取故障转移分布式锁/leader/failover/latch,获取锁的节点即可往下执行当前分片的故障转移操作。
3、执行故障转移操作是在一个zookeeper事务中执行,它通过LeaderExecutionCallback回调完成故障转移操作,在这个事务中,所有的操作要么都执行要么都不执行。
4、获取节点/leader/failover/items/下的一个子节点信息
5、创建临时节点/sharding/{item}/failover,value为jobInstanceId
6、删除/leader/failover/items/{item}节点
7、手动触发一下任务的调度,防止下次下次周期任务没有及时到来,没有即时执行故障节点的任务执行。

以上是执行了故障节点的分片转移,但是对应的故障分片的任务没有被执行,这个是在任务调度执行的时候触发。
    /**
     * 执行作业.
     */
    public final void execute() {
        //...

        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        
        //...
    }

    @Override
    public ShardingContexts getShardingContexts() {
        //...
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            //获取分配给当前作业实例的故障转移分片,遍历作业的所有分片信息,获取/sharding/{item}/failover
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
    }

    在作业执行流程中获取分片信息的时候,如果开启了故障转移,本次作业的执行,会去优先执行故障转移到当前节点的分片任务。

上一篇下一篇

猜你喜欢

热点阅读