定时器(elastic-job 二)
2020-06-17 本文已影响0人
寂静的春天1988
分片策略
平均分片策略:举例10个分片,3台服务。第一台服务分片0,1,2,9。第二台服务分片3,4,5。第三台服务分片6,7,8。(不能整除时,多余的分片依次追加到序号靠前的服务上,所以分片9落在第一台。)
作业名的哈希值奇偶数决定IP升降序算法的分片策略:
举例10个分片,3台服务。作业名哈希值为奇数时按IP升序,结果和平均分片策略一致。如果是偶数则按IP降序:第三台服务分片0,1,2,9。第二台服务分片3,4,5。第一台服务分片6,7,8。
作业名哈希值对服务器列表进行轮转:作业名的哈希值对服务器总数取余,余数作为起点。
举例10个分片,3台服务。用哈希值对3取余:如果余0,那么和平均分片策略一样;如果余1,那么第二台服务就是起点,第二台服务分片0,1,2,9。第三台服务分片3,4,5。第一台服务分片6,7,8。
个人总结:说到底这三种策略还是平均分配策略,不过是起点的服务器不一样而已。
自定义分片策略
/**
 * Marks a class as an elastic-job simple job and carries the settings used to register it.
 *
 * <p>The annotation is meta-annotated with {@code @Component}, is retained at runtime, and
 * may only be placed on types.
 */
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ElasticSimpleJob {

    /** Name of the job; defaults to the empty string. */
    String jobName() default "";

    /** Cron expression of the job; defaults to the empty string. */
    String cron() default "";

    /** Total number of sharding items; defaults to 1. */
    int shardingTotalCount() default 1;

    /** Overwrite flag handed to the job configuration; defaults to {@code false}. */
    boolean overwrite() default false;

    /** Sharding strategy class; defaults to {@code AverageAllocationJobShardingStrategy}. */
    Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;
}
/**
 * Custom sharding strategy that deals sharding items out to the job instances one at a
 * time, in round-robin order.
 *
 * <p>Item {@code i} goes to the instance at index {@code i % jobInstances.size()}, so with
 * 10 items and 3 instances the result is [0,3,6,9], [1,4,7] and [2,5,8].
 */
public class MyShardingStrategy implements JobShardingStrategy {

    /**
     * Assigns the sharding items {@code 0 .. shardingTotalCount - 1} to the given instances.
     *
     * @param jobInstances job instances to distribute the items over; may be empty
     * @param jobName job name; not used by this strategy
     * @param shardingTotalCount number of sharding items to hand out
     * @return the items assigned to each instance; an instance that receives no item has no
     *         entry, and the map is empty when there are no instances
     */
    @Override
    public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName,
            int shardingTotalCount) {
        Map<JobInstance, List<Integer>> assignments = new HashMap<JobInstance, List<Integer>>();
        // With no instances there is nobody to take an item: return the empty result
        // instead of trying to hand the items out.
        if (jobInstances == null || jobInstances.isEmpty()) {
            return assignments;
        }
        int instanceCount = jobInstances.size();
        for (int item = 0; item < shardingTotalCount; item++) {
            JobInstance owner = jobInstances.get(item % instanceCount);
            List<Integer> ownedItems = assignments.get(owner);
            if (ownedItems == null) {
                ownedItems = new ArrayList<Integer>();
                assignments.put(owner, ownedItems);
            }
            ownedItems.add(item);
        }
        return assignments;
    }
}
/**
 * Registers every bean annotated with {@code @ElasticSimpleJob} that is a {@code SimpleJob}
 * with a {@code SpringJobScheduler}.
 */
@Configuration
// Only applies when a ZooKeeper registry center bean is present
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    /**
     * Builds a job configuration from the annotation of each matching bean and initializes
     * a scheduler for it.
     */
    @PostConstruct
    public void initSimpleJob() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            // instanceof also matches jobs that get SimpleJob from a superclass or a
            // sub-interface, which a scan of the directly declared interfaces misses.
            if (!(instance instanceof SimpleJob)) {
                continue;
            }
            // Ask the context for the annotation: the runtime class of the bean does not
            // necessarily declare it itself (superclass, generated subclass).
            ElasticSimpleJob annotation =
                    applicationContext.findAnnotationOnBean(entry.getKey(), ElasticSimpleJob.class);
            if (annotation == null) {
                continue; // defensive; the beans were selected by this annotation
            }

            // Core job parameters
            JobCoreConfiguration jcc = JobCoreConfiguration
                    .newBuilder(annotation.jobName(), annotation.cron(), annotation.shardingTotalCount())
                    .build();
            // Job type configuration. getName() is the binary class name; getCanonicalName()
            // differs for nested classes and is null for local/anonymous ones.
            JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, instance.getClass().getName());
            // Root job configuration
            LiteJobConfiguration ljc = LiteJobConfiguration
                    .newBuilder(jtc)
                    .jobShardingStrategyClass(annotation.jobStrategy().getName())
                    .overwrite(annotation.overwrite())
                    .build();
            // Hand the job over to a scheduler
            new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc).init();
        }
    }
}
/**
 * Sample simple job that prints the sharding item it is invoked for.
 */
@ElasticSimpleJob(
        jobName = "mySimpleJob1",
        cron = "*/3 * * * * ?",
        shardingTotalCount = 10,
        overwrite = true,
        jobStrategy = MyShardingStrategy.class)
public class MySimpleJob implements SimpleJob {

    @Autowired
    private TestService testService;

    /**
     * Writes the sharding item of the given context to standard output.
     *
     * @param shardingContext context of the current invocation
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        String message = "当前分片项==》" + shardingContext.getShardingItem();
        System.out.println(message);
    }
}
事件追踪
1、配置好datasource(略)
2、修改配置,开启事件追踪
/**
 * Registers every bean annotated with {@code @ElasticSimpleJob} that is a {@code SimpleJob}
 * with a {@code SpringJobScheduler}, optionally with RDB-backed event tracing.
 */
// NOTE(review): this version carries no @Configuration / @ConditionalOnBean — confirm that is intended.
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    @Autowired
    private DataSource dataSource;

    /**
     * Builds a job configuration from the annotation of each matching bean and initializes
     * a scheduler for it; event tracing is attached when the annotation asks for it.
     */
    @PostConstruct
    public void initSimpleJob() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            // instanceof also matches jobs that get SimpleJob from a superclass or a
            // sub-interface, which a scan of the directly declared interfaces misses.
            if (!(instance instanceof SimpleJob)) {
                continue;
            }
            // Ask the context for the annotation: the runtime class of the bean does not
            // necessarily declare it itself (superclass, generated subclass).
            ElasticSimpleJob annotation =
                    applicationContext.findAnnotationOnBean(entry.getKey(), ElasticSimpleJob.class);
            if (annotation == null) {
                continue; // defensive; the beans were selected by this annotation
            }

            // Core job parameters
            JobCoreConfiguration jcc = JobCoreConfiguration
                    .newBuilder(annotation.jobName(), annotation.cron(), annotation.shardingTotalCount())
                    .build();
            // Job type configuration. getName() is the binary class name; getCanonicalName()
            // differs for nested classes and is null for local/anonymous ones.
            JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, instance.getClass().getName());
            // Root job configuration
            LiteJobConfiguration ljc = LiteJobConfiguration
                    .newBuilder(jtc)
                    .jobShardingStrategyClass(annotation.jobStrategy().getName())
                    .overwrite(annotation.overwrite())
                    .build();

            if (annotation.jobEevent()) {
                // The event configuration is only needed when tracing is switched on.
                JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
                new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc, jec).init();
            } else {
                new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc).init();
            }
        }
    }
}
/**
 * Marks a class as an elastic-job simple job and carries the settings used to register it,
 * including whether event tracing is requested.
 *
 * <p>The annotation is meta-annotated with {@code @Component}, is retained at runtime, and
 * may only be placed on types.
 */
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ElasticSimpleJob {

    /** Name of the job; defaults to the empty string. */
    String jobName() default "";

    /** Cron expression of the job; defaults to the empty string. */
    String cron() default "";

    /** Total number of sharding items; defaults to 1. */
    int shardingTotalCount() default 1;

    /** Overwrite flag handed to the job configuration; defaults to {@code false}. */
    boolean overwrite() default false;

    /** Sharding strategy class; defaults to {@code AverageAllocationJobShardingStrategy}. */
    Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;

    /** Whether event tracing is enabled for the job; defaults to {@code false}. */
    boolean jobEevent() default false;
}
/**
 * Sample simple job with event tracing switched on; prints the sharding item it is
 * invoked for.
 */
@ElasticSimpleJob(
        jobName = "mySimpleJob1",
        cron = "*/3 * * * * ?",
        shardingTotalCount = 10,
        overwrite = true,
        jobStrategy = MyShardingStrategy.class,
        jobEevent = true)
public class MySimpleJob implements SimpleJob {

    @Autowired
    private TestService testService;

    /**
     * Writes the sharding item of the given context to standard output.
     *
     * @param shardingContext context of the current invocation
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        String message = "当前分片项==》" + shardingContext.getShardingItem();
        System.out.println(message);
    }
}
注意开启事件追踪后,会自动增加两张表job_execution_log和job_status_trace_log。记录定时器相关内容。
作业监听器
监听作业执行前和作业执行后
作业监听器类型:
1、每个作业节点均执行,无需考虑分布式(推荐)
2、仅单个作业节点执行
类型一:
/**
 * Job listener that prints a line to standard output before and after a job execution.
 */
public class MyNormalListener implements ElasticJobListener {

    /**
     * Prints the job name followed by the "before" marker.
     *
     * @param shardingContexts contexts of the execution that is about to start
     */
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        report(shardingContexts, ",方法前!");
    }

    /**
     * Prints the job name followed by the "after" marker.
     *
     * @param shardingContexts contexts of the execution that has finished
     */
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        report(shardingContexts, ",方法后!");
    }

    /** Prints the job name of the given contexts with the given suffix appended. */
    private static void report(ShardingContexts contexts, String suffix) {
        System.out.println(contexts.getJobName() + suffix);
    }
}
@Configuration
// 如果配置了zookeeper注册中心
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private CoordinatorRegistryCenter zkCenter;
@Autowired
private DataSource dataSource;
@PostConstruct
public void initSimpleJob() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
for (Map.Entry<String, Object> entry : beans.entrySet()) {
Object instance = entry.getValue();
Class<?>[] instances = instance.getClass().getInterfaces();
for (Class<?> superInstance : instances) {
// 如果实现了SimpleJob接口
if (superInstance == SimpleJob.class) {
ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
String jobName = annotation.jobName();
String cron = annotation.cron();
int shardingTotalCount = annotation.shardingTotalCount();
boolean overwrite = annotation.overwrite();
Class<?> jobStrategy = annotation.jobStrategy();
boolean jobEvent=annotation.jobEevent();
Class<? extends ElasticJobListener>[] jobListners=annotation.jobListner();
ElasticJobListener[] listnerInstances=new ElasticJobListener[jobListners.length];
for (int i = 0; i < jobListners.length; i++) {
Class<? extends ElasticJobListener> listner =jobListners[i];
ElasticJobListener elasticJobListener =listner.getDeclaredConstructor().newInstance();
listnerInstances[i]=elasticJobListener;
}
// job核心参数
JobCoreConfiguration jcc=JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount).build();
// job类型配置
JobTypeConfiguration jtc=new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());
// job根配置
LiteJobConfiguration ljc= LiteJobConfiguration
.newBuilder(jtc)
.jobShardingStrategyClass(jobStrategy.getCanonicalName())
.overwrite(overwrite)
.build();
JobEventConfiguration jec=new JobEventRdbConfiguration(dataSource);
if(jobEvent) {
// 加入到JobScheduler
new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc,jec,listnerInstances).init();
}else {
// 加入到JobScheduler
MyNormalListener listener=new MyNormalListener();
new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc,listnerInstances).init();
}
}
/**
 * Sample simple job with event tracing and a job listener configured; prints the sharding
 * item it is invoked for.
 */
@ElasticSimpleJob(
        jobName = "mySimpleJob1",
        cron = "*/3 * * * * ?",
        shardingTotalCount = 10,
        overwrite = true,
        jobStrategy = MyShardingStrategy.class,
        jobEevent = true,
        jobListner = MyNormalListener.class)
public class MySimpleJob implements SimpleJob {

    @Autowired
    private TestService testService;

    /**
     * Writes the sharding item of the given context to standard output.
     *
     * @param shardingContext context of the current invocation
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        String message = "当前分片项==》" + shardingContext.getShardingItem();
        System.out.println(message);
    }
}
注意作业监听器和分片无关,多个分片也只会监听一次,只和作业节点和任务有关。
运维平台
略(有时间补)