redisredis

Redis 延迟任务队列

2018-12-03  本文已影响1人  吕杨卓

背景

在业务发展过程中,会出现一些需要延时处理的场景,比如:

a.订单下单之后超过30分钟用户未支付,需要取消订单

b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论

c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。

处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。

使用 Redis 的列表结构可以实现执行一种任务的FIFO队列,也可以实现通过调用不同回调函数的来执行多重不同的任务队列,乃至可以是实现简单的优先级队列,当然也可以实现延时队列。

延时队列的基本实现有3类:

无论是短暂的等待,还是将任务从入队列,都是已经很好资源的事情,多以通常不会采用第一种方法。如果在本地维护一个任务列表,可以能会导致任务丢失,除非对任务进行持久化。其次,通过不断的扫描别表,查找合适的任务,每次都需要循环遍历,也是件浪费资源的事情,所以第二种方法也不可取。最后,采用有序结合保存任务、执行时间作为排序的依据是最简单最直接的做法。采用执行时间排序,不需要每次遍历整个队列,只需要判断队首的元素是否到了可执行时间即可。其次,只需要一个工作进程。再者,可以使用 “分布式锁”机制将任务从有序集合中个移动到任务队列。这样处理,语义简单,逻辑清晰。
Redis 的有序集合天生就适合做这件事。

功能特性
整体结构
801190-20171202153439526-1334143734.png
流程
801190-20171202153600120-587720598.png
redis客户端演示
127.0.0.1:6379> zadd task_set 1 task1
(integer) 1
127.0.0.1:6379> zadd task_set 2 task2
(integer) 1
127.0.0.1:6379> zadd task_set 3 task3
(integer) 1
127.0.0.1:6379> zadd task_set 4 task4
(integer) 1
127.0.0.1:6379> ZRANGE task_set 0 10 WITHSCORES
1) "task1"
2) "1"
3) "task2"
4) "2"
5) "task3"
6) "3"
7) "task4"
8) "4"
Java 模拟代码:
package me.touch.redis;

import java.util.Set;
import java.util.UUID;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Tuple;

/**
 * 延时队列
 * @author Knight-Ran
 *
 */
public class delayQueue {
    private Jedis jedis;
    private JedisPool pool;
    private static final String QUEUE_NAME = "deplay_queue";
    
    @Before
    public void setUp() {
        pool = new JedisPool(new JedisPoolConfig(), "localhost");
        jedis = pool.getResource();
    }

    @After
    public void after() {
        jedis.close();
        pool.destroy();
    }
    
    
    // 模拟任务处理队列
    public static void addToTaskQue(String taskInfo){
        System.out.println(taskInfo+"已经从延时队列中转至队列"+ "当前时间:"+ System.currentTimeMillis() );
        System.out.println();
    }
        
    public void addToDeplayQueue(Task task){
        System.out.println(task.toString()+ "已经加入延时队列");
        jedis.zadd(QUEUE_NAME, task.getTime(), task.toString());
    }
    
    public void transferFromDelayQueue() throws InterruptedException{
        while(true){
            Set<Tuple> item = jedis.zrangeWithScores(QUEUE_NAME, 0, 0);
            if(item != null && !item.isEmpty()){
                Tuple tuple = item.iterator().next();
                if(System.currentTimeMillis() >= tuple.getScore()){
                    // TODO 获取锁
                    jedis.zrem(QUEUE_NAME, tuple.getElement()); // 从延时队列中移除
                    addToTaskQue(tuple.getElement()); //任务推入延时队列,因为这里只是延时
                    // TODO 释放锁
                }
            }
            
            Thread.sleep(100);
            
        }
    }
    
    @Test
    public void test() throws InterruptedException{
         long now = System.currentTimeMillis();
         Task task = new Task(UUID.randomUUID().toString(), now+10*1000, 10*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+20*1000, 20*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+30*1000, 30*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+40*1000, 40*1000+"后执行");
         transferFromDelayQueue();
         
    }
    
    static class Task{
        // 任务id
        private String id ;
        // 任务执行时间
        private long time;
        // 描述
        private String desc;
        
        public Task(String id, long time, String desc){
            this.id = id ;
            this.time = time;
            this.desc = desc;
        }
        
        public String getId() {
            return id;
        }
        public long getTime() {
            return time;
        }
        public String getDesc() {
            return desc;
        }

        @Override
        public String toString() {
            return "Task [id=" + id + ", time=" + time + ", desc=" + desc + "]";
        }
    }
}
测试结果:
Task [id=441a900e-a4a5-44cc-bddc-117bb3f00130, time=1502006961460, desc=10000后执行]已经加入延时队列
Task [id=9982a932-3c29-4e3c-a940-5c3beb5b55c2, time=1502006971460, desc=20000后执行]已经加入延时队列
Task [id=adfdfdff-b8b0-440d-b85e-06c3432b0094, time=1502006981460, desc=30000后执行]已经加入延时队列
Task [id=441a900e-a4a5-44cc-bddc-117bb3f00130, time=1502006961460, desc=10000后执行]已经从延时队列中转至队列当前时间:1502006961481
Task [id=9982a932-3c29-4e3c-a940-5c3beb5b55c2, time=1502006971460, desc=20000后执行]已经从延时队列中转至队列当前时间:1502006971518
Task [id=adfdfdff-b8b0-440d-b85e-06c3432b0094, time=1502006981460, desc=30000后执行]已经从延时队列中转至队

此方式为1.0方案,后续需要把task计划任务修改为redis队列监听或队列元素到期回调

参考链接:https://www.jianshu.com/p/63d5c42299f9
作者:非典型程序员
來源:简书

上一篇下一篇

猜你喜欢

热点阅读