Distributed scheduled tasks with elastic-job (Part 3)

2021-05-30  后来丶_a24d

Table of Contents


Distributed scheduled task series


Leader election

// LeaderService.java
// Elect the leader node
public void electLeader() {
   // The latch boilerplate is factored out into executeInLeader; the election logic is passed in as a callback
   jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
}

// JobNodeStorage.java
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
   // try-with-resources: LeaderLatch implements Closeable, so it is closed automatically when done
   try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
       latch.start();
       latch.await();
       callback.execute();
   } catch (final Exception ex) {
       handleException(ex);
   }
}

// LeaderElectionExecutionCallback: the election callback
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
   @Override
   public void execute() {
       // Only fill the leader node when there is currently no leader
       if (!hasLeader()) { 
           jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
       }
   }
}
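The hasLeader() and isLeader() checks used here and below simply inspect the /${JOB_NAME}/leader/election/instance node. Roughly, based on the elastic-job-lite source (a sketch; details may differ between versions):

// LeaderService.java (sketch)
// Whether a leader has already been elected: the leader/election/instance node exists
public boolean hasLeader() {
    return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
}

// Whether the current instance is the leader: the node's data equals this instance's id
public boolean isLeader() {
    return JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId().equals(jobNodeStorage.getJobNodeData(LeaderNode.INSTANCE));
}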

When leader election happens

// SchedulerFacade.java
public void registerStartUpInfo(final boolean enabled) {
   // Elect the leader node when the job starts up
   leaderService.electLeader();
}

// ElectionListenerManager.java
class LeaderElectionJobListener extends AbstractJobListener {
   
   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
           leaderService.electLeader();
       }
   }
}
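The two conditions in the listener roughly mean: an active election happens when a local server is (re-)enabled while no leader exists, and a passive election happens when the previous leader's node disappears while this server is still available. A sketch based on the elastic-job-lite source (not shown in the original):

// ElectionListenerManager.java (sketch)
private boolean isActiveElection(final String path, final String data) {
    // No leader yet, and the event enables the local server
    return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
}

private boolean isPassiveElection(final String path, final Type eventType) {
    // The leader node was removed (leader crashed) and this server is still available
    return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp());
}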

Waiting for leader election to finish

// LeaderService.java
public boolean isLeaderUntilBlock() {
   // Loop while there is no leader yet && there are available servers
   while (!hasLeader() && serverService.hasAvailableServers()) {
       log.info("Leader is electing, waiting for {} ms", 100);
       // Wait a short time
       BlockUtils.waitingShortTime();
       if (!JobRegistry.getInstance().isShutdown(jobName)
               && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { 
          // The local server is available, so take part in the election
           electLeader();
       }
   }
   // Return whether the current node is the leader
   return isLeader();
}

ZooKeeper node structure

Each job has a root node /${JOB_NAME} in ZooKeeper with the following children:
  config: the job configuration (JSON)
  servers: one child per server IP, recording whether that server is enabled or disabled
  leader: coordination data for leader election (leader/election), resharding (leader/sharding) and failover (leader/failover)
  instances: one ephemeral child per running job instance
  sharding: one child per sharding item, holding the assigned instance and flags such as running, misfire, disabled and failover

Sharding

Sharding strategies

public interface JobShardingStrategy {
    Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}

The default strategy is AverageAllocationJobShardingStrategy (even allocation; the examples below assume 3 job servers):
  1. With 9 sharding items, the servers get: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
  2. With 8 sharding items, the servers get: 1=[0,1,6], 2=[2,3,7], 3=[4,5]
OdevitySortByNameJobShardingStrategy (sorts the server IPs ascending or descending depending on whether the job name's hash is odd or even):
  1. With 3 job servers and 2 sharding items, if the job name's hash is odd, the servers get: 1=[ ], 2=[1], 3=[0]
  2. With 3 job servers and 2 sharding items, if the job name's hash is even, the servers get: 1=[0], 2=[1], 3=[ ]
RotateServerByNameJobShardingStrategy (rotates the server list according to the job name's hash):
  1. With 3 job servers and 2 sharding items, if the job name's hash is odd, the servers get: 1=[0], 2=[1], 3=[]
  2. With 3 job servers and 2 sharding items, if the job name's hash is even, the servers get: 3=[0], 2=[1], 1=[]
A custom strategy can also be plugged in; a minimal sketch follows.
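Any implementation of JobShardingStrategy can be configured via jobShardingStrategyClass in the Lite job configuration. As an illustration only (not from the article or the library; the class name is hypothetical), a minimal round-robin strategy could look like this:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
// plus the elastic-job imports for JobShardingStrategy and JobInstance

// Hypothetical example: hand out sharding items one by one in round-robin order
public final class RoundRobinJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(jobInstances.size(), 1);
        for (JobInstance each : jobInstances) {
            result.put(each, new ArrayList<Integer>());
        }
        if (jobInstances.isEmpty()) {
            return result;
        }
        // Item i goes to instance i % instanceCount
        for (int i = 0; i < shardingTotalCount; i++) {
            result.get(jobInstances.get(i % jobInstances.size())).add(i);
        }
        return result;
    }
}

The configured class is then looked up through JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass()), as shown in shardingIfNecessary below.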

When resharding is needed

Setting the resharding flag:
// ShardingService.java
// Set the flag indicating that resharding is needed
public void setReshardingFlag() {
   jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}

// JobNodeStorage.java
// Create the job node if it does not exist yet (and the job root node exists)
public void createJobNodeIfNeeded(final String node) {
   if (isJobRootNodeExisted() && !isJobNodeExisted(node)) {
       regCenter.persist(jobNodePath.getFullPath(node), "");
   }
}
// ShardingService.java
// Whether resharding is needed: check whether the /${JOB_NAME}/leader/sharding/necessary node exists
public boolean isNeedSharding() {
   return jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY);
}
Resharding is needed in the following cases:
  1. When the job starts up
// SchedulerFacade.java
public void registerStartUpInfo(final boolean enabled) {
   // Set the resharding flag on startup
   shardingService.setReshardingFlag();
}
  2. When the sharding total count changes
// ShardingTotalCountChangedJobListener
class ShardingTotalCountChangedJobListener extends AbstractJobListener {
    
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
            if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                // Set the resharding flag when the sharding total count changes
                shardingService.setReshardingFlag();
                JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
            }
        }
    }
}
  3. When servers or instances change, which triggers the server-change listeners
// ShardingListenerManager.java
class ListenServersChangedJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (!JobRegistry.getInstance().isShutdown(jobName)
               && (isInstanceChange(eventType, path)
                   || isServerChange(path))) {
           // Set the resharding flag
           shardingService.setReshardingFlag();
       }
   }
   
   private boolean isInstanceChange(final Type eventType, final String path) {
       return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
   }
   
   private boolean isServerChange(final String path) {
       return serverNode.isServerPath(path);
   }
}
  4. During self-diagnosis and repair (covered in the self-diagnosis chapter)

Performing the sharding

// ShardingService.java
// Shard the job if resharding is needed and the current node is the leader; do nothing if there are no available instances
public void shardingIfNecessary() {
   // Get the available job instances
   List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
   // Check whether resharding is needed at all
   if (!isNeedSharding() || availableJobInstances.isEmpty()) {
       return;
   }
   // Non-leader nodes wait until the leader has finished assigning the sharding items; the leader continues
   if (!leaderService.isLeaderUntilBlock()) { 
       blockUntilShardingCompleted();
       return;
   }

   // The leader continues, but first waits for other running executions of the job to finish
   waitingOtherJobCompleted();
   // Load the configuration from ZooKeeper
   LiteJobConfiguration liteJobConfig = configService.load(false);
   int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
   // Mark resharding as in progress (ephemeral node)
   jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
   // Reset the sharding item nodes
   resetShardingInfo(shardingTotalCount);
   // Execute several writes in a single ZooKeeper transaction
   JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
   jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
}
The remaining steps of shardingIfNecessary:
3. blockUntilShardingCompleted: non-leader nodes block until the resharding flags disappear (a sketch is shown after the transaction callback below)
4. waitingOtherJobCompleted: the leader waits for other running executions of the job to finish (sketch below as well)
5. load: load the job configuration from ZooKeeper
6. fillEphemeralJobNode: create the ephemeral leader/sharding/processing node marking resharding as in progress
7. resetShardingInfo: reset the sharding item nodes
private void resetShardingInfo(final int shardingTotalCount) {
  // Recreate the nodes for the effective sharding items
  for (int i = 0; i < shardingTotalCount; i++) {
      // Remove `/${JOB_NAME}/sharding/${ITEM_ID}/instance`
      jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getInstanceNode(i)); 
      // Create `/${JOB_NAME}/sharding/${ITEM_ID}`
      jobNodeStorage.createJobNodeIfNeeded(ShardingNode.ROOT + "/" + i); 
  }
  // Remove sharding item nodes beyond the new total count
  int actualShardingTotalCount = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT).size();
  if (actualShardingTotalCount > shardingTotalCount) {
      for (int i = shardingTotalCount; i < actualShardingTotalCount; i++) {
          // Remove `/${JOB_NAME}/sharding/${ITEM_ID}`
          jobNodeStorage.removeJobNodeIfExisted(ShardingNode.ROOT + "/" + i); 
      }
  }
}
8. executeInTransaction: run the writes in a single ZooKeeper transaction
9. execute: the transaction callback that persists the sharding result
// PersistShardingInfoTransactionExecutionCallback.java
class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
   
   // The sharding result: job instance -> assigned sharding items
   private final Map<JobInstance, List<Integer>> shardingResults;
   
   @Override
   public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
       // Write the items assigned to each instance: /${JOB_NAME}/sharding/${ITEM_ID}/instance = jobInstanceId
       for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
           for (int shardingItem : entry.getValue()) {
               curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem))
                       , entry.getKey().getJobInstanceId().getBytes()).and();
           }
       }
       // Remove the "resharding needed" and "resharding in progress" flags
       curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();
       curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
   }
}
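For completeness, the two waiting helpers from steps 3 and 4 look roughly like this in ShardingService (a sketch based on the elastic-job-lite 2.x source; details may differ between versions):

// ShardingService.java (sketch)
// Non-leader nodes spin while the "necessary" or "processing" flags still exist
private void blockUntilShardingCompleted() {
   while (!leaderService.isLeaderUntilBlock()
           && (jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY) || jobNodeStorage.isJobNodeExisted(ShardingNode.PROCESSING))) {
       BlockUtils.waitingShortTime();
   }
}

// The leader waits while any sharding item is still marked as running (only recorded when monitorExecution is enabled)
private void waitingOtherJobCompleted() {
   while (executionService.hasRunningItems()) {
       BlockUtils.waitingShortTime();
   }
}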

Getting the sharding contexts

// LiteJobFacade.java
@Override
public ShardingContexts getShardingContexts() {
   // Failover handling; skipped here, covered in the failover section
   boolean isFailover = configService.load(true).isFailover();
   if (isFailover) {
       List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
       if (!failoverShardingItems.isEmpty()) {
           return executionContextService.getJobShardingContext(failoverShardingItems);
       }
   }
   // Shard the job if resharding is needed and the current node is the leader
   shardingService.shardingIfNecessary();
   // Get the sharding items assigned to the local instance
   List<Integer> shardingItems = shardingService.getLocalShardingItems();
   // Failover handling; skipped for now
   if (isFailover) {
       shardingItems.removeAll(failoverService.getLocalTakeOffItems());
   }
   // Remove disabled sharding items
   shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
   // Build the sharding contexts for this job server
   return executionContextService.getJobShardingContext(shardingItems);
}
4. shardingIfNecessary: perform the sharding if needed (shown above)
5. getLocalShardingItems: get the sharding items assigned to this instance
// ShardingService.java
// Get the sharding items assigned to the local job instance
public List<Integer> getLocalShardingItems() {
   if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
       return Collections.emptyList();
   }
   return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}

// Get the sharding items assigned to the given job instance
public List<Integer> getShardingItems(final String jobInstanceId) {
   JobInstance jobInstance = new JobInstance(jobInstanceId);
   if (!serverService.isAvailableServer(jobInstance.getIp())) {
       return Collections.emptyList();
   }
   List<Integer> result = new LinkedList<>();
   int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
   for (int i = 0; i < shardingTotalCount; i++) {
       // Add item i if `/${JOB_NAME}/sharding/${ITEM_ID}/instance` holds this instance's id
       if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
           result.add(i);
       }
   }
   return result;
}
7. shardingItems.removeAll: remove the disabled sharding items
// ExecutionService.java
// Get the disabled sharding items among the given items
public List<Integer> getDisabledItems(final List<Integer> items) {
   List<Integer> result = new ArrayList<>(items.size());
   for (int each : items) {
       // /${JOB_NAME}/sharding/${ITEM_ID}/disabled
       if (jobNodeStorage.isJobNodeExisted(ShardingNode.getDisabledNode(each))) {
           result.add(each);
       }
   }
   return result;
}
getJobShardingContext: building the sharding contexts
// ExecutionContextService.java
public ShardingContexts getJobShardingContext(final List<Integer> shardingItems) {
   LiteJobConfiguration liteJobConfig = configService.load(false);
   // Remove items that are still running; running items are only recorded when monitorExecution is enabled
   removeRunningIfMonitorExecution(liteJobConfig.isMonitorExecution(), shardingItems);
   if (shardingItems.isEmpty()) {
       return new ShardingContexts(buildTaskId(liteJobConfig, shardingItems), liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 
               liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter(), Collections.<Integer, String>emptyMap());
   }
   // Parse the sharding item parameters
   Map<Integer, String> shardingItemParameterMap = new ShardingItemParameters(liteJobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()).getMap();
   // Build the sharding contexts
   return new ShardingContexts(buildTaskId(liteJobConfig, shardingItems), //
           liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
           liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter(),
           getAssignedShardingItemParameterMap(shardingItems, shardingItemParameterMap)); // parameters of the items assigned to this node
}
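Sharding item parameters are configured as a string such as "0=Beijing,1=Shanghai,2=Guangzhou"; ShardingItemParameters parses it into a map, and getAssignedShardingItemParameterMap keeps only the entries for the items assigned to this node. A sketch of the latter, based on the elastic-job-lite source:

// ExecutionContextService.java (sketch)
private Map<Integer, String> getAssignedShardingItemParameterMap(final List<Integer> shardingItems, final Map<Integer, String> shardingItemParameterMap) {
   Map<Integer, String> result = new HashMap<>(shardingItems.size(), 1);
   for (int each : shardingItems) {
       // Keep only the parameters of the items assigned to this job node
       result.put(each, shardingItemParameterMap.get(each));
   }
   return result;
}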

Failover

Listening for crashed job instances

// FailoverListenerManager.java
class JobCrashedJobListener extends AbstractJobListener {
   
   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       // Failover is enabled and a ${JOB_NAME}/instances/${INSTANCE_ID} node was removed
       if (isFailoverEnabled() && Type.NODE_REMOVED == eventType
               && instanceNode.isInstancePath(path)) { 
           String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
           if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
               return;
           }
           // First read the ${JOB_NAME}/sharding/${ITEM_ID}/failover nodes held by the crashed instance
           List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
           if (!failoverItems.isEmpty()) {
               for (int each : failoverItems) {
                   failoverService.setCrashedFailoverFlag(each);
                   failoverService.failoverIfNecessary();
               }
           } else {
                // Otherwise use the items assigned to it via /${JOB_NAME}/sharding/${ITEM_ID}/instance
               for (int each : shardingService.getShardingItems(jobInstanceId)) { 
                   failoverService.setCrashedFailoverFlag(each);
                   failoverService.failoverIfNecessary();
               }
           }
       }
   }
}
// FailoverService.java
public void setCrashedFailoverFlag(final int item) {
   if (!isFailoverAssigned(item)) {
       // /${JOB_NAME}/leader/failover/items/${ITEM_ID}
       jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item)); 
   }
}
    
private boolean isFailoverAssigned(final Integer item) {
   return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
}
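getFailoverItems, used in the crash listener above, collects the sharding items whose /${JOB_NAME}/sharding/${ITEM_ID}/failover node points at the crashed instance; roughly (a sketch based on the elastic-job-lite source):

// FailoverService.java (sketch)
public List<Integer> getFailoverItems(final String jobInstanceId) {
   List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
   List<Integer> result = new ArrayList<>(items.size());
   for (String each : items) {
       int item = Integer.parseInt(each);
       String node = FailoverNode.getExecutionFailoverNode(item);
       // The failover node exists and belongs to the given (crashed) instance
       if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
           result.add(item);
       }
   }
   Collections.sort(result);
   return result;
}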

Performing the failover

// FailoverService.java
public void failoverIfNecessary() {
   if (needFailover()) {
       jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
   }
}

private boolean needFailover() {
    // `${JOB_NAME}/leader/failover/items/${ITEM_ID}`: there are items waiting for failover and the job is not currently running locally
    return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
            && !JobRegistry.getInstance().isJobRunning(jobName);
}

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
   
   @Override
   public void execute() {
       // Re-check that failover is still needed
       if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
           return;
       }
       // Take the first item from `${JOB_NAME}/leader/failover/items/${ITEM_ID}`
       int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
       // Assign that item's `${JOB_NAME}/sharding/${ITEM_ID}/failover` node to the current job instance
       jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
       // Remove the `${JOB_NAME}/leader/failover/items/${ITEM_ID}` entry
       jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
       // Trigger the job so the failed-over item runs immediately
       JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
       if (null != jobScheduleController) {
           jobScheduleController.triggerJob();
       }
   }
}
// AbstractElasticJobExecutor.java
public final void execute() {
   // Execute the normally triggered job
   execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
   // Execute the misfired sharding items
   while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
       jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
       execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
   }
   
   // Perform failover if necessary
   jobFacade.failoverIfNecessary();
}

Getting the sharding contexts (failover part)

// LiteJobFacade.java
@Override
public ShardingContexts getShardingContexts() {
   // Get the sharding items failed over to this instance
   boolean isFailover = configService.load(true).isFailover();
   if (isFailover) {
       // 2. Get the local failover items
       List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
       if (!failoverShardingItems.isEmpty()) {
           // Skipped here; covered in the sharding section
           return executionContextService.getJobShardingContext(failoverShardingItems);
       }
   }
   // Skipped here; covered in the sharding section
   shardingService.shardingIfNecessary();
   // Skipped here; covered in the sharding section
   List<Integer> shardingItems = shardingService.getLocalShardingItems();
   // 6. Remove the items that have been taken over by other instances via failover
   if (isFailover) {
       shardingItems.removeAll(failoverService.getLocalTakeOffItems());
   }
   // Remove disabled sharding items
   shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
   // Skipped here; covered in the sharding section
   return executionContextService.getJobShardingContext(shardingItems);
}
shardingItems.removeAll(failoverService.getLocalTakeOffItems()):
// FailoverService.java
// Get the sharding items assigned to this instance that have been taken over via failover (so they are not executed twice locally)
public List<Integer> getLocalTakeOffItems() {
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    List<Integer> result = new ArrayList<>(shardingItems.size());
    for (int each : shardingItems) {
        if (jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(each))) {
            result.add(each);
        }
    }
    return result;
}
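getLocalFailoverItems, used in step 2 of getShardingContexts, is the counterpart: it returns the items whose failover node points at the local instance. Roughly (a sketch based on the elastic-job-lite source):

// FailoverService.java (sketch)
public List<Integer> getLocalFailoverItems() {
    if (JobRegistry.getInstance().isShutdown(jobName)) {
        return Collections.emptyList();
    }
    // Reuse getFailoverItems with the local instance id
    return getFailoverItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}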

Misfired executions
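Misfire handling complements failover: when a trigger fires while the previous execution of the same sharding items is still running (and monitorExecution is enabled), the items are flagged with a /${JOB_NAME}/sharding/${ITEM_ID}/misfire node instead of being skipped, and the execute() loop shown in the failover section re-runs them via isExecuteMisfired/clearMisfire once the current execution finishes. A rough sketch of the flag handling in ExecutionService (method names follow the elastic-job-lite source and may differ between versions):

// ExecutionService.java (sketch)
// Record misfire flags for the given sharding items
public void setMisfire(final Collection<Integer> items) {
   for (int each : items) {
       jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
   }
}

// Return the items among the given ones that carry a misfire flag
public List<Integer> getMisfiredJobItems(final Collection<Integer> items) {
   List<Integer> result = new ArrayList<>(items.size());
   for (int each : items) {
       if (jobNodeStorage.isJobNodeExisted(ShardingNode.getMisfireNode(each))) {
           result.add(each);
       }
   }
   return result;
}

// Clear the misfire flags before the misfired items are re-executed
public void clearMisfire(final Collection<Integer> items) {
   for (int each : items) {
       jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getMisfireNode(each));
   }
}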

