扩展Jedis功能,实现对Redis集群的Pipeline支持
2021-09-03 本文已影响0人
上岸大虾米
在使用Jedis操作Redis集群时,发现JedisCluster不支持Pipeline操作,于是尝试扩展该功能。目前仅通过了初步验证,这里只是提供一个思路,仅供参考。
实现集群Pipeline步骤
- 计算key所对应的slot,获取对应的jedis对象
- 将所有的key按照jedis分组
- 每个分组分别执行对应的Pipeline
- 汇总结果集
同时可以使用这个思路,解决JedisCluster对mget、del等多key命令在key分布于不同slot时不支持的类似问题。
定义操作接口
import redis.clients.jedis.RedisPipeline;
import java.util.List;
/**
 * Pipeline command interface that adds a result-collecting sync operation on top of
 * the single-key commands declared by {@link RedisPipeline}.
 *
 * @author Adimin
 */
public interface JedisPipeline extends RedisPipeline {
/**
 * Sends the queued commands and collects their results; the intended semantics mirror
 * the method referenced below.
 *
 * @see redis.clients.jedis.Pipeline#syncAndReturnAll()
 * @return the results of the queued operations
 */
List<Object> syncAndReturnAll();
}
实现类
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.session.Configuration;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Pipeline support for a Redis cluster on top of {@link JedisCluster}.
 *
 * <p>How it works:
 * <ol>
 *   <li>compute the slot of each key and obtain the Jedis connection responsible for it;</li>
 *   <li>group the commands by connection;</li>
 *   <li>run one {@link Pipeline} per group;</li>
 *   <li>merge the per-node results back into the order in which the commands were issued.</li>
 * </ol>
 *
 * <p>Instances are created through {@link #newInstance(JedisCluster)}, which returns a dynamic
 * proxy: every keyed command is routed to the pipeline of the node owning the key, and
 * {@link #syncAndReturnAll()} flushes all of them. The internal state lives in plain
 * {@code HashMap}s and an {@code int} counter without synchronization, so an instance should
 * be used from one thread at a time.
 *
 * @author Adimin
 */
public class ClusterPipelineUtil extends Pipeline implements JedisPipeline {

    /** Number of commands queued since the last sync; also the result index of the next command. */
    private int size = 0;

    /**
     * Jedis connection -> pipeline opened on it.
     * A cluster consists of several Redis nodes; one pipeline is kept per node connection.
     */
    private final Map<Jedis, Pipeline> redisAndPipelineMap = Maps.newHashMap();

    /** Pipeline -> global result positions of the commands queued on it, in queue order. */
    private final Map<Pipeline, ArrayList<Integer>> resultMap = Maps.newHashMap();

    /** Pool -> connection borrowed from it for the current batch. */
    private final Map<JedisPool, Jedis> poolAndJedisMap = Maps.newHashMap();

    /**
     * Slot number -> JedisPool of the node serving that slot.
     * TODO slot re-assignment inside the cluster is not handled.
     */
    private final Map<Integer, JedisPool> slots;

    /**
     * Reads the slot table out of the given cluster client via reflection.
     *
     * @param jedisCluster cluster client whose internal {@code connectionHandler.cache.slots}
     *                     property is used for routing
     */
    @SuppressWarnings("unchecked") // the reflective lookup returns Object; the cast target is the declared type of the slot table
    private ClusterPipelineUtil(JedisCluster jedisCluster) {
        // Use the reflection helper shipped with MyBatis to walk the private property path.
        Configuration configuration = new Configuration();
        MetaObject metaObject = configuration.newMetaObject(jedisCluster);
        slots = (Map<Integer, JedisPool>) metaObject.getValue("connectionHandler.cache.slots");
    }

    /**
     * Resolves the pipeline that must receive the next command for {@code key} and records
     * the command's position in the overall result list.
     *
     * @param key the Redis key of the command about to be queued
     * @return the pipeline bound to the node that serves the key's slot
     * @throws IllegalStateException if no pool is registered for the key's slot
     */
    private Pipeline before(String key) {
        // Slot number the key hashes to.
        int slot = JedisClusterCRC16.getSlot(key);
        JedisPool jedisPool = slots.get(slot);
        if (jedisPool == null) {
            // Fail with a readable message instead of an NPE from inside computeIfAbsent.
            throw new IllegalStateException("No JedisPool is mapped to slot " + slot + " (key: " + key + ")");
        }
        // 1. one borrowed connection per pool
        Jedis jedis = poolAndJedisMap.computeIfAbsent(jedisPool, JedisPool::getResource);
        // 2. one pipeline per connection
        Pipeline pipeline = redisAndPipelineMap.computeIfAbsent(jedis, BinaryJedis::pipelined);
        ArrayList<Integer> keyOrderList = resultMap.computeIfAbsent(pipeline, k -> Lists.newArrayList());
        keyOrderList.add(size++);
        return pipeline;
    }

    /**
     * Resets the batch state and hands every borrowed connection back to its pool.
     * All connections recorded in {@link #poolAndJedisMap} are closed, including one whose
     * pipeline was never created, and a failing {@code close()} does not stop the others.
     */
    private void clearNew() {
        size = 0;
        resultMap.clear();
        redisAndPipelineMap.clear();
        RuntimeException firstFailure = null;
        for (Jedis jedis : poolAndJedisMap.values()) {
            try {
                // Closing returns the resource to its JedisPool.
                jedis.close();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        poolAndJedisMap.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Flushes every per-node pipeline and merges the results.
     *
     * @return the command results, positioned in the order the commands were queued
     */
    @Override
    public List<Object> syncAndReturnAll() {
        Object[] merged = new Object[size];
        try {
            for (Pipeline pipeline : redisAndPipelineMap.values()) {
                // 3. execute the pipeline of this node
                List<Object> nodeResults = pipeline.syncAndReturnAll();
                List<Integer> positions = resultMap.get(pipeline);
                for (int i = 0; i < positions.size(); i++) {
                    // 4. put each result at the position its command was issued
                    merged[positions.get(i)] = nodeResults.get(i);
                }
            }
        } finally {
            clearNew();
        }
        return Lists.newArrayList(merged);
    }

    /**
     * Creates a cluster-aware pipeline for the given cluster client.
     *
     * @param jedisCluster the cluster client to route commands through
     * @return a proxy implementing {@link JedisPipeline}
     */
    public static JedisPipeline newInstance(JedisCluster jedisCluster) {
        ClusterPipelineInvocationHandler handler = new ClusterPipelineInvocationHandler();
        return handler.getProxy(jedisCluster);
    }

    /**
     * Invocation handler behind the proxy: keyed commands go to the pipeline of the owning
     * node, everything else goes to the {@link ClusterPipelineUtil} itself.
     */
    public static class ClusterPipelineInvocationHandler implements InvocationHandler {

        private ClusterPipelineUtil clusterPipeline;

        /**
         * Dispatches a proxy call.
         *
         * @throws IllegalArgumentException if a command's first argument is not a String key
         * @throws Throwable whatever the target method itself throws (unwrapped from reflection)
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            try {
                if (method.getDeclaringClass() == Object.class) {
                    // equals/hashCode/toString are not Redis commands and carry no key.
                    if ("equals".equals(method.getName())) {
                        return proxy == args[0];
                    }
                    return method.invoke(clusterPipeline, args);
                }
                if (args == null || args.length == 0) {
                    return method.invoke(clusterPipeline, args);
                }
                if (!(args[0] instanceof String)) {
                    throw new IllegalArgumentException(
                            "First argument of " + method.getName() + " must be a non-null String key");
                }
                Pipeline target = clusterPipeline.before((String) args[0]);
                return method.invoke(target, args);
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Surface the real failure instead of an UndeclaredThrowableException.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
        }

        /**
         * Builds the backing {@link ClusterPipelineUtil} and wraps it in a proxy that exposes
         * the interfaces declared directly on that class.
         */
        JedisPipeline getProxy(JedisCluster jedisCluster) {
            clusterPipeline = new ClusterPipelineUtil(jedisCluster);
            return (JedisPipeline) Proxy.newProxyInstance(
                    clusterPipeline.getClass().getClassLoader(),
                    clusterPipeline.getClass().getInterfaces(),
                    this);
        }
    }
}
测试类
// Construction of jedisCluster is omitted
/**
 * Resets the fixture keys: "a" and "b" become sets, "c" and "d" become plain strings.
 */
@Before
public void before() {
    // Drop whatever a previous run left behind, in the order a, b, c, d.
    for (String staleKey : new String[] {"a", "b", "c", "d"}) {
        jedisCluster.del(staleKey);
    }
    // Set-typed fixtures.
    jedisCluster.sadd("a", "1", "2", "3");
    jedisCluster.sadd("b", "4");
    // String-typed fixtures.
    jedisCluster.set("c", "5");
    jedisCluster.set("d", "6");
}
/**
 * Queues commands for keys that may live on different nodes, syncs once, and checks that
 * the merged results come back in the order the commands were issued.
 * Relies on the fixture written by {@code before()}: a = {1,2,3}, b = {4}, c = "5", d = "6".
 */
@Test
public void test() {
    JedisPipeline pipelined = ClusterPipelineUtil.newInstance(jedisCluster);
    pipelined.sismember("a", "1");
    pipelined.sismember("a", "2");
    pipelined.sismember("a", "3");
    pipelined.sismember("a", "4");
    pipelined.sismember("b", "4");
    pipelined.sismember("b", "5");
    pipelined.get("c");
    pipelined.get("d");
    List<Object> resultList = pipelined.syncAndReturnAll();
    resultList.forEach(System.out::println);
    // Printing alone can never fail; verify the values and their order explicitly.
    List<Object> expected =
            java.util.Arrays.<Object>asList(true, true, true, false, true, false, "5", "6");
    if (!expected.equals(resultList)) {
        throw new AssertionError("expected " + expected + " but got " + resultList);
    }
}