JedisCluster 操作管道(基于jedis 2.9+的

2020-04-22  本文已影响0人  失足的骏马

前言

现在很多的博客论坛,很多都是以前写的代码。殊不知,这代码不是一层不变的。特别是涉及到源码的改变。这就导致很多网上的文章几乎都是 copy 来 copy 去的。这里也只是建议大家的有看源码的习惯。不然,照抄网上的博客有时候真的不能解决问题。还得动动脑子。本人也是踩坑过来的 。好了。回到重点,

为什么  JedisCluster 不支持直接操作管道(Pipeline)?  (如果面试这么问。你怎么回答?百思不得其姐(解)  欢迎留言^_^)

首先我们看下 JedisCluster  源码。

public class JedisCluster extends BinaryJedisCluster implements JedisCommands,

    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {

/*****************一堆方法*****************/

  }

看到这个 JedisCluster 类 继承 BinaryJedisCluster。 好了我们在下一步看   BinaryJedisCluster 类里面到底是什么?

public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands,

    MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {

  public static final short HASHSLOTS = 16384;

  protected static final int DEFAULT_TIMEOUT = 2000;

  protected static final int DEFAULT_MAX_REDIRECTIONS = 5;

  protected int maxAttempts;

  protected JedisClusterConnectionHandler connectionHandler;

/****************************一堆方法*****************************/

}

可以看到在 BinaryJedisCluster  继承一些接口。所以我我们先看下这个类下除了构造方法还剩下什么东东?

埃!!! JedisClusterConnectionHandler connectionHandler; 这个类里面会不会有我们想要的东西呢?进去看下

package redis.clients.jedis;

import java.io.Closeable;

import java.util.Map;

import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.exceptions.JedisConnectionException;

public abstract class JedisClusterConnectionHandler implements Closeable {

    protected final JedisClusterInfoCache cache;

  public JedisClusterConnectionHandler(Set<HostAndPort> nodes,

                                      final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {

    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);

    initializeSlotsCache(nodes, poolConfig, password);

  }

  abstract Jedis getConnection();

  abstract Jedis getConnectionFromSlot(int slot);

  public Jedis getConnectionFromNode(HostAndPort node) {

    return cache.setupNodeIfNotExist(node).getResource();

  }

  public Map<String, JedisPool> getNodes() {

    return cache.getNodes();

  }

  private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {

    for (HostAndPort hostAndPort : startNodes) {

      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());

      if (password != null) {

        jedis.auth(password);

      }

      try {

        cache.discoverClusterNodesAndSlots(jedis);

        break;

      } catch (JedisConnectionException e) {

        // try next nodes

      } finally {

        if (jedis != null) {

          jedis.close();

        }

      }

    }

  }

  public void renewSlotCache() {

    cache.renewClusterSlots(null);

  }

  public void renewSlotCache(Jedis jedis) {

    cache.renewClusterSlots(jedis);

  }

  @Override

  public void close() {

    cache.reset();

  }

}

这是个抽象类。里面有2个抽象方法。在2.9以前版本 

  abstract Jedis getConnection();

  abstract Jedis getConnectionFromSlot(int slot);

这2方法可有所实现。(没去看2.9以前的版本源码)

网上很多以前博客的都是使用  getConnectionFromSlot(int slot); 来获取某个 jedis  来操作  pipeline。

所以在此我们还能怎么办呢?

这会儿。我们看到

JedisClusterInfoCache cache;       这里面会不会有有我们想要的?

public class JedisClusterInfoCache {

  private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();

  private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

  private final Lock r = rwl.readLock();

  private final Lock w = rwl.writeLock();

  private volatile boolean rediscovering;

  private final GenericObjectPoolConfig poolConfig;

  private int connectionTimeout;

  private int soTimeout;

  private String password;

  private static final int MASTER_NODE_INDEX = 2;

  public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {

    this(poolConfig, timeout, timeout, null);

  }

  public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,

      final int connectionTimeout, final int soTimeout, final String password) {

    this.poolConfig = poolConfig;

    this.connectionTimeout = connectionTimeout;

    this.soTimeout = soTimeout;

    this.password = password;

  }

/******方法******以下方法是我重点标注的*************/

public JedisPool getNode(String nodeKey) {

    r.lock();

    try {

      return nodes.get(nodeKey);

    } finally {

      r.unlock();

    }

  }

  public JedisPool getSlotPool(int slot) {

    r.lock();

    try {

      return slots.get(slot);

    } finally {

      r.unlock();

    }

  }

  public Map<String, JedisPool> getNodes() {

    r.lock();

    try {

      return new HashMap<String, JedisPool>(nodes);

    } finally {

      r.unlock();

    }

  }

  public List<JedisPool> getShuffledNodesPool() {

    r.lock();

    try {

      List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());

      Collections.shuffle(pools);

      return pools;

    } finally {

      r.unlock();

    }

  }

埃~看到了可以获取到某个jedis  

其实这个 JedisClusterInfoCache 类 是你在初始化jedisCluster时 将所有的节点放入缓存。

因此,这个类的方法能给我们返回相关的jedis实例

我们要这么做呢?

接下来是我的代码。通过java的反射机制直接获取。

public static void main(String[] args) throws NoSuchFieldException {

        JedisPoolConfig config = new JedisPoolConfig();

        Set<HostAndPort> nodeList = new HashSet<>();

        nodeList.add(new HostAndPort("192.168.41.65", 6379));

        nodeList.add(new HostAndPort("192.168.41.70", 6379));

        nodeList.add(new HostAndPort("192.168.41.42", 6379));

        nodeList.add(new HostAndPort("192.168.41.20", 6380));

        nodeList.add(new HostAndPort("192.168.41.30", 6380));

        nodeList.add(new HostAndPort("192.168.41.40", 6380));

        JedisCluster jedisCluster = new JedisCluster(nodeList, 3000, config);

        jedisCluster.set("James", "Bond");

        //通過 java.lang.reflect.Field 反射

        Jedis jedis = getJedisFieldBySlot(jedisCluster, 0, "James");

        //通過spring 工具類  ReflectionUtils 反射

        Jedis j = getJedisBySlot(jedisCluster, 0, "James");

// 接下来就是pipeline操作了

if(jedis != null) {

Pipeline pipeline = jedis.pipelined();

pipeline.syncAndReturnAll();

// jedis会自动将资源归还到连接池

jedis.close();

}else {

System.err.println("找不到 jedis");

}

    }

/**

* 集裙中根據 key对应的slot 获取槽位 或 key 返回對應的某個Jedis 實例

* @param jedisCluster

* @param slot

* @param key

* @return Jedis

*/

public static Jedis getJedisFieldBySlot(JedisCluster jedisCluster,int slot,String key) {

try {

if(key !=null) {

// 获取key对应的slot 获取槽号(0~16383)

slot = JedisClusterCRC16.getSlot(key);

}

Field field = BinaryJedisCluster.class.getDeclaredField("connectionHandler");

field.setAccessible(true);

JedisClusterConnectionHandler connectionHandler =  (JedisClusterConnectionHandler) field.get(jedisCluster);

Field jedisclusterinfocache =  JedisClusterConnectionHandler.class.getDeclaredField("cache");

jedisclusterinfocache.setAccessible(true);

JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);

JedisPool pool = cache.getSlotPool(slot);

Jedis jedis = pool.getResource();

return jedis;

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

/**

* 集裙中根據 key对应的slot 获取槽位 或 key 返回對應的某個Jedis 實例

* @param jedisCluster

* @param slot

* @param key

* @return Jedis

*/

public static Jedis getJedisBySlot(JedisCluster jedisCluster,int slot,String key) {

try {

if(key !=null) {

slot = JedisClusterCRC16.getSlot(key);

}

//org.springframework.util.ReflectionUtils 工具類  BinaryJedisCluster 下的  JedisClusterConnectionHandler

Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);

field.setAccessible(true);

JedisClusterConnectionHandler connectionHandler =  (JedisClusterConnectionHandler) field.get(jedisCluster);

Field jedisclusterinfocache = ReflectionUtils.findField(JedisClusterConnectionHandler.class, null, JedisClusterInfoCache.class);

jedisclusterinfocache.setAccessible(true);

JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);

JedisPool pool = cache.getSlotPool(slot);

Jedis jedis = pool.getResource();

return jedis;

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

但是大家以为以上的代码就很完美了么?

确实。在去找缓存里的jedis时,可能某个节点挂了,然后刚好程序拿到这个实例,

这时候这里就会出现错误。因此我们 应该在原来的基础上,去刷新一遍集群。

代码由你们来给吧。我不会写了。哈哈哈·

最后 如果针对  JedisClusterInfoCache 源码分析的 请看   https://www.cnblogs.com/zhengzuozhanglina/p/11383035.html

上一篇下一篇

猜你喜欢

热点阅读