扩展Jedis功能,实现对Redis集群Pipeline的支持

2021-09-03  本文已影响0人  上岸大虾米

在使用Jedis操作Redis集群的过程中,发现Jedis不支持集群Pipeline操作,于是尝试扩展该功能。目前仅通过了初步验证,这里只是提供一个思路,仅供参考。

实现集群Pipeline步骤

  1. 计算key所对应的slot,获取对应的jedis对象
  2. 将所有的key按照jedis分组
  3. 每个分组分别执行对应的Pipeline
  4. 汇总结果集

同时,也可以借鉴这个思路,解决JedisCluster不支持跨槽(slot)执行mget、del等多key操作的类似问题。

定义操作接口

import redis.clients.jedis.RedisPipeline;

import java.util.List;

/**
 * Pipeline contract exposed to callers: every command declared by
 * {@link RedisPipeline} plus a way to flush the queued commands and collect
 * their results.
 *
 * @author Adimin
 */
public interface JedisPipeline extends RedisPipeline {

  /**
   * Flushes the queued commands and returns their results.
   *
   * @see redis.clients.jedis.Pipeline#syncAndReturnAll()
   * @return the operation results
   */
  List<Object> syncAndReturnAll();

}

实现类

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.session.Configuration;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Pipeline support for a Redis cluster, built on top of {@link JedisCluster}.
 * <p>
 * How it works:
 * <ol>
 *   <li>compute the slot of the key and look up the {@link JedisPool} mapped to that slot</li>
 *   <li>group the commands per connection (one {@link Jedis} and one {@link Pipeline} per pool)</li>
 *   <li>execute each group's pipeline</li>
 *   <li>merge the per-pipeline results back into the order in which the commands were queued</li>
 * </ol>
 * Instances are obtained through {@link #newInstance(JedisCluster)}, which returns a
 * dynamic proxy implementing {@link JedisPipeline}.
 * <p>
 * An instance keeps unsynchronized mutable state (plain hash maps and a counter), so a
 * single instance should not be shared between threads.
 *
 * @author Adimin
 */
public class ClusterPipelineUtil extends Pipeline implements JedisPipeline {

    /** Number of commands queued so far; doubles as the result index of the next command. */
    private int size = 0;

    /**
     * Connection -> pipeline mapping.
     * A cluster consists of several Redis nodes; one pipeline is kept per borrowed connection.
     */
    private final Map<Jedis, Pipeline> redisAndPipelineMap = Maps.newHashMap();

    /**
     * Pipeline -> positions (in the overall result list) of the commands queued on it.
     */
    private final Map<Pipeline, List<Integer>> resultMap = Maps.newHashMap();

    /** Pool -> connection borrowed from it for the current batch. */
    private final Map<JedisPool, Jedis> poolAndJedisMap = Maps.newHashMap();

    /**
     * Slot -> JedisPool mapping, read reflectively from the JedisCluster.
     * TODO: slot re-assignment (cluster resharding) is not handled.
     */
    private final Map<Integer, JedisPool> slots;

    /**
     * Reads the slot cache out of the given cluster client.
     *
     * @param jedisCluster cluster client whose slot cache is reused
     * @throws NullPointerException  if {@code jedisCluster} is null
     * @throws IllegalStateException if the slot cache cannot be read
     */
    private ClusterPipelineUtil(JedisCluster jedisCluster) {
        if (jedisCluster == null) {
            throw new NullPointerException("jedisCluster must not be null");
        }
        // MyBatis' MetaObject is used as a reflection helper to reach the nested private field.
        Configuration configuration = new Configuration();
        MetaObject metaObject = configuration.newMetaObject(jedisCluster);
        // The property path below is expected to hold a Map<Integer, JedisPool>;
        // the cast cannot be checked at runtime because of erasure.
        @SuppressWarnings("unchecked")
        Map<Integer, JedisPool> slotCache =
                (Map<Integer, JedisPool>) metaObject.getValue("connectionHandler.cache.slots");
        if (slotCache == null) {
            throw new IllegalStateException("Unable to read the slot cache from JedisCluster");
        }
        slots = slotCache;
    }

    /**
     * Resolves the pipeline that must receive a command for {@code key}, borrowing a
     * connection and opening a pipeline on first use.
     *
     * @param key key of the command about to be queued
     * @return the pipeline bound to the pool mapped to the key's slot
     * @throws IllegalStateException if no pool is mapped to the key's slot
     */
    private Pipeline before(String key) {
        // compute the slot number of the key
        int slot = JedisClusterCRC16.getSlot(key);
        JedisPool jedisPool = slots.get(slot);
        if (jedisPool == null) {
            throw new IllegalStateException(
                    "No JedisPool is mapped to slot " + slot + " (key: " + key + ")");
        }
        // step 1: one connection per pool
        Jedis jedis = poolAndJedisMap.computeIfAbsent(jedisPool, JedisPool::getResource);
        // step 2: one pipeline per connection
        return redisAndPipelineMap.computeIfAbsent(jedis, BinaryJedis::pipelined);
    }

    /**
     * Records that one more command has been queued on {@code pipeline}.
     * Called only after the command was queued successfully, so the recorded positions
     * always line up with the responses that pipeline will return.
     */
    private void recordQueued(Pipeline pipeline) {
        resultMap.computeIfAbsent(pipeline, k -> new ArrayList<>()).add(size++);
    }

    /**
     * Returns every borrowed connection to its pool and resets the batch state.
     * All connections are closed even if one of them fails to close; the first failure
     * is rethrown afterwards with later failures attached as suppressed exceptions.
     */
    private void clearNew() {
        RuntimeException failure = null;
        try {
            // poolAndJedisMap holds every connection borrowed for this batch,
            // including one whose pipeline could not be opened.
            for (Jedis jedis : poolAndJedisMap.values()) {
                try {
                    jedis.close();
                } catch (RuntimeException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
        } finally {
            size = 0;
            resultMap.clear();
            poolAndJedisMap.clear();
            redisAndPipelineMap.clear();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Executes every per-connection pipeline and merges the results.
     * The borrowed connections are released and the batch state is reset whether or
     * not the execution succeeds.
     *
     * @return the results, in the order in which the commands were queued
     */
    @Override
    public List<Object> syncAndReturnAll() {
        Object[] arr = new Object[size];
        try {
            for (Map.Entry<Pipeline, List<Integer>> entry : resultMap.entrySet()) {
                // step 3: execute this connection's pipeline
                List<Object> responses = entry.getKey().syncAndReturnAll();
                List<Integer> positions = entry.getValue();
                for (int i = 0; i < positions.size(); i++) {
                    // step 4: put each response back at its original position
                    arr[positions.get(i)] = responses.get(i);
                }
            }
        } finally {
            clearNew();
        }
        return Lists.newArrayList(arr);
    }

    /**
     * Creates a cluster-aware pipeline for the given cluster client.
     *
     * @param jedisCluster cluster client to pipeline against
     * @return a proxy implementing {@link JedisPipeline}
     */
    public static JedisPipeline newInstance(JedisCluster jedisCluster) {
        ClusterPipelineInvocationHandler handler = new ClusterPipelineInvocationHandler();
        return handler.getProxy(jedisCluster);
    }

    /**
     * Invocation handler behind the {@link JedisPipeline} proxy: routes each command to
     * the pipeline selected by the command's first argument (the key).
     */
    public static class ClusterPipelineInvocationHandler implements InvocationHandler {

        private ClusterPipelineUtil clusterPipeline;

        /**
         * Dispatches a proxy call.
         * <ul>
         *   <li>calls without arguments (e.g. {@code syncAndReturnAll}, {@code hashCode},
         *       {@code toString}) go to the {@link ClusterPipelineUtil} itself</li>
         *   <li>{@code equals} is answered by proxy identity</li>
         *   <li>every other call is queued on the pipeline chosen from its first argument</li>
         * </ul>
         * Exceptions thrown by the target method are rethrown as-is rather than wrapped.
         *
         * @throws IllegalArgumentException if a command's first argument is not a String key
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            try {
                if (args == null || args.length == 0) {
                    return method.invoke(clusterPipeline, args);
                }
                if (method.getDeclaringClass() == Object.class) {
                    // Of the java.lang.Object methods routed through a proxy, only
                    // equals(Object) takes an argument; it must not be treated as a key.
                    return proxy == args[0];
                }
                if (!(args[0] instanceof String)) {
                    throw new IllegalArgumentException(
                            "First argument of " + method.getName() + " must be a String key");
                }
                Pipeline pipeline = clusterPipeline.before((String) args[0]);
                Object response = method.invoke(pipeline, args);
                // Record the position only once the command is really queued.
                clusterPipeline.recordQueued(pipeline);
                return response;
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Surface the real failure instead of an UndeclaredThrowableException.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
        }

        /**
         * Builds the backing {@link ClusterPipelineUtil} and wraps it in a proxy.
         *
         * @param jedisCluster cluster client to pipeline against
         * @return a proxy implementing {@link JedisPipeline}
         */
        JedisPipeline getProxy(JedisCluster jedisCluster) {
            clusterPipeline = new ClusterPipelineUtil(jedisCluster);
            return (JedisPipeline) Proxy.newProxyInstance(
                    ClusterPipelineUtil.class.getClassLoader(),
                    new Class<?>[] {JedisPipeline.class},
                    this);
        }
    }

}


测试类

     // 省略jedisCluster构建

    /**
     * Resets the fixture keys before each test: removes any leftovers, then
     * creates two sets ("a", "b") and two plain string values ("c", "d").
     */
    @Before
    public void before(){
        // drop leftovers one key at a time
        for (String key : new String[]{"a", "b", "c", "d"}) {
            jedisCluster.del(key);
        }

        // set-typed fixtures
        jedisCluster.sadd("a", "1", "2", "3");
        jedisCluster.sadd("b", "4");

        // string-typed fixtures
        jedisCluster.set("c", "5");
        jedisCluster.set("d", "6");
    }

    /**
     * Queues membership checks on the sets "a" and "b" and reads of "c" and "d"
     * through the cluster pipeline, then prints every result in queue order.
     */
    @Test
    public void test()  {
        JedisPipeline clusterPipeline = ClusterPipelineUtil.newInstance(jedisCluster);

        // membership checks against set "a"
        for (String member : new String[]{"1", "2", "3", "4"}) {
            clusterPipeline.sismember("a", member);
        }

        // membership checks against set "b"
        for (String member : new String[]{"4", "5"}) {
            clusterPipeline.sismember("b", member);
        }

        // plain string reads
        for (String key : new String[]{"c", "d"}) {
            clusterPipeline.get(key);
        }

        List<Object> results = clusterPipeline.syncAndReturnAll();
        for (Object result : results) {
            System.out.println(result);
        }
    }
上一篇下一篇

猜你喜欢

热点阅读