分布式系列

Redis分布式锁 Spring Schedule实现任务调度

2018-10-16  本文已影响369人  ChaLLengerZeng

一看到标题就知道,这一篇博客又是总结分布式工作环境中集群产生的问题,个人觉得分布式没有那么难以理解,可能也是自己见识比较浅,对我来说,分布式只是一种后端业务演进时的一种工作方式,而真正实现这种工作方式的是集群

关于集群是什么以及如何搭建集群环境,可以参考之前我的博文,这一片博客将着重介绍Redis分布式锁,这是一个基于SpringBoot构建的高并发电商后端服务项目,并且其中框架包括的Spring Schedule框架搭建的定时任务模块去实现定时关闭未付款的订单

项目地址: https://github.com/challengerzsz/Mall

在项目演进的过程中集群化就意味着复杂,复杂就意味着出问题,那么这个时候今天分享的这个问题是多进程可能在统一进程都去进行关单任务,这是不必要的,后面进行详细分析,希望看到这篇博客的初学者们能够了解到这听起来高大上的概念其实不是那么困难的,有经验的同行们欢迎指正修改

如何去实现一个关单服务

试想一下,自己是否有在购物客户端(web/app)中发起订单之后,本该选择支付的或者是因为什么原因,没钱了也好或者不想买了也好,订单挂在那里,但是却没有取消掉,这个例子在一个场景商城后端的项目中如何去解决呢?

关单的逻辑在我看来可以有下面实现方式

现在的购物平台给我的感觉这三种逻辑是都存在的,肯定也存在别的实现方式,我在这里也就是举例一下,下面来说今天的主题

Spring Schedule实现定时关单

今天的主题其实是上述三种分析的第三种实现,使用Spring提供的框架实现定时任务,制定自己的逻辑,下面介绍Schedule这个框架的时候,大家可以大概了解一下cron表达式,并且可以百度搜索一些cron自动生成的网站去方便开发

Spring Schedule给我的感觉优点有下面3点

  1. 基于注解来设置调度器。
  2. 非常方便实现简单的调度
  3. 对代码不具有入侵性,非常轻量级

什么是cron表达式

cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义 ,cron表达式在这里不具体介绍,具体可参考下列几图

举一个例子

@Component
public class Task {
    private Logger logger = LoggerFactory.getLogger(getClass());
    
    @Scheduled(cron = "0 */1 * * * ?")
    public void testSchedule() {
        logger.info("定时任务启动");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("定时任务执行完成");
    }
}

很简单将定时任务类使用@Component声明为Spring容器的一个组件Bean,使用@Schedule并且指定cron参数,并且在执行这个方法的时候让处理定时任务的这条线程睡2s,来模拟处理逻辑的时间,这个定时方法将会在每分钟整的时候被调用,看看效果

执行定时任务的日志

可以看到线程1从8:20:00被调用,线程睡眠2s模拟真实处理逻辑后,8:20:02执行完成

实现关单定时器 V 1.0

V1.0的逻辑实现很简单,写一个定时方法,直接去处理关单

@Scheduled(cron = "0 */1 * * * ?")
public void closeOrderTaskV1() {

    logger.info("关闭订单定时任务启动");
    //这里是你自己的关单逻辑
    logger.info("关闭订单定时任务结束");
}

我的关单逻辑是这样的,定时任务触发之后,关闭掉发起订单2个小时,却未付款的订单,关单逻辑可以是可配置的,使用Spring配置类的注解,读取.yml或.xml中关单的需要关闭订单的超时时间,之后的对数据库操作在这里也不提

大家可以想想这个V1.0版本会有什么问题,尤其是在集群环境下

多进程同时启动定时关单

集群环境下,Tomcat集群其实是多个进程,并且部署的项目逻辑代码完全一致,这个时候问题来了,大家也应该都想到了,多个进程都会在分钟的整数倍的时候执行这个定时任务,会有什么问题呢

Redis分布式锁

我们通过实现一个Redis分布式锁,来控制某一时刻只会有一条进程进行定时任务,Redis分布式锁给我的感觉有点像重入锁的计数器,通过其中的标示来实现这个锁

定时任务V 2.0

@Scheduled(cron = "0 */1 * * * ?")
public void closeOrderTaskV2 {
    logger.info("关闭订单定时任务启动");
    //redis分布式锁的上锁时间ms
    long lockTimeOut = mallProperties.getTask().getLockTimeOut();
    Boolean setIfAbsentResult =         redisUtil.setIfAbsent(Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
    String.valueOf(System.currentTimeMillis() + lockTimeOut));

    if (setIfAbsentResult) {
        //若返回值为true则说明获取到了分布式锁,原先没有服务器占用锁
        closeOrder(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
    } else {
        logger.info("未获取到分布式锁");
    }
    logger.info("关闭订单定时任务结束");
}

private void closeOrder(String lockName) {
    
    logger.info("获取{}, ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
    Integer hour = mallProperties.getTask().getHour();
    orderService.closeOrder(hour);
    //释放锁
    redisUtil.delete(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
    logger.info("释放{}, ThreadName", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
}

定时任务V 3.0 搭配上述私有closeOrder方法使用


@Scheduled(cron = "0 */1 * * * ?")
public void closeOrderTask() {

    logger.info("关闭订单定时任务启动");
    //redis分布式锁的上锁时间ms
    long lockTimeOut = mallProperties.getTask().getLockTimeOut();
    Boolean setIfAbsentResult = redisUtil.setIfAbsent(Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
                String.valueOf(System.currentTimeMillis() + lockTimeOut));
    if (setIfAbsentResult) {
        //若返回值为true则说明获取到了分布式锁,原先没有服务器占用锁
        closeOrder(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
    } else {
        //未获取到锁 判断时间戳判断是否可以重置锁
        String lockValueStr = redisUtil.getRedisValue(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);

        //若该当前时间超过该锁本该释放的时间,但是由于某些原因未被释放则重置该锁
        // 意外终止,还没来得及
        if (lockValueStr != null && System.currentTimeMillis() > Long.parseLong(lockValueStr)) {

            String getSetResult = redisUtil.getSetRedisValue(Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
                        String.valueOf(System.currentTimeMillis() + lockTimeOut));
            //将获取到的值与之前的值进行比较若相同则说明原先应该释放的锁没有被释放这个时候可以重置
            //若不相同则说明在这个时间段内另一台tomcat集群已经使获取到了分布式锁这个时候只能是获取不到这个分布式锁
            if (getSetResult == null || (getSetResult != null && StringUtils.equals(getSetResult, lockValueStr))) {
                closeOrder(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
            } else {
                logger.info("未获取到分布式锁");
            }
        } else {
            // 此时若lockValueStr为null,就说明定时任务已经完成并清除了Redis的那个value。 
            // 若时间没到,说明定时任务正在执行。
            // 两种情况都不需要获取分布式锁,所以不进行操作。
            logger.info("未获取到分布式锁");
        }
    }

    logger.info("关闭订单定时任务结束");
}
private void closeOrder(String lockName) {

    //设置初获取锁的时候有效期,避免永久有效 时间单位s
    redisUtil.expire(lockName, 5);
    logger.info("获取{}, ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
    Integer hour = mallProperties.getTask().getHour();
    orderService.closeOrder(hour);
    //释放锁
    redisUtil.delete(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
    logger.info("释放{}, ThreadName", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
}   

Value值发挥作用

之前尝试锁的逻辑依旧使用setNx进行,只不过现在的未获取到锁的逻辑需要大改一下,而这种改动的实现,其实就是Redis分布式锁实现的核心方式

综上这就是完整的Redis分布式锁实现的定时任务的任务调度模块,接下来还将介绍一个第三方框架帮助我们更好地实现这种逻辑

Redisson的引入

Redisson的github:https://github.com/redisson

中文Wiki:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

因为本项目使用了Redis集群,所以直接看到中文文档的集群模式的设置

官方的集群设置文档

集群模式除了适用于Redis集群环境,也适用于任何云计算服务商提供的集群模式,例如AWS ElastiCache集群版Azure Redis Cache阿里云(Aliyun)的云数据库Redis版

程序化配置集群的用法:

Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
    //可以用"rediss://"来启用SSL连接
    .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
    .addNodeAddress("redis://127.0.0.1:7002");

RedissonClient redisson = Redisson.create(config);

下面来看一下集成它的一些必要配置

@Component
public class RedissonManager {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Config config = new Config();

    private Redisson redisson = null;

    /**
     * 构造器执行完了之后执行这个init方法
     */
    @PostConstruct
    private void init() {

        try {
            this.config.useClusterServers()
                    .setScanInterval(2000)
                    .addNodeAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:6380", "redis://127.0.0.1:6381","redis://127.0.0.1:6382", "redis://127.0.0.1:6383");
            this.redisson = (Redisson) Redisson.create(config);
            logger.info("初始化Redisson成功");
        } catch (Exception e) {
            logger.error("Redisson 初始化失败", e);
        }
    }

    public Redisson getRedisson() {
        return redisson;
    }

}

有人可能觉的我addNodeAddress方法有点像硬编码,这个方法官方提供的参数是可变字符串参数,我们从配置类中加载出来其实是一个List但是不影响我们使用,这个方法其实就是添加Redis集群中的节点ip:port

可以看到它的官方文档的注释"redis://127.0.0.1:6379"表示启用SSL连接,如果不指定这个协议前缀,我们可以使用http协议进行替换,这里有一个异常情况

Redisson框架采用的address节点要用URI编码,如果单纯使用127.0.0.1:6379它会奇怪得抛出一个异常

Caused by: java.net.URISyntaxException: Illegal character in scheme name at index 0: 127.0.0.1:6379

这里建议使用redis://进行SSL连接,下面看一下log

可以发现我们通过添加配置好的Redis集群节点即可,它会自动去识别主从库,并且识别每个Redis负责的扇区

使用Redisson实现Redis分布式锁

@Scheduled(cron = "0 */1 * * * ?")
public void closeOrderTaskWithRedisson() {

    logger.info("关闭订单定时任务启动
    RLock lock = redissonManager.getRedisson().getLock(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
    boolean getLock = false;
    //尝试获取锁
    try {
        //是否获取到锁
        //如果不设置waitTime为0的话如果一个逻辑或者sql执行的非常快的情况下,就会造成另一个Tomcat进程也会获取到锁执行一遍schedule
        if (getLock = lock.tryLock(2, 5, TimeUnit.SECONDS)) {
            logger.info("Redisson 获取到分布式锁:{} ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
                        Thread.currentThread().getName());
            Integer hour = mallProperties.getTask().getHour();
            orderService.closeOrder(hour);
        } else {
            logger.info("Redisson 没有获取到分布式锁:{} ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
                        Thread.currentThread().getName());
        }
    } catch (InterruptedException e) {
        logger.error("Redisson 分布式锁获取异常");
    } finally {
        //未获取到锁的话就不需要释放锁,判断getLock
        if (!getLock) {
            return;
        }
        lock.unlock();
        logger.info("Redisson 释放分布式锁");
    }
    logger.info("关闭订单定时任务结束");
}

希望这篇博文能够让大家有所收获,有不正确的地方还希望大家能够认证指正

上一篇 下一篇

猜你喜欢

热点阅读