定时器(elastic-job 二)

2020-06-17  本文已影响0人  寂静的春天1988

分片策略

平均分片策略:举例10个分片,3台服务。第一个台服务分片0,1,2,9。第二台服务分片3,4,5。第三台服务6,7,8。

作业名的哈希值奇偶数决定IP升降序算法的分片策略:
举例10个分片,3台服务。作业名哈希值为奇数就和平均分配一致。如果是偶数,第三台服务0,1,2,9。第二台服务分片3,4,5。第一个台服务分片6,7,8。

作业名哈希值对服务器列表进行轮转:作业名对服务器总数取余,余数作为起点。
举例10个分片,3台服务。如果hash值对3取余,如果余0,那么和平均分配一样,如果余1那么第二台服务器就是起点,第二台服务0,1,2,9。第三台服务分片3,4,5。第一台服务分片6,7,8。

个人总结:说到底这三种策略还是平均分配策略,不过是起点的服务器不一样而已。

自定义分片策略

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {

    String jobName() default "";

    String cron() default "";

    int shardingTotalCount() default 1;

    boolean overwrite() default false;
    
    Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;

}
public class MyShardingStrategy implements JobShardingStrategy {

    @Override
    public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName,
            int shardingTotalCount) {
        Map<JobInstance, List<Integer>> map=new HashMap<JobInstance, List<Integer>>();
        
        ArrayDeque<Integer> queue=new ArrayDeque<Integer>();
        for (int i = 0; i < shardingTotalCount; i++) {
            queue.add(i);
        }
        while(queue.size()>0) {
            for (JobInstance jobInstance : jobInstances) {
                if(queue.size()>0) {
                    List<Integer> shardingItems=map.get(jobInstance);
                    if(shardingItems!=null&&shardingItems.size()>0) {
                        shardingItems.add(queue.pop());
                    }else {
                        List<Integer> newShardingItems = new ArrayList<Integer>();
                        newShardingItems.add(queue.pop());
                        map.put(jobInstance, newShardingItems);
                    }
                    
                }
            }
        }
        
        return map;
    }

}
@Configuration
// 如果配置了zookeeper注册中心
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CoordinatorRegistryCenter zkCenter;
    
    @PostConstruct
    public void initSimpleJob() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            Class<?>[] instances = instance.getClass().getInterfaces();
            for (Class<?> superInstance : instances) {
                // 如果实现了SimpleJob接口
                if (superInstance == SimpleJob.class) {
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite = annotation.overwrite();
                    Class<?> jobStrategy = annotation.jobStrategy();
                    // job核心参数
                    JobCoreConfiguration jcc=JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount).build();
                    // job类型配置
                    JobTypeConfiguration jtc=new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());
                    // job根配置
                    LiteJobConfiguration ljc= LiteJobConfiguration
                            .newBuilder(jtc)
                            .jobShardingStrategyClass(jobStrategy.getCanonicalName())
                            .overwrite(overwrite)
                            .build();
                    // 加入到JobScheduler
                    new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc).init();
                }
            }

        }
    }
}
@ElasticSimpleJob(jobName = "mySimpleJob1",cron = "*/3 * * * * ?"
                ,shardingTotalCount = 10,overwrite = true,
                jobStrategy = MyShardingStrategy.class)
public class MySimpleJob implements SimpleJob{
    
    @Autowired
    private TestService testService;
    
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("当前分片项==》"+shardingContext.getShardingItem());
        
    }

}

事件追踪

1、配置好datasource(略)
2、修改配置,开启事件追踪

@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CoordinatorRegistryCenter zkCenter;
    
    @Autowired
    private DataSource dataSource;
    
    @PostConstruct
    public void initSimpleJob() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            Class<?>[] instances = instance.getClass().getInterfaces();
            for (Class<?> superInstance : instances) {
                // 如果实现了SimpleJob接口
                if (superInstance == SimpleJob.class) {
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite = annotation.overwrite();
                    Class<?> jobStrategy = annotation.jobStrategy();
                    boolean jobEvent=annotation.jobEevent();
                    
                    // job核心参数
                    JobCoreConfiguration jcc=JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount).build();
                    // job类型配置
                    JobTypeConfiguration jtc=new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());
                    // job根配置
                    LiteJobConfiguration ljc= LiteJobConfiguration
                            .newBuilder(jtc)
                            .jobShardingStrategyClass(jobStrategy.getCanonicalName())
                            .overwrite(overwrite)
                            .build();
                    
                    JobEventConfiguration jec=new JobEventRdbConfiguration(dataSource);
                    if(jobEvent) {
                        // 加入到JobScheduler
                        new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc,jec).init();
                    }else {
                        // 加入到JobScheduler
                        new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc).init();
                    }
                    
                }
            }

        }
    }
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {

    String jobName() default "";

    String cron() default "";

    int shardingTotalCount() default 1;

    boolean overwrite() default false;
    
    Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;
    
    boolean jobEevent() default false;
}
@ElasticSimpleJob(jobName = "mySimpleJob1",cron = "*/3 * * * * ?"
                ,shardingTotalCount = 10,overwrite = true,
                jobStrategy = MyShardingStrategy.class
                ,jobEevent = true)
public class MySimpleJob implements SimpleJob{
    
    @Autowired
    private TestService testService;
    
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("当前分片项==》"+shardingContext.getShardingItem());
        
    }

}

注意开启事件追踪后,会自动增加两张表job_execution_log和job_status_trace_log。记录定时器相关内容。

作业监听器

监听作业执行前和作业执行后

作业监听器类型:
1、每个作业节点都均执行,无需考虑分布式(推荐)
2、仅单个作业节点执行

类型一:

public class MyNormalListener implements ElasticJobListener {

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        System.out.println(shardingContexts.getJobName()+",方法前!");

    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        System.out.println(shardingContexts.getJobName()+",方法后!");

    }

}
@Configuration
// 如果配置了zookeeper注册中心
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CoordinatorRegistryCenter zkCenter;
    
    @Autowired
    private DataSource dataSource;
    
    @PostConstruct
    public void initSimpleJob() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            Class<?>[] instances = instance.getClass().getInterfaces();
            for (Class<?> superInstance : instances) {
                // 如果实现了SimpleJob接口
                if (superInstance == SimpleJob.class) {
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite = annotation.overwrite();
                    Class<?> jobStrategy = annotation.jobStrategy();
                    boolean jobEvent=annotation.jobEevent();
                    Class<? extends ElasticJobListener>[] jobListners=annotation.jobListner();
                    ElasticJobListener[] listnerInstances=new ElasticJobListener[jobListners.length];
                    for (int i = 0; i < jobListners.length; i++) {
                        Class<? extends ElasticJobListener> listner =jobListners[i];
                        ElasticJobListener elasticJobListener =listner.getDeclaredConstructor().newInstance();
                        listnerInstances[i]=elasticJobListener;
                    }
                    
                    
                    
                    // job核心参数
                    JobCoreConfiguration jcc=JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount).build();
                    // job类型配置
                    JobTypeConfiguration jtc=new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());
                    // job根配置
                    LiteJobConfiguration ljc= LiteJobConfiguration
                            .newBuilder(jtc)
                            .jobShardingStrategyClass(jobStrategy.getCanonicalName())
                            .overwrite(overwrite)
                            .build();
                    
                    JobEventConfiguration jec=new JobEventRdbConfiguration(dataSource);
                    if(jobEvent) {
                        // 加入到JobScheduler
                        new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc,jec,listnerInstances).init();
                    }else {
                        // 加入到JobScheduler
                        MyNormalListener listener=new MyNormalListener();
                        new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc,listnerInstances).init();
                    }
                    
                }
@ElasticSimpleJob(jobName = "mySimpleJob1",cron = "*/3 * * * * ?"
                ,shardingTotalCount = 10,overwrite = true,
                jobStrategy = MyShardingStrategy.class
                ,jobEevent = true,
                jobListner = MyNormalListener.class)
public class MySimpleJob implements SimpleJob{
    
    @Autowired
    private TestService testService;
    
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("当前分片项==》"+shardingContext.getShardingItem());
        
    }

}

注意作业监听器和分片无关,多个分片也只会监听一次,只和作业节点和任务有关。

运维平台

略(有时间补)

上一篇下一篇

猜你喜欢

热点阅读