elastic-job分布式任务
2019-06-17 本文已影响0人
离别刀
项目依赖
sprintboot项目依赖如下
<!--框架核心jar包-->
<dependency>
<groupId>com.github.kuhn-he</groupId>
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
一般spring项目参见如下
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.2</version>
</dependency>
可以参见此文
https://blog.csdn.net/u010889990/article/details/80000012
或者
https://blog.csdn.net/xvshu/article/details/80755988
配置文件
esjob.serverList=127.0.0.2:2181,127.0.0.3:2181,127.0.0.6:2181
esjob.namespace=esjob
esjob.userJob.shardingTotalCount=3
esjob.userJob.shardingItemParameters=0=0,1=1,2=2
Zookeeper配置中心
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//基于ZK的esjob配置中心
@Configuration
@ConfigurationProperties("esjob")
public class ElasticJobConfig {
private String serverList;
private String namespace;
@Bean
public ZookeeperConfiguration zkConfig() {
return new ZookeeperConfiguration(serverList, namespace);
}
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
return new ZookeeperRegistryCenter(config);
}
}
SimpleJobConfig配置
import cn.fastcampus.scheduler.DemoSimpleJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class SimpleJobConfig {
@Value("${esjob.userJob.cron}")
private final String cron;
@Value("${esjob.userJob.shardingTotalCount}")
private final int shardingTotalCount
@Value("${esjob.userJob.shardingItemParameters}")
private final String shardingItemParameters
@Resource
private ZookeeperRegistryCenter regCenter;
@Bean
public DemoSimpleJob simpleJob(){
return new DemoSimpleJob();
}
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final DemoSimpleJob simpleJob, String cron,int shardingTotalCount,String shardingItemParameters){
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
}
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
}
}
定时任务DemoSimpleJob
import cn.cicada.common.metrics.MetricsTimer;
import cn.cicada.common.metrics.MetricsUtil;
import cn.cicada.common.utils.StopWatch;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.chain.impl.ContextBase;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DemoSimpleJob implements SimpleJob {
private final Logger log = LoggerFactory.getLogger(DemoSimpleJob.class);
@Autowired
private UserMapper userMapper;
@Override
public void execute(ShardingContext shardingContext) {
int totalCount = shardingContext.getShardingTotalCount();
log.info("[logFLag:{}] sharding context [Item:{}][parameter:{}],", logFlag, shardingContext.getShardingItem(), shardingContext.getShardingParameter());
//dosomething
List<User> schoolList = userMapper.getUsersByShardingMode(totalCount, Integer.valueOf(shardingContext.getShardingParameter()));
}
}
public interface UserMapper {
@Select("select * from t_user where id % #{mode} = #{residue} and status = 0")
List<User> getUsersByShardingMode(@Param("mode") int mode, @Param("residue") int residue);
}