es-job-2-如何优雅的在springboot项目中使用

2020-08-20  本文已影响0人  大黑跟小白的日常

在es-job-1的基础上我们继续对esjob的使用过程做优化

前提:EsJobConfig 已经配置好(CoordinatorRegistryCenter、JobEventConfiguration)

封装注解类

/**
 * @Author G_Y
 * @Date 2020/8/18 8:35
 * @Description: // cupid 定时任务 注解
 **/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CupidJob {
    //执行策略
    String cron();
    String jobName();
    //shardingTotalCount 分片数量
    int shardingTotalCount() default 1;
    //shardingItemParameters 分片个性化参数
    String shardingItemParameters() default "0=1";
    // 任务参数
    String jobParameters() default "";
}

定义任务抽象类

/**
 * @Author G_Y
 * @Date 2020/8/15 19:22
 * @Description: // cupid 任务 抽象
 **/
public abstract class CupidEsJobTask implements SimpleJob {
    // 注入提前已经配置的 job相关 组件 bean
    @Autowired
    private CoordinatorRegistryCenter regCenter;
    // 注入提前已经配置的 job相关 组件 bean
    @Autowired
    private JobEventConfiguration jobEventConfiguration;
    // 具体job bean构建之后,会利用此方法 对 job 直接进行调度并执行定时任务
    @PostConstruct
    public void executeTask() {
        CupidJob annotation = this.getClass().getAnnotation(CupidJob.class);
        if (annotation == null) {
            throw new RuntimeException("need @CupidJob on job");
        }
        LiteJobConfiguration jobConfiguration = this.createJobConfiguration(annotation.jobName(),
                annotation.cron(), annotation.shardingTotalCount(),
                annotation.shardingItemParameters(), annotation.jobParameters());
        SpringJobScheduler springJobScheduler =
                new SpringJobScheduler(this, regCenter, jobConfiguration, jobEventConfiguration, this.getListener());
        springJobScheduler.init();
    }
    /**
     * @param jobName                任务名称
     * @param cron                   执行策略
     * @param shardingTotalCount     分片数量
     * @param shardingItemParameters 分片个性化参数
     * @return
     * @Author G_Y
     * @Description: : 构建 执行 任务 参数配置 的方法 2
     * @Date 2020/8/15 8:58
     * 配置任务详细信息
     **/
    public LiteJobConfiguration createJobConfiguration(final String jobName,
                                                        final String cron,
                                                        final int shardingTotalCount,
                                                        final String shardingItemParameters,
                                                        final String jobParameters) {
        //JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder jobCoreConfigurationBuilder =
                JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount);
        //设置shardingItemParameters
        jobCoreConfigurationBuilder.failover(true);// 故障转移
        jobCoreConfigurationBuilder.jobParameter(jobParameters); // 还可以针对job设置条件参数
        if (StringUtils.isNoneEmpty(shardingItemParameters)) {
            jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = jobCoreConfigurationBuilder.build();
        //创建SimpleJobConfiguration
        SimpleJobConfiguration simpleJobConfiguration =
                new SimpleJobConfiguration(jobCoreConfiguration, this.getClass().getCanonicalName());
        //创建LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration =
                LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
                        .monitorPort(9888)//设置dump端口
                        .build();
        return liteJobConfiguration;
    }
   // 个性化listener由实现类提供
    public abstract ElasticJobListener getListener();
}

具体任务类

@Slf4j
@Component
@CupidJob(cron = "*/10 * * * * ?", jobName = "testCupidEsjob1",
        shardingItemParameters = "0=aaa,1=bbb", jobParameters = "haha",
        shardingTotalCount = 2
)
public class TestCupidEsjob1 extends CupidEsJobTask {
    // 个性化ElasticJobListener
    @Resource(name = "myElasticJobListener")
    private MyElasticJobListener myElasticJobListener;
    @Override
    public ElasticJobListener getListener() {
        return myElasticJobListener;
    }
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("作业分片:" + shardingContext.getShardingItem());
        //分片参数,(0=text,1=image,2=radio,3=vedio,参数就是text、image...)
        String jobParameter = shardingContext.getJobParameter();
        String shardingParameter = shardingContext.getShardingParameter();

        //打印出任务相关信息,JobParameter用于传递任务的ID
        log.info("任务名:{}, 片数:{}, shardingParameter={}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                shardingParameter);

        System.out.println("===================" + jobParameter + "===================");
        System.out.println("==================" + shardingParameter + "===================");
        // 业务操作
        System.out.println("==============业务操作===============");
        // Feign 调度 service 执行具体业务处理 逻辑
    }
}

启动测试

单台机器运行结果如下


image.png

如果多节点部署,则 不同的任务分片会分散在各个机器一起执行;

番外篇 - elastic-job-lite-console 界面管理 - 2.1.5版本

下载软件:链接:https://pan.baidu.com/s/12INQkpjLsmXZXVUU4w8TSg
提取码:ctyp

下载-解压-进入 elastic-job-lite-console-2.1.5\bin 双击启动(windows版本)

image.png
直接访问:http://localhost:8899/#
image.png
如上图,这里面可以配置访问密码
添加zk任务注册中心的连接
image.png
任务浏览跟手动操作
image.png
上一篇下一篇

猜你喜欢

热点阅读