redis cluster模式 使用pipeline批量操作

2022-07-14  本文已影响0人  就叫basi

由于cluster模式下,key的存放需要先使用JedisClusterCRC16计算出solt,在定位solt所在的节点,所以直接使用pipeline会报错

Pipeline is currently not supported for JedisClusterConnection.

思路

1.将需要操作的key计算出对应的solt,得到hostAndPort,分组存放在一个map中。(Map<JedisPool, List<String>> node2keys = new HashMap<>())
2.通过得到的JedisPool,分开使用jedis的pipeline执行
3.将返回数据组装返回

code

1.先得到一个JedisCluster


import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;
import java.util.Set;

@Configuration
public class JedisClusterConfig {

    @Value("${spring.redis.cluster.nodes}")
    private String nodes;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.pool.max-idle}")
    private int maxIdle;
    @Value("${spring.redis.pool.max-wait}")
    private long maxWait;

    @Bean("jedisCluster")
    public JedisCluster jedisCluster() {
        String[] redisNodes = nodes.split(",");
        Set<HostAndPort> nodes = new HashSet<>();
        for (String node : redisNodes) {
            String[] ipPortPair = node.split(":");
            nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
        }
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxIdle(maxIdle);
        config.setMaxWaitMillis(maxWait);
        JedisCluster cluster = new JedisCluster(nodes, 8000, 5000, 3, password, config);
        return cluster;

    }

}

2.实现key的分组


import com.sqyc.carnet.util.ChineseUtil;
import com.sqyc.carnet.util.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;

import java.util.*;

/**
 * 基于JedisCluster实现管道的使用
 * 核心对象:JedisClusterInfoCache和JedisSlotBasedConnectionHandler
 */
@Slf4j
public class JedisClusterPipeline {

    /**
     * Redis集群缓存信息对象 Jedis提供
     */
    private JedisClusterInfoCache clusterInfoCache;

    /**
     * Redis链接处理对象 继承于JedisClusterConnectionHandler,对其提供友好的调用方法 Jedis提供
     */
    private JedisSlotBasedConnectionHandler connectionHandler;

    /**
     * Redis集群操作对象 Jedis提供
     */
    private JedisCluster jedisCluster;

    /**
     * 构造方法
     * 通过JedisCluster获取JedisClusterInfoCache和JedisSlotBasedConnectionHandler
     *
     * @param jedisCluster
     */
    public JedisClusterPipeline(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
        MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
        clusterInfoCache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
        connectionHandler = (JedisSlotBasedConnectionHandler) metaObject.getValue("connectionHandler");
    }

   /**
     * 批量get key
     *
     * @param keys
     */
    public Map<String, Object> clusterPipelineGet(List<String> keys) {
        //要返回的结果
        Map<String, Object> result = new HashMap<>();
        //节点对应keys分组
        Map<JedisPool, List<String>> node2keys = new HashMap<>();

        for (String key : keys) {
            // 计算key对应的slot
            int slot = JedisClusterCRC16.getSlot(key);
            // 根据slot获取对应的节点信息,将同一节点的key收在一组
            JedisPool jedisPool = this.clusterInfoCache.getSlotPool(slot);
            if (null == jedisPool) {
                /** 刷新缓存的SlotPool */
                this.connectionHandler.renewSlotCache();
                jedisPool = this.clusterInfoCache.getSlotPool(slot);
                if (jedisPool == null) {
                    log.error("clusterPipelineGet , No reachable node in cluster for slot---{}", slot);
                    continue;
                }
            }
            if (node2keys.containsKey(jedisPool)) {
                node2keys.get(jedisPool).add(key);
            } else {
                List<String> list = new ArrayList<>();
                list.add(key);
                node2keys.put(jedisPool, list);
            }
        }
        if (node2keys.isEmpty()) {
            log.error("clusterPipelineGet , node2keys is empty");
            return result;
        }
        // 分组执行
        for (Map.Entry<JedisPool, List<String>> group : node2keys.entrySet()) {
            Jedis jedis = group.getKey().getResource();
            Pipeline pipeline = jedis.pipelined();
            // 执行本组keys
            List<String> values = group.getValue();
            for (String carNo : values) {
                pipeline.get(carNo);
            }
            List<Object> pipelineReturns = pipeline.syncAndReturnAll();
            //组装redis返回结果,以免请求的key和返回的values顺序错乱
            for (int i = 0; i < values.size(); i++) {
                result.put(values.get(i), pipelineReturns.get(i));
            }
            jedis.close();
        }
        return result;
    }

}

调用示例

    @Override
    public String queryMobileCarStatusBatch(List<String> carNos) {

        JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(jedisCluster);
        try {
            Map<String, Object> result = jedisClusterPipeline.clusterPipelineGet(carNos);
            return createReturnValue(RequsetReturnValueUtils.REQUSET_RETURN_CODE_NORMAL, "成功", result);
        } catch (Throwable t){
            log.error("批量查询统计失败,carNos= {},error=={}",carNos,t.getMessage());
        }
        return createReturnValue(RequsetReturnValueUtils.REQUSET_RETURN_CODE_FAIL, "失败", "");

    }
vans
上一篇 下一篇

猜你喜欢

热点阅读