Distributed Scheduled Jobs with elastic-job (Part 2)
2021-05-29
后来丶_a24d
Table of Contents
(TOC image: 目录.png) Distributed scheduled task series
Execution
Creating the executor
Fields of AbstractElasticJobExecutor
(field diagram: AbstractElasticJobExecutor各个属性.png)
- JobFacade: the job facade object
- JobRootConfiguration: the job configuration
- ExecutorService: the execution thread pool
- JobExceptionHandler: the exception handler
Constructor
- The constructor:
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
this.jobFacade = jobFacade;
// Load the job configuration
jobRootConfig = jobFacade.loadJobRootConfiguration(true);
jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
// Obtain the thread pool
executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
// Obtain the exception handler
jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
// Initialize the collection of per-sharding-item error messages
itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
- Taking a simple job as an example, SimpleJobExecutor initializes the parent class AbstractElasticJobExecutor
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
/**
* The SimpleJob implementation
*/
private final SimpleJob simpleJob;
public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
super(jobFacade);
this.simpleJob = simpleJob;
}
}
Obtaining the thread pool in the constructor
- When a server is assigned multiple sharding items, multiple threads are needed to speed up execution; for task isolation, each job gets its own thread pool.
// AbstractElasticJobExecutor
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
// ... some code omitted
// Obtain the thread pool
executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
}
// AbstractElasticJobExecutor.getHandler
private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
// Instantiate the configured ExecutorServiceHandler implementation class via reflection; this is the SPI hook for a custom thread pool
String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
try {
Class<?> handlerClass = Class.forName(handlerClassName);
if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
return handlerClass.newInstance();
}
return getDefaultHandler(jobPropertiesEnum, handlerClassName);
} catch (final ReflectiveOperationException ex) {
return getDefaultHandler(jobPropertiesEnum, handlerClassName);
}
}
// ExecutorServiceHandlerRegistry
// Map<String, ExecutorService> REGISTRY = new HashMap<>()
public static synchronized ExecutorService getExecutorServiceHandler(final String jobName, final ExecutorServiceHandler executorServiceHandler) {
// If the pool is not cached yet, create it; the cache key is jobName
if (!REGISTRY.containsKey(jobName)) {
REGISTRY.put(jobName, executorServiceHandler.createExecutorService(jobName));
}
return REGISTRY.get(jobName);
}
// ExecutorServiceHandler is an interface, so the thread pool can be customized
ExecutorService createExecutorService(final String jobName);
- getHandler in the constructor is what makes the thread pool customizable: it reads the configured ExecutorServiceHandler implementation class and instantiates it via reflection, giving an SPI-style extension point.
- A custom thread pool and a custom exception handler are wired in exactly the same way; the configuration below registers a custom exception handler, and a sketch of a custom thread-pool handler follows the configuration example.
private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {
// Define the core job configuration
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
.newBuilder(jobClass.getName(), cron, shardingTotalCount)
.misfire(true)
.failover(failover)
.jobParameter(jobParameter)
.shardingItemParameters(shardingItemParameters)
// Register a custom exception handler; a custom thread pool can be registered the same way
.jobProperties("job_exception_handler", "com.seeger.demo.config.MyJobExceptionHandler")
.build();
// ... other code omitted
return liteJobConfiguration;
}
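As mentioned above, a custom thread pool is just another ExecutorServiceHandler implementation registered under the executor_service_handler job property. A minimal sketch, assuming a hypothetical class com.seeger.demo.config.MyExecutorServiceHandler (not from the original source):
// Hypothetical custom handler; register it with
// .jobProperties("executor_service_handler", "com.seeger.demo.config.MyExecutorServiceHandler")
// imports: java.util.concurrent.ExecutorService, java.util.concurrent.Executors, java.util.concurrent.ThreadFactory
public final class MyExecutorServiceHandler implements ExecutorServiceHandler {
@Override
public ExecutorService createExecutorService(final String jobName) {
// Non-daemon threads, so in-flight work (MQ sends, DB writes) can finish before the JVM exits
return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(final Runnable runnable) {
Thread thread = new Thread(runnable, "my-job-" + jobName);
thread.setDaemon(false);
return thread;
}
});
}
}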
- If no custom thread pool is configured, DefaultExecutorServiceHandler is used. The pool it creates relies on a Guava helper that registers a JVM shutdown hook and waits up to 120 seconds for the pool to drain before exiting, similar to graceful handling of kill -15. That helper also turns the pool's threads into daemon threads, which is a poor fit when the job still needs to send MQ messages or hit the database during shutdown: by then the JVM is shutting down, the Spring context is closing as well, and MQ sending (usually wired through Spring) can fail.
// DefaultExecutorServiceHandler
public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {
@Override
public ExecutorService createExecutorService(final String jobName) {
return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
}
}
// ExecutorServiceObject
public ExecutorService createExecutorService() {
// Use Guava's helper to automatically register a JVM exit hook for the thread pool
return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
}
Obtaining the exception handler in the constructor
- The approach mirrors the thread pool: the handler is an SPI-style interface that can be customized and has a default implementation; a sketch of a custom handler follows the default one below.
public final class DefaultJobExceptionHandler implements JobExceptionHandler {
@Override
public void handleException(final String jobName, final Throwable cause) {
log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
}
}
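The custom handler referenced in the earlier configuration (com.seeger.demo.config.MyJobExceptionHandler) only needs to implement the same interface. The body below is an illustrative sketch, not the author's actual implementation:
// Illustrative only: the alerting call is a hypothetical hook
@Slf4j
public class MyJobExceptionHandler implements JobExceptionHandler {
@Override
public void handleException(final String jobName, final Throwable cause) {
log.error(String.format("Job '%s' exception occur in job processing, sending alert", jobName), cause);
// alertService.notify(jobName, cause);
}
}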
Running the executor
- Sequence diagram: 时序图.png
1.1 Check the job execution environment
- Source code:
// AbstractElasticJobExecutor
public final void execute() {
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// ... some code omitted
}
// LiteJobFacade
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
configService.checkMaxTimeDiffSecondsTolerable();
}
// ConfigService
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
// Check that the time difference in seconds between this server and the registry center is within the allowed range; the registry center's time can be obtained by writing test data and reading the node's update time from zk
int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
if (-1 == maxTimeDiffSeconds) {
return;
}
long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
if (timeDiff > maxTimeDiffSeconds * 1000L) {
throw new JobExecutionEnvironmentException(
"Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
}
}
1.2 Get the current job context
- Analyzed in the sharding chapter
1.3, 1.5 Publish job status trace events
- Analyzed in the event tracing chapter
1.4 Skip misfired jobs that are still running
- This is defensive programming that keeps a misfired execution from overlapping with one that is still running. Misfire handling works like this: after the misfire flag is cleared in 1.9 (when 1.8 decides the misfired items should run), the same execute method is called again to run the misfired items (1.10), while 1.7 is the execute call for the normal run. In other words, misfired items run immediately after the normal run finishes, unlike failover items, which are not run right after the normal run but at the next trigger.
- If any of the assigned sharding items is still running, the items are marked as misfired and not executed now; without this skip, the same sharding item could run concurrently. Detecting a running item depends on monitorExecution being true, which records the running state.
// LiteJobFacade.java
@Override
public boolean misfireIfRunning(final Collection<Integer> shardingItems) {
return executionService.misfireIfHasRunningItems(shardingItems);
}
// ExecutionService.java
public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
if (!hasRunningItems(items)) {
return false;
}
setMisfire(items);
return true;
}
public boolean hasRunningItems(final Collection<Integer> items) {
LiteJobConfiguration jobConfig = configService.load(true);
if (null == jobConfig || !jobConfig.isMonitorExecution()) {
return false;
}
for (int each : items) {
if (jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(each))) {
return true;
}
}
return false;
}
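For reference, ShardingNode.getRunningNode(item) used above resolves to the per-item running path under the job's namespace; roughly (elastic-job-lite 2.x, simplified):
// ShardingNode (simplified): the running flag lives at {namespace}/{jobName}/sharding/{item}/running
public final class ShardingNode {
public static final String ROOT = "sharding";
private static final String RUNNING = ROOT + "/%s/running";
private static final String MISFIRE = ROOT + "/%s/misfire";
public static String getRunningNode(final int item) {
return String.format(RUNNING, item);
}
public static String getMisfireNode(final int item) {
return String.format(MISFIRE, item);
}
}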
1.6 Methods before job execution
- Analyzed in the listener chapter; source code:
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
for (ElasticJobListener each : elasticJobListeners) {
each.beforeJobExecuted(shardingContexts);
}
}
Executing the normal run
- Step 1.7 runs the normal items via the execute method
// Misfired, normal, and failover executions all go through this method; only the ExecutionSource differs
// AbstractElasticJobExecutor.java
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
// If no sharding items are assigned, publish an event and return
if (shardingContexts.getShardingItemParameters().isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
}
return;
}
// Register job start information
jobFacade.registerJobBegin(shardingContexts);
// Publish a job status trace event
String taskId = shardingContexts.getTaskId();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
}
try {
// Core processing
process(shardingContexts, executionSource);
} finally {
// Register job completion information
jobFacade.registerJobCompleted(shardingContexts);
// Publish a job status trace event depending on whether any item failed
if (itemErrorMessages.isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
}
} else {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
}
}
}
}
- Registering job start information
// LiteJobFacade
public void registerJobBegin(final ShardingContexts shardingContexts) {
executionService.registerJobBegin(shardingContexts);
}
// ExecutionService
public void registerJobBegin(final ShardingContexts shardingContexts) {
// Tell JobRegistry that the job with this jobName is running
JobRegistry.getInstance().setJobRunning(jobName, true);
if (!configService.load(true).isMonitorExecution()) {
return;
}
// Record the running state of each sharding item (only when monitorExecution is enabled)
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
}
}
- Registering job completion information works the other way around: it removes the running flags, and if failover is enabled it also calls FailoverService.updateFailoverComplete(), which is analyzed in the failover chapter. A rough sketch follows.
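Roughly, the completion side mirrors registerJobBegin (a sketch based on the description above, with failover handling omitted):
// ExecutionService (sketch)
public void registerJobCompleted(final ShardingContexts shardingContexts) {
// Tell JobRegistry the job is no longer running
JobRegistry.getInstance().setJobRunning(jobName, false);
if (!configService.load(true).isMonitorExecution()) {
return;
}
// Remove the running flag of each sharding item
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
}
}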
- The core process method: a single sharding item runs in the current thread, while multiple items run in parallel on the thread pool
// AbstractElasticJobExecutor
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
// A single sharding item is executed directly
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
// Execute the single item
process(shardingContexts, item, jobExecutionEvent);
return;
}
// Multiple sharding items are executed on multiple threads
final CountDownLatch latch = new CountDownLatch(items.size());
for (final int each : items) {
final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
if (executorService.isShutdown()) {
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// Execute one item
process(shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// AbstractElasticJobExecutor
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
// Publish the start event for this item
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(startEvent);
}
log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
JobExecutionEvent completeEvent;
try {
// Abstract method implemented by subclasses
process(new ShardingContext(shardingContexts, item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(completeEvent);
}
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// Publish the failure event for this item
completeEvent = startEvent.executionFailure(cause);
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtil.transform(cause));
jobExceptionHandler.handleException(jobName, cause);
}
}
// Taking SimpleJobExecutor as an example
protected void process(final ShardingContext shardingContext) {
// Invoke the user-implemented job class
simpleJob.execute(shardingContext);
}
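For completeness, the user-side SimpleJob only has to implement execute. A minimal sketch with illustrative names:
// Hypothetical user job; each server processes only the sharding items assigned to it
public class MySimpleJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
// A common pattern is to filter data by sharding item, e.g. WHERE id % shardingTotalCount = shardingItem
System.out.println(String.format("job=%s, item=%d, parameter=%s",
shardingContext.getJobName(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter()));
}
}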
Executing misfired items
- Step 1.10 runs the misfired items via the execute method, if step 1.8 decides they should be executed
- Marking items as misfired
// JobScheduler.java
private Scheduler createScheduler() {
Scheduler result;
// ... some code omitted
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
return result;
}
private Properties getBaseQuartzProperties() {
// ... some code omitted
// If a trigger fires more than 1 millisecond late, the job's sharding items are treated as misfired
result.put("org.quartz.jobStore.misfireThreshold", "1");
return result;
}
// JobScheduleController.class
private CronTrigger createTrigger(final String cron) {
return TriggerBuilder.newTrigger()
.withIdentity(triggerIdentity)
.withSchedule(CronScheduleBuilder.cronSchedule(cron)
// On misfire Quartz does nothing; handling is delegated to elastic-job
.withMisfireHandlingInstructionDoNothing())
.build();
}
// Misfires are picked up by a Quartz TriggerListener
// JobTriggerListener.java
public final class JobTriggerListener extends TriggerListenerSupport {
@Override
public void triggerMisfired(final Trigger trigger) {
if (null != trigger.getPreviousFireTime()) {
executionService.setMisfire(shardingService.getLocalShardingItems());
}
}
}
// ExecutionService sets the misfire flag
public void setMisfire(final Collection<Integer> items) {
for (int each : items) {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
}
}
- Executing the misfired items. The check relies on locally cached data, while updates are written to zk asynchronously, so due to network delays the cache may not have caught up yet; looping on the check is defensive programming to make sure the cached data is up to date. Because of this the business logic must be idempotent, since it may be invoked more than once; a minimal idempotency sketch follows the code below.
public final void execute() {
// ... some code omitted
// Decide whether to run misfired items (starting from 1.7 in the sequence diagram)
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
// Clear the misfire flag
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
// Run the misfired items, using the same logic as the normal run above
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
}
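A minimal idempotency sketch for the point above. All names are hypothetical (OrderDao and its methods are illustrative); a real job would typically rely on a database unique key, a conditional UPDATE, or a Redis SETNX:
// Hypothetical: a conditional status update acts as the idempotency guard,
// so a misfired re-run of the same item skips records that are already handled
public class IdempotentSimpleJob implements SimpleJob {
private final OrderDao orderDao; // hypothetical DAO
public IdempotentSimpleJob(final OrderDao orderDao) {
this.orderDao = orderDao;
}
@Override
public void execute(final ShardingContext shardingContext) {
for (Long orderId : orderDao.findPendingIds(shardingContext.getShardingItem(), shardingContext.getShardingTotalCount())) {
// markProcessing returns the number of updated rows; 0 means another run already handled it
if (orderDao.markProcessing(orderId) == 1) {
// ... business logic for this order ...
}
}
}
}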
1.11 Executing failover
- Analyzed in the failover chapter
1.12 Methods after job execution
- Analyzed in the listener chapter; source code:
// LiteJobFacade
public void afterJobExecuted(final ShardingContexts shardingContexts) {
for (ElasticJobListener each : elasticJobListeners) {
each.afterJobExecuted(shardingContexts);
}
}
Registry Center
JobNodeStorage
- JobNodeStorage: the data access class for job nodes
- Class diagram of the zk registry center (image.png): RegistryCenter defines simple operations for creating, reading, updating, and deleting registry data and for querying the registry time; CoordinatorRegistryCenter is the coordination registry center for distributed services, providing persistent nodes, ephemeral nodes, persistent sequential nodes, ephemeral sequential nodes, and so on; ZookeeperRegistryCenter is the zk-based implementation.
- JobNodeStorage depends on RegistryCenter (JobeNodeStorage.png); the individual services depend on JobNodeStorage rather than on RegistryCenter directly. ZookeeperRegistryCenter mainly operates zk through the Curator framework; it is worth a deeper look if you are interested.
- A few of the more interesting methods deserve a closer look:
Getting the registry center's current time
- Write a value to the registry, then read that node's modification time to obtain the registry center's time.
//ZookeeperRegistryCenter
public long getRegistryCenterTime(final String key) {
long result = 0L;
try {
persist(key, "");
result = client.checkExists().forPath(key).getMtime();
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
Preconditions.checkState(0L != result, "Cannot get registry center time.");
return result;
}
Executing an operation on the leader node (distributed lock)
- Curator implements two kinds of distributed locks on top of zk, and LeaderLatch is one of them. After start, await blocks until the latch is acquired; when multiple callers compete, the others wait, and when the current holder releases it, another one acquires it. LeaderLatch implements Closeable, so close is called when the try-with-resources block ends. Its concrete use is covered in the leader election chapter; the other lock type is worth a look if you are interested.
// JobNodeStorage: distributed lock with a callback mechanism
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
// Create a LeaderLatch on the zk node; getClient delegates to ZookeeperRegistryCenter
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
} catch (final Exception ex) {
handleException(ex);
}
}
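A usage sketch: the latch node path mirrors the one used for leader election, and the callback body is illustrative; the real call sites are covered in the leader election chapter.
// Only the caller that wins the latch runs the callback; the others block in await() until it is their turn
jobNodeStorage.executeInLeader("leader/election/latch", new LeaderExecutionCallback() {
@Override
public void execute() {
// e.g. persist this instance as the leader if no leader has been elected yet
}
});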
Executing operations in a transaction
- Open a zk transaction and execute the TransactionExecutionCallback inside it
// JobNodeStorage
public void executeInTransaction(final TransactionExecutionCallback callback) {
try {
CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
callback.execute(curatorTransactionFinal);
curatorTransactionFinal.commit();
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
}
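A usage sketch of the callback. The node paths and payload are illustrative; elastic-job uses this mechanism, for example, to persist sharding results atomically.
// All operations are appended to the same zk transaction and committed together
jobNodeStorage.executeInTransaction(new TransactionExecutionCallback() {
@Override
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath("sharding/0/instance"), "ip@-@pid".getBytes()).and();
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath("leader/sharding/necessary")).and();
}
});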
Listeners
- ListenerManager manages multiple listeners (ListenerManager.png); each manager field implements one listening concern.
Registry center connection listener
- RegistryCenterConnectionStateListener in ListenerManager implements Curator's ConnectionStateListener interface and listens for registry center connection state changes (RegistryCenterConnectionStateListener.png).
- Handling state changes:
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
if (JobRegistry.getInstance().isShutdown(jobName)) {
return;
}
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
// Connection suspended or connection lost
if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
// Pause the job
jobScheduleController.pauseJob();
} else if (ConnectionState.RECONNECTED == newState) {
// Persist the job server's online information
serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
// Persist the job instance's online information
instanceService.persistOnline();
// Clear the running flags of the locally assigned sharding items
executionService.clearRunningInfo(shardingService.getLocalShardingItems());
// Resume job scheduling
jobScheduleController.resumeJob();
}
}
// JobScheduleController
public synchronized void pauseJob() {
try {
if (!scheduler.isShutdown()) {
scheduler.pauseAll();
}
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
public synchronized void resumeJob() {
try {
if (!scheduler.isShutdown()) {
scheduler.resumeAll();
}
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
Sharding listener
- ShardingListenerManager in ListenerManager manages the sharding listeners
- All listeners are started when the job is initialized:
// SchedulerFacade.java
public void registerStartUpInfo(final boolean enabled) {
// Start all listeners
listenerManager.startAllListeners();
// ... some methods omitted
}
// Start all listeners
public void startAllListeners() {
electionListenerManager.start();
shardingListenerManager.start();
failoverListenerManager.start();
monitorExecutionListenerManager.start();
shutdownListenerManager.start();
triggerListenerManager.start();
rescheduleListenerManager.start();
guaranteeListenerManager.start();
}
- Each start method calls the corresponding methods of the parent class AbstractListenerManager (AbstractListenerManager.png):
// AbstractListenerManager
// Start the listener
public abstract void start();
// Add the data listener to the registry center's TreeCache, plus the connection state listener
protected void addDataListener(final TreeCacheListener listener) {
jobNodeStorage.addDataListener(listener);
jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
- The start method of ShardingListenerManager:
public void start() {
addDataListener(new ShardingTotalCountChangedJobListener());
addDataListener(new ListenServersChangedJobListener());
}
- AbstractJobListener, the abstract listener class for the job registry center, implements Curator's TreeCacheListener:
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
// Ignore events that carry no data change
ChildData childData = event.getData();
if (null == childData) {
return;
}
String path = childData.getPath();
if (path.isEmpty()) {
return;
}
dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
}
// Node changes are handled by subclasses
protected abstract void dataChanged(final String path, final Type eventType, final String data);
- Taking ShardingListenerManager as an example, it sets the resharding flag when a relevant change is detected; a rough sketch of the companion listener follows the code below.
// ShardingListenerManager
class ListenServersChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
shardingService.setReshardingFlag();
}
}
private boolean isInstanceChange(final Type eventType, final String path) {
return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
}
private boolean isServerChange(final String path) {
return serverNode.isServerPath(path);
}
}
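For comparison, the other listener registered in start, ShardingTotalCountChangedJobListener, watches the config node and also sets the resharding flag when the sharding total count changes. Roughly (elastic-job-lite 2.x, shown from memory rather than quoted):
// ShardingListenerManager
class ShardingTotalCountChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
// The total count changed: mark resharding as necessary and remember the new count
shardingService.setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
}
}
}
}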
Job listeners
- ElasticJobListener, the job listener interface:
public interface ElasticJobListener {
// Invoked before the job executes.
void beforeJobExecuted(final ShardingContexts shardingContexts);
// Invoked after the job executes.
void afterJobExecuted(final ShardingContexts shardingContexts);
}
- These methods are invoked around every job execution:
// AbstractElasticJobExecutor.java
public final void execute() {
// ... code omitted
// Invoke the before-execution methods
try {
jobFacade.beforeJobExecuted(shardingContexts);
} catch (final Throwable cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// ... code omitted
// Invoke the after-execution methods
try {
jobFacade.afterJobExecuted(shardingContexts);
} catch (final Throwable cause) {
jobExceptionHandler.handleException(jobName, cause);
}
}
- A custom implementation can be registered in the configuration:
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(/* parameters omitted */) {
return new SpringJobScheduler(simpleJob,
registryCenter,
getLiteJobConfiguration(simpleJob.getClass(),
cron,
shardingTotalCount,
shardingItemParameters,
jobParameter,
failover,
monitorExecution,
monitorPort,
maxTimeDiffSeconds,
jobShardingStrategyClass),
jobEventConfiguration,
// Custom job listener
new SimpleJobListener());
}
public class SimpleJobListener implements ElasticJobListener {
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
System.out.println("任务开始了");
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
System.out.println("任务结束了");
}
}