JedisCluster 操作管道(基于jedis 2.9+的
前言
现在很多的博客论坛,很多都是以前写的代码。殊不知,这代码不是一层不变的。特别是涉及到源码的改变。这就导致很多网上的文章几乎都是 copy 来 copy 去的。这里也只是建议大家的有看源码的习惯。不然,照抄网上的博客有时候真的不能解决问题。还得动动脑子。本人也是踩坑过来的 。好了。回到重点,
为什么 JedisCluster 不支持直接操作管道(Pipeline)? (如果面试这么问。你怎么回答?百思不得其姐(解) 欢迎留言^_^)
首先我们看下 JedisCluster 源码。
public class JedisCluster extends BinaryJedisCluster implements JedisCommands,
MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
/*****************一堆方法*****************/
}
看到这个 JedisCluster 类 继承 BinaryJedisCluster。 好了我们在下一步看 BinaryJedisCluster 类里面到底是什么?
public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands,
MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
public static final short HASHSLOTS = 16384;
protected static final int DEFAULT_TIMEOUT = 2000;
protected static final int DEFAULT_MAX_REDIRECTIONS = 5;
protected int maxAttempts;
protected JedisClusterConnectionHandler connectionHandler;
/****************************一堆方法*****************************/
}
可以看到在 BinaryJedisCluster 继承一些接口。所以我我们先看下这个类下除了构造方法还剩下什么东东?
埃!!! JedisClusterConnectionHandler connectionHandler; 这个类里面会不会有我们想要的东西呢?进去看下
package redis.clients.jedis;
import java.io.Closeable;
import java.util.Map;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.exceptions.JedisConnectionException;
public abstract class JedisClusterConnectionHandler implements Closeable {
protected final JedisClusterInfoCache cache;
public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
initializeSlotsCache(nodes, poolConfig, password);
}
abstract Jedis getConnection();
abstract Jedis getConnectionFromSlot(int slot);
public Jedis getConnectionFromNode(HostAndPort node) {
return cache.setupNodeIfNotExist(node).getResource();
}
public Map<String, JedisPool> getNodes() {
return cache.getNodes();
}
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
for (HostAndPort hostAndPort : startNodes) {
Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
if (password != null) {
jedis.auth(password);
}
try {
cache.discoverClusterNodesAndSlots(jedis);
break;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
public void renewSlotCache() {
cache.renewClusterSlots(null);
}
public void renewSlotCache(Jedis jedis) {
cache.renewClusterSlots(jedis);
}
@Override
public void close() {
cache.reset();
}
}
这是个抽象类。里面有2个抽象方法。在2.9以前版本
abstract Jedis getConnection();
abstract Jedis getConnectionFromSlot(int slot);
这2方法可有所实现。(没去看2.9以前的版本源码)
网上很多以前博客的都是使用 getConnectionFromSlot(int slot); 来获取某个 jedis 来操作 pipeline。
所以在此我们还能怎么办呢?
这会儿。我们看到
JedisClusterInfoCache cache; 这里面会不会有有我们想要的?
public class JedisClusterInfoCache {
private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
private volatile boolean rediscovering;
private final GenericObjectPoolConfig poolConfig;
private int connectionTimeout;
private int soTimeout;
private String password;
private static final int MASTER_NODE_INDEX = 2;
public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {
this(poolConfig, timeout, timeout, null);
}
public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,
final int connectionTimeout, final int soTimeout, final String password) {
this.poolConfig = poolConfig;
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
}
/******方法******以下方法是我重点标注的*************/
public JedisPool getNode(String nodeKey) {
r.lock();
try {
return nodes.get(nodeKey);
} finally {
r.unlock();
}
}
public JedisPool getSlotPool(int slot) {
r.lock();
try {
return slots.get(slot);
} finally {
r.unlock();
}
}
public Map<String, JedisPool> getNodes() {
r.lock();
try {
return new HashMap<String, JedisPool>(nodes);
} finally {
r.unlock();
}
}
public List<JedisPool> getShuffledNodesPool() {
r.lock();
try {
List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());
Collections.shuffle(pools);
return pools;
} finally {
r.unlock();
}
}
埃~看到了可以获取到某个jedis
其实这个 JedisClusterInfoCache 类 是你在初始化jedisCluster时 将所有的节点放入缓存。
因此,这个类的方法能给我们返回相关的jedis实例
我们要这么做呢?
接下来是我的代码。通过java的反射机制直接获取。
public static void main(String[] args) throws NoSuchFieldException {
JedisPoolConfig config = new JedisPoolConfig();
Set<HostAndPort> nodeList = new HashSet<>();
nodeList.add(new HostAndPort("192.168.41.65", 6379));
nodeList.add(new HostAndPort("192.168.41.70", 6379));
nodeList.add(new HostAndPort("192.168.41.42", 6379));
nodeList.add(new HostAndPort("192.168.41.20", 6380));
nodeList.add(new HostAndPort("192.168.41.30", 6380));
nodeList.add(new HostAndPort("192.168.41.40", 6380));
JedisCluster jedisCluster = new JedisCluster(nodeList, 3000, config);
jedisCluster.set("James", "Bond");
//通過 java.lang.reflect.Field 反射
Jedis jedis = getJedisFieldBySlot(jedisCluster, 0, "James");
//通過spring 工具類 ReflectionUtils 反射
Jedis j = getJedisBySlot(jedisCluster, 0, "James");
// 接下来就是pipeline操作了
if(jedis != null) {
Pipeline pipeline = jedis.pipelined();
pipeline.syncAndReturnAll();
// jedis会自动将资源归还到连接池
jedis.close();
}else {
System.err.println("找不到 jedis");
}
}
/**
* 集裙中根據 key对应的slot 获取槽位 或 key 返回對應的某個Jedis 實例
* @param jedisCluster
* @param slot
* @param key
* @return Jedis
*/
public static Jedis getJedisFieldBySlot(JedisCluster jedisCluster,int slot,String key) {
try {
if(key !=null) {
// 获取key对应的slot 获取槽号(0~16383)
slot = JedisClusterCRC16.getSlot(key);
}
Field field = BinaryJedisCluster.class.getDeclaredField("connectionHandler");
field.setAccessible(true);
JedisClusterConnectionHandler connectionHandler = (JedisClusterConnectionHandler) field.get(jedisCluster);
Field jedisclusterinfocache = JedisClusterConnectionHandler.class.getDeclaredField("cache");
jedisclusterinfocache.setAccessible(true);
JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);
JedisPool pool = cache.getSlotPool(slot);
Jedis jedis = pool.getResource();
return jedis;
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
/**
* 集裙中根據 key对应的slot 获取槽位 或 key 返回對應的某個Jedis 實例
* @param jedisCluster
* @param slot
* @param key
* @return Jedis
*/
public static Jedis getJedisBySlot(JedisCluster jedisCluster,int slot,String key) {
try {
if(key !=null) {
slot = JedisClusterCRC16.getSlot(key);
}
//org.springframework.util.ReflectionUtils 工具類 BinaryJedisCluster 下的 JedisClusterConnectionHandler
Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);
field.setAccessible(true);
JedisClusterConnectionHandler connectionHandler = (JedisClusterConnectionHandler) field.get(jedisCluster);
Field jedisclusterinfocache = ReflectionUtils.findField(JedisClusterConnectionHandler.class, null, JedisClusterInfoCache.class);
jedisclusterinfocache.setAccessible(true);
JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);
JedisPool pool = cache.getSlotPool(slot);
Jedis jedis = pool.getResource();
return jedis;
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
但是大家以为以上的代码就很完美了么?
确实。在去找缓存里的jedis时,可能某个节点挂了,然后刚好程序拿到这个实例,
这时候这里就会出现错误。因此我们 应该在原来的基础上,去刷新一遍集群。
代码由你们来给吧。我不会写了。哈哈哈·
最后 如果针对 JedisClusterInfoCache 源码分析的 请看 https://www.cnblogs.com/zhengzuozhanglina/p/11383035.html