Spring分布式任务高效流程

2021-06-21  本文已影响0人  十毛tenmao

项目中有很多定时任务,而且他们的执行模式非常类似,抽象整理如下

定时任务特点

执行流程

代码示例

@Slf4j
@RequiredArgsConstructor
@AllArgsConstructor
public class RedisSortedSetTaskWorker {
    private final RedissonClient redissonClient;
    private final StringRedisTemplate stringRedisTemplate;

    private final String redisKey;
    private final Function<String, Boolean> task;
    private Predicate<String> directRemoveFilter;
    private boolean continueOnError = true;

    public void work(double beginScore, double endScore, @Nullable Integer limit) {
        //获取符合条件的所有子任务
        Set<String> subTasks = limit == null
                ? stringRedisTemplate.opsForZSet().rangeByScore(redisKey, beginScore, endScore)
                : stringRedisTemplate.opsForZSet().rangeByScore(redisKey, beginScore, endScore, 0, limit);
        log.info("[{}]: there are [{}] sub tasks to try to work", redisKey, subTasks == null ? 0 : subTasks.size());
        if (subTasks == null) {
            return;
        }

        //删除子任务
        if (directRemoveFilter != null) {
            Set<String> toRemove = subTasks.stream().filter(directRemoveFilter).collect(Collectors.toSet());
            Object[] directToRemove = toRemove.toArray();
            if (directToRemove.length > 0) {
                stringRedisTemplate.opsForZSet().remove(redisKey, directToRemove);
                subTasks.removeAll(toRemove);
            }
        }

        log.info("there are [{}] sub tasks to work after filter", subTasks.size());
        for (String subTask : subTasks) {
            try {
                String lockKey = redisKey + ":lock:" + subTask;
                trySubmit(lockKey, () -> {
                    //如果已经不存在了,就直接返回,说明可能被其他节点执行了
                    Double score = stringRedisTemplate.opsForZSet().score(redisKey, subTask);
                    if (score == null) {
                        return;
                    }

                    //如果执行成功就删除该子任务(subTask)
                    boolean success = task.apply(subTask);
                    if (success) {
                        stringRedisTemplate.opsForZSet().remove(redisKey, subTask);
                    }
                });
            } catch (RuntimeException e) {
                log.warn("fail to work one sub task: redisKey[{}], subTask[{}]", redisKey, subTask, e);

                //遇到异常是否继续
                if (continueOnError) {
                    continue;
                }
                throw e;
            }
        }
    }

    /**
     * 尝试分布式环境下互斥执行任务.
     *
     * @param lockName 分布式锁
     * @param work     任务
     * @return true锁成功,false锁失败
     */
    private boolean trySubmit(String lockName, Runnable work) {
        RLock lock = redissonClient.getLock(lockName);
        //获取分布式锁
        if (!lock.tryLock()) {
            log.info("fail to lock [{}]", lockName);
            return false;
        }
        log.info("lock [{}] successfully", lockName);
        try {
            work.run();
            return true;
        } catch (Exception e) {
            log.warn("fail to trySubmit", e);
            throw e;
        } finally {
            lock.unlock();
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读