使用Redis实现分布式延迟任务
在上一篇demo中,通过redis的zset加定时任务线程池实现了可持久化的延时任务。可实际情况比这个复杂的多,项目是分布式应用,延时任务的线程池单独一个项目,和其他项目互不干扰,这可怎么办?
添加其他的消息中间件吧,成本太高。那就用redis的发布订阅吧。总体的架构如下图所示:

线程池里面放的都是需要执行的延时任务。
下面看代码实现。
一、pom文件引入
<!-- springboot整合redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis序列化所需加包 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
两个必须的架包,一个是springboot整合redis的starter架包,一个是redis序列化的架包。需要注意的是,这里使用的SpringBoot版本是2.6.2,如果使用2.1.4.RELEASE版本,那么我们引入的架包会有所不同。这个可参考:SpringBoot整合Redis,订阅、发布、过期事件
二、启动类编码
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import com.example.demo.controller.DelayTaskExec;
/**
* 启动类
* @author 程就人生
* @Date
*/
@EnableScheduling
@SpringBootApplication
public class SrpingRedisMqDemoApplication implements CommandLineRunner{
// redis操作数据的模板类
@Autowired
private RedisTemplate<String,Object> redisTemplate;
// 延时任务线程池
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
// 日志打印
private static final Logger log = LoggerFactory.getLogger(SrpingRedisMqDemoApplication.class);
// 时间格式
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
SpringApplication.run(SrpingRedisMqDemoApplication.class, args);
}
// 处理缓存中遗留的延时任务
@Override
public void run(String... args) throws Exception {
ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
Long scheduleTime = System.currentTimeMillis();
Set<Object> set = zset.rangeByScore("AA", 0, scheduleTime);
DelayTaskExec task = null;
// 历史已经过期的
if(!set.isEmpty()){
set.forEach(str->{
DelayTaskExec task1 = new DelayTaskExec(zset, str.toString());
threadPoolTaskScheduler.schedule(task1, new Date());
});
}
// 历史未过期的
set = zset.range("AA", 0, -1);
if(!set.isEmpty()){
set.forEach(str->{
try{
DelayTaskExec task1 = new DelayTaskExec(zset, str.toString());
threadPoolTaskScheduler.schedule(task1, new Date(zset.score("AA", str.toString()).longValue()));
}catch(Exception e){
log.info("{}已被处理,无需重复处理",str.toString());
}
});
}
}
/**
* 定时任务,模仿在生产环境中生成的消息
*/
@Scheduled(cron = "*/30 * * * * ?")
public void initKeys() {
ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
Random random = new Random();
int score = 0;
Long scheduleTime = 0L;
for(int i=0;i<100;i++){
// 取值1到100的随机数
score = random.nextInt(100);
scheduleTime = System.currentTimeMillis() + (1000) * score;
log.info("AA{}{},过期时间:{}",i,score, format.format(new Date(scheduleTime)));
zset.add("AA", "AA" + i + score, scheduleTime);
// 发布消息
redisTemplate.convertAndSend("one", "AA" + i + score);
}
}
}
三、redis相关配置文件
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* redis配置文件
* @author
* @date 2022年7月26日
* @Description
*
*/
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class ARedisConfig {
// redis操作模板序列化、连接工厂配置
@Bean("redisTemplate")
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
// om.activateDefaultTyping(PolymorphicTypeValidator., ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
/**
* 消费消息配置
* @param receiver
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(MessageConsume messageConsume){
//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageConsume”
return new MessageListenerAdapter(messageConsume, "getMessage");
}
/**
* 订阅频道配置
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅一个频道
container.addMessageListener(listenerAdapter, new PatternTopic("one"));
//序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
Jackson2JsonRedisSerializer<String> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<String>(String.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
container.setTopicSerializer(jackson2JsonRedisSerializer);
return container;
}
/**
* 延时任务线程池配置类
* @return
*/
@Bean("threadPoolTaskScheduler")
public ThreadPoolTaskScheduler getThreadPoolTaskScheduler(){
// 定时任务线程池
ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
// 线程池大小
executor.setPoolSize(10);
// 线程执行前缀
executor.setThreadNamePrefix("ThreadPoolTaskScheduler-");
// executor.setWaitForTasksToCompleteOnShutdown(true);
// executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
在这个配置文件中,包含了几个重要的配置:
-
RedisTemplate模板的相关配置;
-
redis消息的订阅和发布配置,通过通道one发布消息,同时又订阅了通道为one的消息;
-
定时任务连接池的配置;
为了方便测试,就不建立那么多项目了,把这些配置都写在一个配置文件里。分布式项目时再分开即可。
四、消息的消费者
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
/**
* 消息消费者
* @author
* @date 2022年7月26日
* @Description
*
*/
@Component
public class MessageConsume {
private static final Logger log = LoggerFactory.getLogger(MessageConsume.class);
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
public void getMessage(String object){
log.info("订阅的消息:"+ object);
ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
Double score = zset.score("AA", object);
if(score != null){
// 加入到定时任务的线程池
DelayTaskExec task = new DelayTaskExec(zset, object);
threadPoolTaskScheduler.schedule(task, new Date(score.longValue()));
}
}
}
在收到消息时,将消息加入到线程池中。和上一篇demo不同的地方就在于此。
五、延时任务执行类
import java.text.SimpleDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ZSetOperations;
/**
* 延时任务执行的线程
* @author
* @Date
*/
public class DelayTaskExec implements Runnable{
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final Logger log = LoggerFactory.getLogger(DelayTaskExec.class);
ZSetOperations<String, Object> zset;
String uid;
public DelayTaskExec(ZSetOperations<String, Object> zset, String uid){
this.zset = zset;
this.uid = uid;
}
@Override
public void run() {
// TODO 相关的业务处理
if(zset.score("AA", uid) != null){
Long now = System.currentTimeMillis();
log.info("计划执行时间:{}, 实际执行时间:{},set={}", format.format(zset.score("AA", uid).longValue()), format.format(now), uid);
zset.remove("AA", uid);
}
}
}
这个类和上一个demo没有什么不同。
六、测试
1.测试前先熟悉几个使用到的redis命令;
// 发布消息
publish channelName message
// 订阅消息
subscribe channelName
// 使用条件表达式订阅命令
psubscribe pattern

打开两个redis客户端,一个发布消息,一个订阅消息,执行OK。

再尝试一个,通过正则表达式发布订阅消息,执行OK。
2.启动项目运行结果

通过打印日志,我们可以看到消息成功发布,消息成功订阅,也按照预定时间执行了,测试结果OK,说明这是一个可行的方案。
现在思考一个问题,如果redis也是集群的呢,其实处理方法没变,还是通过发布订阅方式把延时任务集中到一个项目中去做。redis集群发布订阅参考 Redis集群下的过期监听事件notify-keyspace-events 一文,只需把订阅通道名称改一改就搞定了。