分布式定时任务elastic-job(二)

2021-05-29  本文已影响0人  后来丶_a24d

目录

目录.png

分布式定时任务系列


执行

执行器的创建

AbstractElasticJobExecutor各个属性
AbstractElasticJobExecutor各个属性.png
构造函数
/**
 * Creates the executor and initializes its collaborators from the job configuration.
 *
 * @param jobFacade facade used to load the job configuration
 */
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    this.jobFacade = jobFacade;
    // Load the job configuration.
    jobRootConfig = jobFacade.loadJobRootConfiguration(true);
    jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
    // Obtain the executor service registered for this job name.
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
    // Obtain the job exception handler.
    jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
    // Map of error messages, presized with the configured sharding total count.
    itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
/**
 * Executor that wraps a {@link SimpleJob} implementation.
 */
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {

    /**
     * The simple job implementation.
     */
    private final SimpleJob simpleJob;
    
    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
}
构造函数中获取线程池
// AbstractElasticJobExecutor
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    // Part of the code is omitted here.
    // Obtain the executor service for this job from the registry.
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
}

// AbstractElasticJobExecutor.getHandler
/**
 * Instantiates the handler class configured for the given job property.
 *
 * <p>The configured class name is loaded reflectively. When the class is not
 * assignable to the type required by the property, or reflection fails, the
 * default handler is returned instead.
 *
 * @param jobPropertiesEnum job property that names the handler class
 * @return the configured handler instance, or the default handler
 */
private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
    final String configuredClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
    try {
        final Class<?> candidate = Class.forName(configuredClassName);
        // Only accept classes compatible with the type this property requires.
        if (!jobPropertiesEnum.getClassType().isAssignableFrom(candidate)) {
            return getDefaultHandler(jobPropertiesEnum, configuredClassName);
        }
        return candidate.newInstance();
    } catch (final ReflectiveOperationException ex) {
        // Class missing or not instantiable: fall back to the default handler.
        return getDefaultHandler(jobPropertiesEnum, configuredClassName);
    }
}

// ExecutorServiceHandlerRegistry
// Map<String, ExecutorService> REGISTRY = new HashMap<>()
public static synchronized ExecutorService getExecutorServiceHandler(final String jobName, final ExecutorServiceHandler executorServiceHandler) {
    // 缓存不存在则创建线程池,key是jobName
    if (!REGISTRY.containsKey(jobName)) 
        REGISTRY.put(jobName, executorServiceHandler.createExecutorService(jobName));
    }
    return REGISTRY.get(jobName);
}

// ExecutorServiceHandler 是个接口, 支持自定义线程池
/**
 * Creates the executor service for a job.
 *
 * @param jobName job name
 * @return the executor service
 */
ExecutorService createExecutorService(final String jobName);
/**
 * Builds the job configuration for a simple job (abridged example; part of the code is omitted).
 */
private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
                                                         int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
                                                         boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {

    // Define the core job configuration.
    JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
            .newBuilder(jobClass.getName(), cron, shardingTotalCount)
            .misfire(true)
            .failover(failover)
            .jobParameter(jobParameter)
            .shardingItemParameters(shardingItemParameters)
            // Custom exception handler; a custom thread pool can be configured the same way.
            .jobProperties("job_exception_handler", "com.seeger.demo.config.MyJobExceptionHandler")
            .build();

    // Remaining code omitted.

    return liteJobConfiguration;
}
// DefaultExecutorServiceHandler
/**
 * Default {@link ExecutorServiceHandler} implementation.
 */
public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {

    /**
     * Builds an {@code ExecutorServiceObject} from {@code "inner-job-" + jobName} and twice the
     * number of available processors, and returns the executor service it creates.
     *
     * @param jobName job name
     * @return the created executor service
     */
    @Override
    public ExecutorService createExecutorService(final String jobName) {
        final String label = "inner-job-" + jobName;
        final int doubledProcessorCount = Runtime.getRuntime().availableProcessors() * 2;
        final ExecutorServiceObject holder = new ExecutorServiceObject(label, doubledProcessorCount);
        return holder.createExecutorService();
    }
}

// ExecutorServiceObject
/**
 * Wraps the thread pool into the executor service handed out to jobs.
 *
 * @return the decorated executor service
 */
public ExecutorService createExecutorService() {
    // Guava's getExitingExecutorService automatically adds a JVM shutdown hook for the pool;
    // the result is then decorated as a listening executor service.
    return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
}
构造函数中获取异常处理器
public final class DefaultJobExceptionHandler implements JobExceptionHandler {
    
    @Override
    public void handleException(final String jobName, final Throwable cause) {
        log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
    }
}

执行器的执行

1.1 检查作业环境
// AbstractElasticJobExecutor
/**
 * Entry point of a job run (abridged): first checks the job execution environment.
 */
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        // An environment check failure is handed to the exception handler.
        jobExceptionHandler.handleException(jobName, cause);
    }
    // Part of the code is omitted here.
}

// LiteJobFacade
/**
 * Checks the job execution environment by delegating to the config service's time-difference check.
 *
 * @throws JobExecutionEnvironmentException if the check fails
 */
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
    configService.checkMaxTimeDiffSecondsTolerable();
}
// ConfigService
/**
 * Verifies that the clock difference between this machine and the registry center
 * stays within the configured tolerance.
 *
 * <p>A configured value of {@code -1} disables the check.
 *
 * @throws JobExecutionEnvironmentException if the difference exceeds the configured maximum
 */
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
    final int toleranceSeconds = load(true).getMaxTimeDiffSeconds();
    if (-1 != toleranceSeconds) {
        // Absolute difference between the local time and the registry center time, in milliseconds.
        final long driftMillis = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
        if (driftMillis > toleranceSeconds * 1000L) {
            throw new JobExecutionEnvironmentException(
                    "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", driftMillis / 1000, toleranceSeconds);
        }
    }
}
1.2 获取当前作业上下文
1.3,1.5 发布作业状态追踪事件
1.4 跳过正在运行中的被错过执行的作业
// LiteJobFacade.java
/**
 * Delegates to the execution service, which flags the items as misfired when any of them is still running.
 *
 * @param shardingItems sharding items to check
 * @return the result reported by the execution service
 */
@Override
public boolean misfireIfRunning(final Collection<Integer> shardingItems) {
   return executionService.misfireIfHasRunningItems(shardingItems);
}

// ExecutionService.java
/**
 * Flags the given items as misfired when at least one of them is still running.
 *
 * @param items sharding items to check
 * @return {@code true} if a running item was found and the misfire flags were set
 */
public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
   final boolean stillRunning = hasRunningItems(items);
   if (stillRunning) {
       setMisfire(items);
   }
   return stillRunning;
}

/**
 * Tells whether any of the given sharding items has a running node in the registry.
 *
 * <p>Always returns {@code false} when no configuration is loaded or execution
 * monitoring is disabled.
 *
 * @param items sharding items to check
 * @return {@code true} if a running node exists for at least one item
 */
public boolean hasRunningItems(final Collection<Integer> items) {
   final LiteJobConfiguration loadedConfig = configService.load(true);
   final boolean monitored = null != loadedConfig && loadedConfig.isMonitorExecution();
   if (monitored) {
       for (int item : items) {
           if (jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(item))) {
               return true;
           }
       }
   }
   return false;
}
1.6 作业执行前的方法
/**
 * Invokes {@code beforeJobExecuted} on every registered job listener, in iteration order.
 *
 * @param shardingContexts sharding contexts passed to each listener
 */
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.beforeJobExecuted(shardingContexts);
    }
}
执行正常任务
// 错过执行和正常任务执行还有失效转移执行都是调用此方法,ExecutionSource 不同而已
// AbstractElasticJobExecutor.java
/**
 * Runs the job for the given sharding contexts and publishes status trace events around it.
 *
 * <p>Normal, misfire and failover runs all go through this method; only the
 * execution source differs.
 *
 * @param shardingContexts sharding contexts of this run
 * @param executionSource what triggered this run
 */
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
   final boolean noShardingItems = shardingContexts.getShardingItemParameters().isEmpty();
   if (noShardingItems) {
       // Nothing to run: report the task as finished and stop.
       if (shardingContexts.isAllowSendJobEvent()) {
           final String emptyMessage = String.format("Sharding item for job '%s' is empty.", jobName);
           jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, emptyMessage);
       }
       return;
   }
   // Register the job start information.
   jobFacade.registerJobBegin(shardingContexts);
   final String taskId = shardingContexts.getTaskId();
   if (shardingContexts.isAllowSendJobEvent()) {
       jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
   }
   try {
       // Core processing.
       process(shardingContexts, executionSource);
   } finally {
       // Register the job completion information.
       jobFacade.registerJobCompleted(shardingContexts);
       // Publish the final state depending on whether any item recorded an error.
       final boolean failed = !itemErrorMessages.isEmpty();
       if (shardingContexts.isAllowSendJobEvent()) {
           if (failed) {
               jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
           } else {
               jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
           }
       }
   }
}
// LiteJobFacade
/**
 * Registers the job start information by delegating to the execution service.
 *
 * @param shardingContexts sharding contexts of the run
 */
public void registerJobBegin(final ShardingContexts shardingContexts) {
   executionService.registerJobBegin(shardingContexts);
}

// ExecutionService
/**
 * Marks the job as running and, when execution monitoring is enabled, records a
 * running node for every sharding item.
 *
 * @param shardingContexts sharding contexts of the run
 */
public void registerJobBegin(final ShardingContexts shardingContexts) {
    // Tell the JobRegistry that the job with this name is running.
    JobRegistry.getInstance().setJobRunning(jobName, true);
    // Running state is only recorded per item when execution monitoring is on.
    if (configService.load(true).isMonitorExecution()) {
        for (int item : shardingContexts.getShardingItemParameters().keySet()) {
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(item), "");
        }
    }
}
// AbstractElasticJobExecutor
/**
 * Processes all sharding items of this run: a single item runs on the calling thread,
 * several items are submitted to the executor service and awaited.
 *
 * @param shardingContexts sharding contexts of the run
 * @param executionSource what triggered this run
 */
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    // A single sharding item is executed directly on the calling thread.
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
        // Execute the single item.
        process(shardingContexts, item, jobExecutionEvent);
        return;
    }
    // Several sharding items are executed concurrently; the latch counts one per item.
    final CountDownLatch latch = new CountDownLatch(items.size());
    for (final int each : items) {
        final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
        // If the executor service is shut down, return immediately without awaiting the latch.
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(new Runnable() {
            
            @Override
            public void run() {
                try {
                    // Execute the single item.
                    process(shardingContexts, each, jobExecutionEvent);
                } finally {
                    // Always count down so the waiting thread is released even on failure.
                    latch.countDown();
                }
            }
        });
    }
    try {
        // Wait until every submitted item has finished.
        latch.await();
    } catch (final InterruptedException ex) {
        // Restore the interrupt flag for the caller.
        Thread.currentThread().interrupt();
    }
}
// AbstractElasticJobExecutor
/**
 * Processes one sharding item and publishes its execution events.
 *
 * @param shardingContexts sharding contexts of the run
 * @param item sharding item to process
 * @param startEvent execution event describing the start of this item
 */
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
    // Publish the start event for this item.
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobExecutionEvent(startEvent);
    }
    log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
    JobExecutionEvent completeEvent;
    try {
        // Abstract method implemented by subclasses.
        process(new ShardingContext(shardingContexts, item));
        completeEvent = startEvent.executionSuccess();
        log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(completeEvent);
        }
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // Publish the failure event for this item, record its error message and notify the handler.
        // NOTE(review): unlike the success path, this post is not guarded by isAllowSendJobEvent() - confirm intended.
        completeEvent = startEvent.executionFailure(cause);
        jobFacade.postJobExecutionEvent(completeEvent);
        itemErrorMessages.put(item, ExceptionUtil.transform(cause));
        jobExceptionHandler.handleException(jobName, cause);
    }
}
// 拿SimpleJobExecutor举例
/**
 * Runs the wrapped simple job for one sharding context.
 *
 * @param shardingContext sharding context passed to the job
 */
protected void process(final ShardingContext shardingContext) {
    // Call the user-implemented job class.
    simpleJob.execute(shardingContext);
}
执行错过执行的任务
// JobScheduler.java
/**
 * Creates the scheduler and registers the job trigger listener on it (abridged).
 */
private Scheduler createScheduler() {
   Scheduler result;
   // Part of the code is omitted here (including the initialization of result).
   result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
   return result;
}

/**
 * Builds the base Quartz properties (abridged).
 */
private Properties getBaseQuartzProperties() {
   // Part of the code is omitted here.
   // Misfire threshold of 1 ms: a trigger that is late by more than that counts as misfired.
   result.put("org.quartz.jobStore.misfireThreshold", "1");
   return result;
}

// JobScheduleController.class
/**
 * Builds the cron trigger for the job.
 *
 * @param cron cron expression
 * @return the cron trigger
 */
private CronTrigger createTrigger(final String cron) {
   // Quartz does nothing on a misfire; elastic-job handles missed executions itself.
   final CronScheduleBuilder schedule = CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing();
   return TriggerBuilder.newTrigger()
           .withIdentity(triggerIdentity)
           .withSchedule(schedule)
           .build();
}
 
// TriggerListener 监听处理
// JobTriggerListener.java
/**
 * Trigger listener that records misfires reported by Quartz.
 */
public final class JobTriggerListener extends TriggerListenerSupport {

    /**
     * Sets the misfire flag for the local sharding items, unless the trigger has no previous fire time.
     *
     * @param trigger the trigger that misfired
     */
    @Override
    public void triggerMisfired(final Trigger trigger) {
        if (null == trigger.getPreviousFireTime()) {
            return;
        }
        executionService.setMisfire(shardingService.getLocalShardingItems());
    }
}

// ExecutionService设置错过执行标记
/**
 * Sets the misfire flag for each given sharding item by creating its misfire node if needed.
 *
 * @param items sharding items to flag
 */
public void setMisfire(final Collection<Integer> items) {
   for (int each : items) {
       jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
   }
}
/**
 * Entry point of a job run (abridged): the part that re-runs missed executions.
 */
public final void execute() {
   // .... part of the code is omitted
   // Check whether missed executions must be run (starts at step 1.7 of the sequence diagram).
   while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
       // Clear the misfire flags.
       jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
       // Run the missed execution; same logic as a normal run, only the execution source differs.
       execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
   }
}
执行失效转移1.11
作业执行后方法1.12
// LiteJobFacade
/**
 * Invokes {@code afterJobExecuted} on every registered job listener, in iteration order.
 *
 * @param shardingContexts sharding contexts passed to each listener
 */
public void afterJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.afterJobExecuted(shardingContexts);
    }
}

注册中心

JobNodeStorage

获取注册中心当前时间
//ZookeeperRegistryCenter
/**
 * Returns the registry center's current time, read as the modification time of a node it just wrote.
 *
 * @param key node path used for the probe write
 * @return the node's modification time
 */
public long getRegistryCenterTime(final String key) {
   long result = 0L;
   try {
       // Write an empty value to the node, then read back the node's modification time.
       persist(key, "");
       result = client.checkExists().forPath(key).getMtime();
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
   // A result still equal to 0 means the time could not be read.
   Preconditions.checkState(0L != result, "Cannot get registry center time.");
   return result;
}

在主节点执行操作,分布式锁

// JobNodeStorage 分布式锁,并提供回调机制
/**
 * Runs the callback while holding leadership on the given latch node (used as a distributed lock).
 *
 * @param latchNode node name of the latch, resolved against the job node path
 * @param callback callback executed after leadership has been acquired
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        // Create a LeaderLatch on the ZooKeeper node; getClient() relies on ZookeeperRegistryCenter.
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        // Wait until this instance acquires leadership.
        latch.await();
        callback.execute();
    } catch (final Exception ex) {
        handleException(ex);
    }
}

在事务中执行操作

// JobNodeStorage
/**
 * Runs the callback's operations inside a single registry transaction and commits it.
 *
 * @param callback callback that adds its operations to the transaction
 */
public void executeInTransaction(final TransactionExecutionCallback callback) {
    try {
        // Start the transaction with a check on the root path.
        CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
        callback.execute(curatorTransactionFinal);
        curatorTransactionFinal.commit();
    } catch (final Exception ex) {
        RegExceptionHandler.handleException(ex);
    }
}

监听器

注册中心监听器

/**
 * Reacts to registry connection state changes: pauses the job while disconnected,
 * re-registers and resumes it after a reconnect.
 *
 * @param client Curator client that reported the change
 * @param newState new connection state
 */
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
    // Nothing to do once the job has been shut down.
    if (JobRegistry.getInstance().isShutdown(jobName)) {
        return;
    }
    JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
    // Connection suspended or lost.
    if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
        // Pause the job.
        jobScheduleController.pauseJob();
    } else if (ConnectionState.RECONNECTED == newState) {
        // Persist the job server's online information.
        serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
        // Persist the online information of the running job instance.
        instanceService.persistOnline();
        // Clear the running flags of the locally assigned sharding items.
        executionService.clearRunningInfo(shardingService.getLocalShardingItems());
        // Resume job scheduling.
        jobScheduleController.resumeJob();
    }
}

// JobScheduleController
/**
 * Pauses all triggers of the scheduler, unless the scheduler is already shut down.
 *
 * @throws JobSystemException if the scheduler reports an error
 */
public synchronized void pauseJob() {
    try {
        if (scheduler.isShutdown()) {
            return;
        }
        scheduler.pauseAll();
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}

/**
 * Resumes all triggers of the scheduler, unless the scheduler is already shut down.
 *
 * @throws JobSystemException if the scheduler reports an error
 */
public synchronized void resumeJob() {
    try {
        if (scheduler.isShutdown()) {
            return;
        }
        scheduler.resumeAll();
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}

分片监听

// SchedulerFacade.java
/**
 * Registers the job start-up information (abridged): begins by starting all listeners.
 */
public void registerStartUpInfo(final boolean enabled) {
   // Start all listeners.
   listenerManager.startAllListeners();
   // Remaining calls omitted.
}
// SchedulerFacade.java
/**
 * Registers the job start-up information (abridged): begins by starting all listeners.
 */
public void registerStartUpInfo(final boolean enabled) {
   // Start all listeners.
   listenerManager.startAllListeners();
   // Remaining calls omitted.
}
// 开启 所有监听器
public void startAllListeners() {
    electionListenerManager.start();
    shardingListenerManager.start();
    failoverListenerManager.start();
    monitorExecutionListenerManager.start();
    shutdownListenerManager.start();
    triggerListenerManager.start();
    rescheduleListenerManager.start();
    guaranteeListenerManager.start();

// AbstractListenerManager
// Starts the listeners of this manager.
public abstract void start();
// Adds a registry-center listener to the registry center's TreeCache,
// and also registers the connection state listener.
protected void addDataListener(final TreeCacheListener listener) {
    jobNodeStorage.addDataListener(listener);
    jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
/**
 * Registers the two sharding-related data listeners.
 */
public void start() {
    addDataListener(new ShardingTotalCountChangedJobListener());
    addDataListener(new ListenServersChangedJobListener());
}
/**
 * Translates a TreeCache event into a {@code dataChanged} call.
 *
 * <p>Events without node data or with an empty path are ignored; missing node
 * content is passed on as an empty string.
 *
 * @param client Curator client that produced the event
 * @param event the tree cache event
 * @throws Exception declared by the listener interface
 */
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
    final ChildData node = event.getData();
    // Ignore events that carry no node data.
    if (null == node) {
        return;
    }
    final String nodePath = node.getPath();
    if (nodePath.isEmpty()) {
        return;
    }
    final String content;
    if (null == node.getData()) {
        content = "";
    } else {
        content = new String(node.getData(), Charsets.UTF_8);
    }
    dataChanged(nodePath, event.getType(), content);
}
// Node changes are handled by subclasses.
protected abstract void dataChanged(final String path, final Type eventType, final String data);
// ShardingListenerManager
/**
 * Listener that requests resharding when job instances or servers change.
 */
class ListenServersChangedJobListener extends AbstractJobListener {

    /**
     * Sets the resharding flag when the job is not shut down and the changed path
     * is an instance change or a server path.
     */
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (JobRegistry.getInstance().isShutdown(jobName)) {
            return;
        }
        if (isInstanceChange(eventType, path) || isServerChange(path)) {
            shardingService.setReshardingFlag();
        }
    }

    /** An instance change is any event on an instance path other than a node update. */
    private boolean isInstanceChange(final Type eventType, final String path) {
        if (!instanceNode.isInstancePath(path)) {
            return false;
        }
        return Type.NODE_UPDATED != eventType;
    }

    /** A server change is any event on a server path. */
    private boolean isServerChange(final String path) {
        return serverNode.isServerPath(path);
    }
}

作业监听器

/**
 * Listener with callbacks around a job execution.
 */
public interface ElasticJobListener {
    // Called before the job is executed.
    void beforeJobExecuted(final ShardingContexts shardingContexts);
    // Called after the job has been executed.
    void afterJobExecuted(final ShardingContexts shardingContexts);
}
// AbstractElasticJobExecutor.java
/**
 * Entry point of a job run (abridged): the parts that invoke the before/after listener hooks.
 */
public final void execute() {
   // Code omitted.
   
   // Run the before-execution hook; any failure is handed to the exception handler.
   try {
       jobFacade.beforeJobExecuted(shardingContexts);
   } catch (final Throwable cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
   // Code omitted.
   // Run the after-execution hook; any failure is handed to the exception handler.
   try {
       jobFacade.afterJobExecuted(shardingContexts);
   } catch (final Throwable cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
}
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(// 省略) {

    return new SpringJobScheduler(simpleJob,
            registryCenter,
            getLiteJobConfiguration(simpleJob.getClass(),
                    cron,
                    shardingTotalCount,
                    shardingItemParameters,
                    jobParameter,
                    failover,
                    monitorExecution,
                    monitorPort,
                    maxTimeDiffSeconds,
                    jobShardingStrategyClass),
            jobEventConfiguration,
            // 自定义作业监听器
            new SimpleJobListener());

}
/**
 * Example job listener that prints a message before and after each job execution.
 */
public class SimpleJobListener implements ElasticJobListener {
    // Prints a message when the job is about to start.
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        System.out.println("任务开始了");
    }

    // Prints a message when the job has ended.
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        System.out.println("任务结束了");
    }
}

参考文章

  1. 脑裂是什么?Zookeeper是如何解决的?
  2. Kafka研究系列之kafka 如何避免脑裂?如何选举leader
  3. 如何防止ElasticSearch集群出现脑裂现象
  4. elastic-job调度模型
  5. 芋道源码-elastic-job
  6. Quartz原理解密
  7. 分布式定时任务调度系统技术选型
上一篇下一篇

猜你喜欢

热点阅读