Redis Stream xRead 与xReadGroup读取

2024-12-05  本文已影响0人  wds_94

现在有这样一个需求:
用户生成订单后,需要将订单数据放入缓存中,在订单数据里有一个参数为结束时间,每个订单的结束时间是不定的,没有先后顺序,可能先生成的订单 结束时间在后面,需要在结束时候那个时刻,对订单进行业务处理,写出缓存订单及取缓存订单的方法。

xReadGroup实现

刚开始使用stream group的方式实现,但发现在redis消费组的数据只能被消费一次,已经被read之后的数据不会被重复read。已消费未确认的数据可以被同一个stream的其他消费组读取。

xRead实现

xread 可以重复读取,每次读取都从队列的开头开始读,获取所有数据后过滤出你想要的数据进行处理 ,处理完成删除该数据。下次读取未被标记的数据还能被再次读取到,这样就可以简单方式完成该需求。
这里还有一个优化点是 可以根据结束时间生成streamId,读取数据时就查询指定节省时间内的数据,这样可以减少查询的数据量,具有更好的性能。这种方式后续等我更新完再补充。

两种代码对比如下:

使用group


    /**
     * 设置待平仓订单 已废弃
     * 使用redis stream+group方式
     */
    @Override
    public void setOrderCache(Order order) {
        redisTemplate.opsForStream().add(String.format(ORDER_STREAM_COIN_KEY, order.getGoodsName()), order.toMap());
        createOrderConsumerGroup(order.getGoodsName());
    }

    /**
     * 获取待处理订单 已废弃
     * 使用redis stream+group方式
     * 由于消费组的数据只会被消费一次,如果已消费但未到平仓时间,则不会被标记,下次查询也获取不到,
     * 故废弃该方法
     */
    @Override
    public List<Order> getOrderCache(String goodsName) {
        long currentTime = System.currentTimeMillis();  // 获取当前时间戳
        String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
        String groupName = String.format(ORDER_STREAM_GROUP_COIN_KEY, goodsName);

        // 使用消费组读取数据,从未消费的位置开始
        List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
                .read(Consumer.from(groupName, goodsName + "待处理订单Consumer"),
                      StreamReadOptions.empty().block(Duration.ofSeconds(0L)),
                      StreamOffset.create(streamKey, ReadOffset.latest()));
        if (CollectionUtils.isEmpty(messages)) {
            return Collections.emptyList();
        }
        log.info("消费组消息:{}", JacksonUtils.toJson(messages));
        List<Order> orderList = new ArrayList<>();
        for (MapRecord<String, Object, Object> record : messages) {
            log.info("record:{}", JacksonUtils.toJson(record.getValue()));
            Order order = new Order(record.getValue());
            if (order.getCloseTime() / 1000 <= currentTime / 1000) {
                orderList.add(order);
                // 处理完成后确认该消息
//                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            }
        }
        return orderList;
    }

    public void createOrderConsumerGroup(String coin) {
        String groupExistsKey = String.format(ORDER_GROUP_EXISTS_KEY, goodsName);
        // 检查 Redis 中是否有消费组存在的标记
        Boolean groupExists = (Boolean) redisTemplate.opsForValue().get(groupExistsKey);
        if (Boolean.TRUE.equals(groupExists)) {
            // 如果消费组已存在,则跳过创建
            log.warn("消费组已存在:{}", coin);
            return;
        }
        try {
            String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
            String groupName = String.format(ORDER_STREAM_GROUP_COIN_KEY, goodsName);
            // 创建消费组
            redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupName);
            log.warn("消费组已存在:{}", groupName);

            // 设置消费组存在的标记,避免重复创建
            redisTemplate.opsForValue().set(groupExistsKey, true);

        } catch (Exception e) {
            redisTemplate.opsForValue().set(groupExistsKey, true);
        }
    }

使用stream可以重复读

    /**
     * 设置待平仓订单
     * @param order 订单数据
     */
    @Override
    public void setOrderCache(Order order) {
        String streamKey = String.format(ORDER_STREAM_COIN_KEY, order.getGoodsName());
        redisTemplate.opsForStream().add(streamKey, order.toMap());
    }

    /**
     * 获取待平仓订单
     * @param  商品名
     * @return 订单列表
     */
    @Override
    public List<Order> getOrderCache(String goodsName) {
        long currentTime = System.currentTimeMillis();  // 获取当前时间戳
        return getOrderCache(goodsName,currentTime);
    }
    /**
     * 获取待平仓订单
     * @param coin goods币种
     * @return 订单列表
     */
    @Override
    public List<Order> getOrderCache(String goodsName, long currentTime) {

        String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
        List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
                .read(StreamOffset.create(streamKey, ReadOffset.from("0")));

        if (CollectionUtils.isEmpty(messages)) {
            return Collections.emptyList();
        }

        log.info("消费组消息:{}", JacksonUtils.toJson(messages));
        List<Order> orderList = new ArrayList<>();
        for (MapRecord<String, Object, Object> record : messages) {
            Order order = new Order(record.getValue());
            if (order.getCloseTime() / 1000 <= currentTime / 1000) {
                log.info("record:{}", JacksonUtils.toJson(record.getValue()));
                orderList.add(order);
                redisTemplate.opsForStream().delete(streamKey, record.getId());
            }
        }
        return orderList;
    }

上一篇 下一篇

猜你喜欢

热点阅读