01-Elastic_Job使用和源码入口分析

2018-09-11  本文已影响0人  cjxz

Elastic-job使用javacode方式实现

1.导入依赖

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>

2.开发job

package com.elasticjob;

import java.util.Date;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang3.time.DateFormatUtils;

/**
 * @Author: chao.zhu
 * @description:
 * @CreateDate: 2018/08/16
 * @Version: 1.0
 */
public class MyElasticJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        try{
            System.out.println(DateFormatUtils.format(new Date(),"yyyy-MM-dd hh:mm:ss")+":定时任务开始执行");
            //设置zeekeeper的节点内容
            String ZK_CONNECTION  = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
            ZkClient zkClient = new ZkClient(ZK_CONNECTION,5000);
            //创建临时节点
            String v = zkClient.readData("/name",false);
            System.out.println("/test内容:"+v);
            zkClient.writeData("/name",v+"$");
        }catch (Exception e){
            e.printStackTrace();
        }finally {

        }
    }
}

3.测试类

package com.elasticjob;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * @Author: chao.zhu
 * @description:
 * @CreateDate: 2018/08/16
 * @Version: 1.0
 */
public class JobTest2{
    public static void main(String[] args)  {

        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();

    }



    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", "elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }
    private static LiteJobConfiguration createJobConfiguration() {
        // 创建作业配置
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/30 * * * * ?", 1).build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
        // 定义Lite作业根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return simpleJobRootConfig;
    }
}

碰到的坑

1.JobCoreConfiguration注册的是持久化zookeeper节点,并且不会更新节点数据。也就是说第一次注册的corn之后,修改corn不会发送变化。可以进入zookeeper中查看节点数据
[zk: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183(CONNECTED) 11] ls /elastic-job-demo/demoSimpleJob
[leader, servers, config, instances, sharding]
[zk: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183(CONNECTED) 12] get /elastic-job-demo/demoSimpleJob/config
{"jobName":"demoSimpleJob","jobClass":"com.elasticjob.MyElasticJob","jobType":"SIMPLE","cron":"0/30 * * * * ?","shardingTotalCount":1,"shardingItemParameters":"","jobParameter":"","failover":false,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":false}
cZxid = 0xa00000433
ctime = Thu Aug 16 11:01:26 CST 2018
mZxid = 0xa00000433
mtime = Thu Aug 16 11:01:26 CST 2018
pZxid = 0xa00000433
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 613
numChildren = 0

知道zookeeper的人应该明白我们所有的定时任务节点都会在Scheduler下面。我们在使用zookeeper的时候创建了命名空间,也就是后续的定时任务节点都会在这个命名空间下面。

节点 含义
leader 保存选举获得的master节点
servers 集群中运行任务的节点IP
config 配置的corn,定时任务名称等数据都存放在这个节点下
instances 多个服务器启动时每个服务器都会注册一个实例到这个节点下
sharding 分片信息

LiteJobConfiguration里面有个overwrite是可以重写定时任务配置

Elastic-job使用spring命名空间配置

导入依赖

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.6-SNAPSHOT</version>
</dependency>

Elastic_job监控中心

启动监控中心

1.使用mvn install编译打包elastic-job-lite-console

2.打包完毕会生产elastic-job-lite-console-2.1.6-SNAPSHOT.tar.gz

3.解压运行start.sh

4.登录:http://localhost:8899/

5.添加注册中心

Elastic_job源码分析

        //创建定时任务配置信息
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0 0/1 * * * ?", 1).build();
        //使用简单任务包装上面创建的定时任务配置信息。除了SimpleJob还有两种定时任务类型
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());
        //定义Lite作业根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
        //定义配置中心,zookeeper
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", "elastic-job-demo"));
        regCenter.init();
        //将所有信息整合到JobScheduler
        new JobScheduler(regCenter,simpleJobRootConfig).init();

编程式开发定时任务采用了装饰者模式

    /**
     * 初始化作业.
     */
    public void init() {
        //创建命名空间节点,创建任务节点,创建任务的配置属性节点config。
        //config内容:{"jobName":"demoSimpleJob","jobClass":"com.elasticjob.MyElasticJob","jobType":"SIMPLE","cron":"0/30 * * * * ?","shardingTotalCount":1,"shardingItemParameters":"","jobParameter":"","failover":false,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":false}
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);

        //分片信息,将任务的名字作为key,分片大小作为value,放到map里面
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());

        //创建Quartz需要的类
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(),
                createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()),
                liteJobConfigFromRegCenter.getJobName());

        //1.用job的名字对应jobScheduleController放到map中
        //2.用job的名字对应注册中心regCenter放到map中
        //3.将任务节点的name写到treeCache里面,没有写到zk中
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);

        //注册,并启动任务!这个方法比较重
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());

        //使用原生的quartz启动定时任务
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }
    /**
     * 更新作业配置.
     *
     * @param liteJobConfig 作业配置
     * @return 更新后的作业配置
     */
    public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
        //持久化作业配置。将liteJobConfig写到zk里面
        configService.persist(liteJobConfig);

        //然后重新从zk里面读取出来
        return configService.load(false);
    }
    /**
     * 注册作业启动信息.
     * SchedulerFacade的门面类主要是用来操作zk的。下面的各种service类其实都是利用zk和jobName来做各种事情。
     * 看之前一定要记住是在分布式环境下来分析这个方法!!切记切记
     * @param enabled 作业是否启用
     */
    public void registerStartUpInfo(final boolean enabled) {
        //TODO 重要方法,需要仔细看
        //开启所有的监听。这个方法很重要!!!,用来监听任务的状态
        listenerManager.startAllListeners();

        //选举主节点,采用的方式是curator的LeaderLatch来选举
        leaderService.electLeader();

        //将服务器IP地址写到jobName/servers/ip 写到zk里面
        serverService.persistOnline(enabled);

        //将运行的实例写到zk里面。节点为:jobName/instances/ip@-@进程号
        instanceService.persistOnline();

        //持久化分片节点,节点:jobName/sharding
        shardingService.setReshardingFlag();

        //监控服务的监听器,需要开启才会进行监听,开启方式:jobConfig.setMonitorPort(9888);
        monitorService.listen();

        //自诊断修复
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读