Redis——JedisCluster
2018-07-17 本文已影响0人
黄金矿工00七
- smart客户端
- 实现原理(追求性能,不使用代理)
- 从集群中选一个可运行节点,使用cluster slots初始化槽和节点映射。
- 将cluster slots的结果映射到本地,为每个节点创建JedisPool。
-
执行命令
执行命令
- 实现原理(追求性能,不使用代理)
执行命令的过程简单来说,就是通过CRC16计算出key的槽,根据节点映射直接访问目标节点,如果出错,就随机挑选一个节点,通过moved重定向访问目标节点,并且重新初始化节点映射。
好吧,直接上源码
JedisClusterCommand.java
//命令的执行过程
public T run(String key) {
if (key == null) {
throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
}
//这里是真正的执行函数,参数分别为UTF-8编码的key二进制数组,重定向的次数,是否尝试连接随机节点,是否ask重定向
return runWithRetries(SafeEncoder.encode(key), this.redirections, false, false);
}
private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {
if (redirections <= 0) {
//对尝试连接目标节点的次数做判断,超过预设次数抛出异常
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
Jedis connection = null;
try {
if (asking) {
//是否ask重定向
// TODO: Pipeline asking with the original command to make it
// faster....
connection = askConnection.get();
connection.asking();
// if asking success, reset asking flag
asking = false;
} else {
if (tryRandomNode) {
//是否尝试连接随机节点
connection = connectionHandler.getConnection();
} else {
//计算出key的槽位置,然后从本地缓存中获取目标主机的信息
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
}
//执行命令
return execute(connection);
} catch (JedisConnectionException jce) {
//连接出错,是否尝试随机节点
if (tryRandomNode) {
// maybe all connection is down
throw jce;
}
// release current connection before recursion释放当前连接
releaseConnection(connection);
connection = null;
//重新尝试连接,redirections -1
// retry with random connection
return runWithRetries(key, redirections - 1, true, asking);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache
// recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
// release current connection before recursion or renewing
releaseConnection(connection);
connection = null;
if (jre instanceof JedisAskDataException) {
asking = true;
askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
} else if (jre instanceof JedisMovedDataException) {
} else {
throw new JedisClusterException(jre);
}
return runWithRetries(key, redirections - 1, false, asking);
} finally {
releaseConnection(connection);
}
}
JedisClusterConnectionHandler:连接持有者,实际上Handler内部维护了一个JedisClusterInfoCache ,也就是节点和槽信息映射,通过这些信息来获取连接池,换句话说,内置了所有节点的连接池
JedisClusterInfoCache .java
//集群节点信息转换器
public static final ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
//节点--连接池映射 每个节点都分配了一个连接池
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
//槽--连接池映射 每个槽也分配了一个连接池
private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
//通过读写锁来分离对两个映射Map的访问,保证了集群信息的正确性
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
自己动手写一个客户端连接工具测试一下,有个小bug,使用jedis2.8的时候会报host转换的异常,所以使用了2.9:
public final class ClusterUtil {
private ClusterUtil() {
}
public static JedisCluster getJedisCluster() {
return RedisClusterPoolHolder.getInstance();
}
private static final class RedisClusterPoolHolder {
//使用pool单例
private static final ClusterPool CLUSTER_POOL = new ClusterPool();
private RedisClusterPoolHolder() {
}
private static JedisCluster getInstance() {
return CLUSTER_POOL.getJedisCluster();
}
}
private static class ClusterPool {
/**
* redis-Cluster节点地址
*/
private static final HostAndPort CLUSTER_NODE_1 = new HostAndPort("120..151.31", 6379);
private static final HostAndPort CLUSTER_NODE_2 = new HostAndPort("120..151.31", 6380);
private static final HostAndPort CLUSTER_NODE_3 = new HostAndPort("120..151.31", 6381);
private static final HostAndPort CLUSTER_NODE_4 = new HostAndPort("122..201.233", 6379);
private static final HostAndPort CLUSTER_NODE_5 = new HostAndPort("122..233", 6380);
private static final HostAndPort CLUSTER_NODE_6 = new HostAndPort("122..201.233", 6381);
//Cluster节点地址集合
private static final Set<HostAndPort> NODES = new HashSet<HostAndPort>() {
{
add(CLUSTER_NODE_1);
add(CLUSTER_NODE_2);
add(CLUSTER_NODE_3);
add(CLUSTER_NODE_4);
add(CLUSTER_NODE_5);
add(CLUSTER_NODE_6);
}
};
/**
* 访问密码
*/
private static final String AUTH = "woshishei";
private static final String HOST = "120.79.151.31";
/**
* 可用连接实例的最大数目,默认值为8; 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
*/
private static final int MAX_ACTIVE = 1024;
/**
* 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
*/
private static final int MAX_IDLE = 200;
/**
* 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
*/
private static final int MAX_WAIT = 10000;
private static final int TIMEOUT = 10000;
/**
* 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
*/
private static final boolean TEST_ON_BORROW = true;
/**
* JedisCluster
*/
private static JedisCluster JEDIS_CLUSTER = null;
ClusterPool() {
/**
* 初始化Redis-Cluster连接池.
*/
try {
// maxActive ==> maxTotal
// maxWait ==> maxWaitMillisl
/*
* 配置JedisPool*/
JedisPoolConfig CONFIG = new JedisPoolConfig();
CONFIG.setMaxTotal(MAX_ACTIVE);
CONFIG.setMaxIdle(MAX_IDLE);
CONFIG.setMaxWaitMillis(MAX_WAIT);
CONFIG.setTestOnBorrow(TEST_ON_BORROW);
JEDIS_CLUSTER = new JedisCluster(NODES, TIMEOUT, CONFIG);
} catch (Exception e) {
e.getMessage();
e.printStackTrace();
}
}
private JedisCluster getJedisCluster() {
return JEDIS_CLUSTER;
}
}
}