利用redis做一个简单的消息队列
2020-08-23 本文已影响0人
guessguess
下面先贴pom文件的配置
<!-- Maven build for the sb-demo project: Spring Boot web application with Redis support. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gee</groupId>
<artifactId>sb-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- Versions of the spring-boot-* dependencies below are inherited from this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Enable hot reload during development -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!-- JSON serialization (used to encode queue tasks) -->
<!-- NOTE(review): 1.2.28 is an old fastjson release; check its security advisories and consider upgrading. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.28</version>
</dependency>
<!-- Unit testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Redis access (provides StringRedisTemplate) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Set the JDK version used by the Maven compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Package as a Spring Boot executable jar (sets the entry point, etc.) -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!-- Tests are skipped during the build; they must be run explicitly. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>
配置文件
# Redis connection settings for the demo application.
# Logical database index.
spring.redis.database=0
# Redis server address (local instance).
spring.redis.host=127.0.0.1
spring.redis.port=6379
# Timeout; presumably interpreted as milliseconds when no unit is given - TODO confirm.
spring.redis.timeout=3000
# Connection-pool sizing.
# NOTE(review): the project uses Spring Boot 2.0.5, where pool settings appear to live under
# spring.redis.lettuce.pool.* / spring.redis.jedis.pool.*; these spring.redis.pool.* keys look
# like the Boot 1.x form and may be silently ignored - confirm against the Boot 2.0 docs.
spring.redis.pool.max-idle=200
spring.redis.pool.min-idle=200
spring.redis.pool.max-active=2000
spring.redis.pool.max-wait=1000
相关的实体类
/**
 * Envelope for a single queued task: an identifier together with a typed payload.
 *
 * @param <T> type of the payload stored in {@link #msg}
 */
public class TaskItem<T> {

    /** Identifier of the task; remains {@code null} until one is assigned. */
    public String taskId;

    /** Payload carried by the task. */
    public T msg;

    /**
     * @return the task identifier, possibly {@code null}
     */
    public String getTaskId() {
        return this.taskId;
    }

    /**
     * @param taskId the task identifier to store
     */
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    /**
     * @return the payload, possibly {@code null}
     */
    public T getMsg() {
        return this.msg;
    }

    /**
     * @param msg the payload to store
     */
    public void setMsg(T msg) {
        this.msg = msg;
    }

    /**
     * Renders the task as {@code TaskItem [taskId=..., msg=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TaskItem [taskId=");
        text.append(taskId);
        text.append(", msg=");
        text.append(msg);
        text.append("]");
        return text.toString();
    }
}
/**
 * Simple serializable bean describing a user; used as the queue payload in the example.
 */
public class User implements Serializable {

    /** Serialization version marker. */
    private static final long serialVersionUID = 1L;

    /** Numeric identifier. */
    private Integer id;

    /** User name. */
    private String username;

    /** Gender. */
    private String sex;

    /** Date of birth. */
    private Date birthday;

    /** Postal address. */
    private String address;

    /** @return the identifier, possibly {@code null} */
    public Integer getId() {
        return this.id;
    }

    /** @param id the identifier to store */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the user name, possibly {@code null} */
    public String getUsername() {
        return this.username;
    }

    /** @param username the user name to store */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the gender, possibly {@code null} */
    public String getSex() {
        return this.sex;
    }

    /** @param sex the gender to store */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** @return the date of birth, possibly {@code null} */
    public Date getBirthday() {
        return this.birthday;
    }

    /** @param birthday the date of birth to store */
    public void setBirthday(Date birthday) {
        this.birthday = birthday;
    }

    /** @return the address, possibly {@code null} */
    public String getAddress() {
        return this.address;
    }

    /** @param address the address to store */
    public void setAddress(String address) {
        this.address = address;
    }

    /**
     * Renders every field as
     * {@code User [id=..., username=..., sex=..., birthday=..., address=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User [id=");
        text.append(id);
        text.append(", username=").append(username);
        text.append(", sex=").append(sex);
        text.append(", birthday=").append(birthday);
        text.append(", address=").append(address);
        text.append("]");
        return text.toString();
    }
}
消费者的接口。由于我是使用一个线程作为消费者,重复从队列中获取消息,因此结构如下:
/**
 * Consumer side of the Redis-backed queue.
 *
 * <p>Extends {@link Runnable} so that an implementation can be handed directly to a
 * thread; implementations are expected to fetch tasks in {@link #consumer()} and
 * process each one in {@link #handle(TaskItem)}.
 *
 * @param <T> payload type of the tasks being consumed
 */
public interface RedisQueueConsumer<T> extends Runnable {

    /**
     * Entry point of the consumption logic.
     */
    void consumer();

    /**
     * Processes a single task.
     *
     * @param t the task to process
     */
    void handle(TaskItem<T> t);
}
生产者,对于生产者来说功能都是一样的,因此没必要实现接口。
/**
 * Producer side of a queue stored in a Redis sorted set.
 *
 * <p>Each task is serialized to JSON and added to the sorted set under {@code queueKey}
 * with a monotonically increasing score, so tasks with smaller scores are consumed first.
 * Constructing the queue also starts the supplied consumer on a new thread.
 *
 * <p>Note: the JSON text is the sorted-set member, so adding a task whose JSON is identical
 * to one already queued does not create a second entry. The score counter is per instance
 * and restarts at zero whenever a new instance is created.
 *
 * @param <T> payload type of the tasks
 */
public class RedisDelayQueue<T> {

    private final StringRedisTemplate strRedisTemplate;

    private final String queueKey;

    // Source of scores; a lower score means the task is consumed earlier.
    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    /**
     * Creates the queue and starts the consumer thread.
     *
     * @param strRedisTemplate template used to talk to Redis
     * @param queueKey         key of the sorted set that holds the tasks
     * @param consumer         consumer to run on a newly started thread
     */
    public RedisDelayQueue(StringRedisTemplate strRedisTemplate, String queueKey, RedisQueueConsumer<T> consumer) {
        this.strRedisTemplate = strRedisTemplate;
        this.queueKey = queueKey;
        new Thread(consumer).start();
    }

    /**
     * Single entry point for producing tasks.
     *
     * @param taskItem the task to enqueue
     * @return {@code true} if Redis reported the member as newly added; {@code false}
     *         otherwise, including when Redis returned no result
     */
    public boolean addTask(TaskItem<T> taskItem) {
        String value = JSON.toJSONString(taskItem);
        // add() returns a boxed Boolean that can be null (e.g. inside a pipeline or
        // transaction); compare against TRUE instead of auto-unboxing to avoid an NPE.
        Boolean added = strRedisTemplate.opsForZSet().add(this.queueKey, value, atomicInteger.getAndIncrement());
        boolean addResult = Boolean.TRUE.equals(added);
        System.out.println("task = " + value + " addResult = " + addResult);
        return addResult;
    }
}
最终的消息队列实现类。
package com.gee.configuration;
import java.lang.reflect.Type;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.gee.entity.TaskItem;
import com.gee.entity.User;
import com.gee.redis.RedisDelayQueue;
import com.gee.redis.RedisQueueConsumer;
/**
 * Spring configuration that wires the Redis-backed queue together with its consumer.
 */
@Configuration
public class RedisQueueConfiguration {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Builds the queue bean for {@link User} tasks.
     *
     * <p>The consumer polls the sorted set for the lowest-scored entry, removes it, and
     * passes it to {@code handle}. Only the caller whose remove actually deleted the
     * entry processes it.
     *
     * @return the queue, with its consumer thread already started
     */
    @Bean
    public RedisDelayQueue<User> redisDelayQueue() {
        // Queue name, score range to scan, and how many entries to fetch per poll.
        String queueKey = "test-queue";
        double min = 0;
        double max = Double.MAX_VALUE;
        int startIndex = 0;
        int limit = 1;
        return new RedisDelayQueue<User>(redisTemplate, queueKey, new RedisQueueConsumer<User>() {

            @Override
            public void run() {
                consumer();
            }

            @Override
            public void consumer() {
                Type taskType = new TypeReference<TaskItem<User>>() { }.getType();
                while (!Thread.currentThread().isInterrupted()) {
                    Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, min, max, startIndex, limit);
                    if (CollectionUtils.isEmpty(values)) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Restore the flag so the thread's owner can still observe the interrupt.
                            Thread.currentThread().interrupt();
                            break;
                        }
                        // Nothing to process (values may even be null): poll again rather
                        // than falling through to values.iterator().
                        continue;
                    }
                    String value = values.iterator().next();
                    // remove() returns a boxed Long that can be null; treat null as "not removed".
                    Long removed = redisTemplate.opsForZSet().remove(queueKey, value);
                    if (removed == null || removed <= 0) {
                        continue;
                    }
                    try {
                        TaskItem<User> task = JSON.parseObject(value, taskType);
                        handle(task);
                    } catch (RuntimeException e) {
                        // The entry is already removed from Redis; report the failure and keep
                        // the consumer alive instead of letting one bad task end the loop.
                        System.err.println("failed to process task " + value + ": " + e);
                    }
                }
            }

            @Override
            public void handle(TaskItem<User> t) {
                System.out.println(t.toString());
            }
        });
    }
}
测试方法
/**
 * Manual smoke test: enqueues two tasks and then keeps the JVM alive so the consumer
 * thread has time to print them.
 */
public class TestRedis extends ConfigTest {

    @Test
    public void testRedis() {
        // The bean is looked up by name, so the generic type cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        RedisDelayQueue<User> redisDelayQueue =
                (RedisDelayQueue<User>) ApplicationContextUtils.APP_CONTEXT.getBean("redisDelayQueue");

        User user = new User();
        user.setUsername("username");
        user.setId(100);

        TaskItem<User> taskItem = new TaskItem<User>();
        taskItem.setTaskId(UUID.randomUUID().toString());
        taskItem.setMsg(user);

        // The second task carries the same payload but has no task id.
        TaskItem<User> taskItem2 = new TaskItem<User>();
        taskItem2.setMsg(user);

        redisDelayQueue.addTask(taskItem);
        redisDelayQueue.addTask(taskItem2);

        try {
            // Give the consumer thread time to pick up and print both tasks.
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it so the test runner can react.
            Thread.currentThread().interrupt();
        }
    }
}
最终输出结果
task = {"msg":{"id":100,"username":"username"},"taskId":"551e5636-6857-487a-85be-c5d7ec680ad4"} addResult = true
task = {"msg":{"id":100,"username":"username"}} addResult = true
TaskItem [taskId=551e5636-6857-487a-85be-c5d7ec680ad4, msg=User [id=100, username=username, sex=null, birthday=null, address=null]]
TaskItem [taskId=null, msg=User [id=100, username=username, sex=null, birthday=null, address=null]]