Redis系列之(六)——集群redis cluster
一、集群方案与分区
1、一致性hash分区
一致性哈希分区(Distributed Hash Table)实现思路是为系统中每个节点分配一个token,范围一般在0~232,这些token构成一个哈希环。数据读写执行节点查找操作时,先根据key计算hash值,然后顺时针找到第一个大于等于该哈希值的token节点。
这种方式相比节点取余最大的好处在于加入和删除节点只影响哈希环中相邻的节点,对其他节点无影响。
为了保证数据和负载的均衡,通过虚拟槽分区,巧妙地使用了哈希空间,使用分散度良好的哈希函数把所有数据映射到一个固定范围的整数集合中,整数定义为槽(slot)。
redis cluster slot
1、客户端分片
把分片的逻辑放在Redis客户端实现,通过Redis客户端预先定义好的路由规则,把对Key的访问转发到不同的Redis实例中,最后把返回结果汇集。
如ShardedJedisPool。
http://shift-alt-ctrl.iteye.com/blog/1885959
#node构建过程(redis.clients.util.Sharded):
//shards列表为客户端提供了所有redis-server配置信息,包括:ip,port,weight,name
//其中weight为权重,将直接决定“虚拟节点”的“比例”(密度),权重越高,在存储是被hash命中的概率越高
//--其上存储的数据越多。
//其中name为“节点名称”,jedis使用name作为“节点hash值”的一个计算参数。
//---
//一致性hash算法,要求每个“虚拟节点”必须具备“hash值”,每个实际的server可以有多个“虚拟节点”(API级别)
//其中虚拟节点的个数= “逻辑区间长度” * weight,每个server的“虚拟节点”将会以“hash”的方式分布在全局区域中
//全局区域总长为2^32.每个“虚拟节点”以hash值的方式映射在全局区域中。
// 环形:0-->vnode1(:1230)-->vnode2(:2800)-->vnode3(400000)---2^32-->0
//所有的“虚拟节点”将按照其”节点hash“顺序排列(正序/反序均可),因此相邻两个“虚拟节点”之间必有hash值差,
//那么此差值,即为前一个(或者后一个,根据实现而定)“虚拟节点”所负载的数据hash值区间。
//比如hash值为“2000”的数据将会被vnode1所接受。
//---
private void initialize(List<S> shards) {
nodes = new TreeMap<Long, S>();//虚拟节点,采取TreeMap存储:排序,二叉树
for (int i = 0; i != shards.size(); ++i) {
final S shardInfo = shards.get(i);
if (shardInfo.getName() == null)
//当没有设置“name”是,将“SHARD-NODE”作为“虚拟节点”hash值计算的参数
//"逻辑区间步长"为160,为什么呢??
//最终多个server的“虚拟节点”将会交错布局,不一定非常均匀。
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
}
else
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
}
resources.put(shardInfo, shardInfo.createResource());
}
}
#node选择方式:
public R getShard(String key) {
return resources.get(getShardInfo(key));
}
//here:
public S getShardInfo(byte[] key) {
//获取>=key的“虚拟节点”的列表
SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
//如果不存在“虚拟节点”,则将返回首节点。
if (tail.size() == 0) {
return nodes.get(nodes.firstKey());
}
//如果存在,则返回符合(>=key)条件的“虚拟节点”的第一个节点
return tail.get(tail.firstKey());
}
2、Twemproxy
image.pngTwemproxy是由Twitter开源的Redis代理,其基本原理是:Redis客户端把请求发送到Twemproxy,Twemproxy根据路由规则发送到正确的Redis实例,最后Twemproxy把结果汇集返回给客户端。(Twemproxy通过lvs做负载均衡及高可用)
Twemproxy通过引入一个代理层,将多个Redis实例进行统一管理,使Redis客户端只需要在Twemproxy上进行操作,而不需要关心后面有多少个Redis实例,从而实现了Redis集群。
缺点:由于Redis客户端的每个请求都经过Twemproxy代理才能到达Redis服务器,这个过程中会产生性能损失。最大的问题,Twemproxy无法平滑地增加Redis实例(可以做到自动剔除)。
3、codis
codisCodis Proxy:Redis客户端连接到Redis实例的代理,实现了Redis的协议,Redis客户端连接到Codis Proxy进行各种操作。Codis Proxy是无状态的,可以用Keepalived等负载均衡软件部署多个Codis Proxy实现高可用。
CodisRedis:Codis项目维护的Redis分支,添加了slot和原子的数据迁移命令。Codis上层的 Codis Proxy和Codisconfig只有与这个版本的Redis通信才能正常运行。
Codisconfig:Codis管理工具。可以执行添加删除CodisRedis节点、添加删除Codis Proxy、数据迁移等操作。另外,Codisconfig自带了HTTP server,里面集成了一个管理界面,方便运维人员观察Codis集群的状态和进行相关的操作,极大提高了运维的方便性,弥补了Twemproxy的缺点。
ZooKeeper:Codis依赖于ZooKeeper存储数据路由表的信息和Codis Proxy节点的元信息。另外,Codisconfig发起的命令都会通过ZooKeeper同步到CodisProxy的节点。
Codis最大的优势在于支持平滑增加(减少)Redis Server Group(Redis实例),能安全、透明地迁移数据,这也是Codis 有别于Twemproxy等静态分布式 Redis 解决方案的地方。Codis增加了Redis Server Group后,就牵涉到slot的迁移问题。
4、redis cluster
redis cluster一个Redis实例具备了“数据存储”和“路由重定向”,完全去中心化的设计。这带来的好处是部署非常简单,直接部署Redis就行,不像Codis有那么多的组件和依赖。
但需要客户端支持,如果对协议进行了较大的修改,对应的Redis客户端也需要升级。造成Redis 3.0集群在业界并没有被大规模使用。
5、redis cluster使用注意
1)key批量操作支持有限。如mset、mget,目前只支持具有相同slot值的key执行批量操作。对于映射为不同slot值的key由于执行mget、mget等操作可能存在于多个节点上因此不被支持。
2)key事务操作支持有限。同理只支持多key在同一节点上的事务操作,当多个key分布在不同的节点上时无法使用事务功能。
3)key作为数据分区的最小粒度,因此不能将一个大的键值对象如hash、list等映射到不同的节点。
4)不支持多数据库空间。单机下的Redis可以支持16个数据库,集群模式下只能使用一个数据库空间,即db0。
5)复制结构只支持一层,从节点只能复制主节点,不支持嵌套树状复制结构。
二、redis cluster搭建与节点通讯
1、redis cluster搭建
1)准备节点
#节点端口
port 6379
# 开启集群模式
cluster-enabled yes
# 节点超时时间,单位毫秒
cluster-node-timeout 15000
# 集群内部配置文件
cluster-config-file "nodes-6379.conf"
当集群内节点信息发生变化,如添加节点、节点下线、故障转移等。节点会自动保存集群状态到配置文件中。需要注意的是,Redis自动维护集群配置文件,不要手动修改,防止节点重启时产生集群信息错乱。
#cat data/nodes-6379.conf
cfb28ef1deee4e0fa78da86abe5d24566744411e 127.0.0.1:6379 //节点ID
myself,master - 0 0 0 connected
vars currentEpoch 0 lastVoteEpoch 0
节点ID不同于运行ID。节点ID在集群初始化时只创建一次,节点重启时会加载集群配置文件进行重用,而Redis的运行ID每次重启都会变化。
2)节点握手
节点握手是指一批运行在集群模式下的节点通过Gossip协议彼此通信,达到感知对方的过程。
节点握手
只需要在集群内任意节点上执行cluster meet命令加入新节点,握手状态会通过消息在集群内传播,这样其他节点会自动发现新节点并发起握手流程。
3)分配槽
redis-cli -h 127.0.0.1 -p 6379 cluster addslots {0...5461}
redis-trib.rb是采用Ruby实现的Redis集群管理工具。内部通过Cluster相关命令帮我们简化集群创建、检查、槽迁移和均衡等常见运维操作。
三、请求路由与客户端
Redis集群对客户端通信协议做了比较大的修改,为了追求性能最大化,并没有采用代理的方式而是采用客户端直连节点的方式。
在集群模式下,Redis接收任何键相关命令时首先计算键对应的槽,再根据槽找出所对应的节点,如果节点是自身,则处理键命令;否则回复MOVED重定向错误,通知客户端请求正确的节点。这个过程称为MOVED重定向。
MOVED重定向
#计算槽节点
def key_hash_slot(key):
int keylen = key.length();
for (s = 0; s < keylen; s++){
if (key[s] == '{'){
break;
}
}
if (s == keylen) return crc16(key,keylen) & 16383;
for (e = s+1; e < keylen; e++):
if (key[e] == '}') break;
if (e == keylen || e == s+1) return crc16(key,keylen) & 16383;
return crc16(key+s+1,e-s-1) & 16383; /* 使用{和}之间的有效部分计算槽,{hash_tag} */
#查找槽节点
def execute_or_redirect(key):
int slot = key_hash_slot(key);
ClusterNode node = slots[slot];
if(node == clusterState.myself):
return executeCommand(key);
else:
return '(error) MOVED {slot} {node.ip}:{node.port}';
mget等命令优化批量调用时,键列表必须具有相同的slot,否则会报错。这时可以利用hash_tag让不同的键具有相同的slot达到优化的目的。
Pipeline同样可以受益于hash_tag,由于Pipeline只能向一个节点批量发送执行命令,而相同slot必然会对应到唯一的节点,降低了集群使用Pipeline的门槛。
Jedis客户端命令执行流程
1)计算slot并根据slots缓存获取目标节点连接,发送命令。
2)如果出现连接错误,使用随机连接重新执行键命令,每次命令重试对redi-rections参数减1。
3)捕获到MOVED重定向错误,使用cluster slots命令更新slots缓存(renewSlotCache方法)。
4)重复执行1)~3)步,直到命令执行成功,或者当redi-rections<=0时抛出JedisClusterMaxRedirectionsException异常。
image.png
image.png
#JedisClusterCommand的runWithRetries方法(jedis2.8.1)
private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {
if (redirections <= 0) {
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
Jedis connection = null;
try {
if (asking) {
// TODO: Pipeline asking with the original command to make it
// faster....
connection = askConnection.get();
connection.asking();
// if asking success, reset asking flag
asking = false;
} else {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
}
return execute(connection);
} catch (JedisConnectionException jce) {
if (tryRandomNode) {
// maybe all connection is down
throw jce;
}
// release current connection before recursion
releaseConnection(connection);
connection = null;
// retry with random connection
return runWithRetries(key, redirections - 1, true, asking);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache
// recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
// release current connection before recursion or renewing
releaseConnection(connection);
connection = null;
if (jre instanceof JedisAskDataException) {
asking = true;
askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
} else if (jre instanceof JedisMovedDataException) {
} else {
throw new JedisClusterException(jre);
}
return runWithRetries(key, redirections - 1, false, asking);
} finally {
releaseConnection(connection);
}
}
#renewSlotCache
public void renewSlotCache(Jedis jedis) {
try {
cache.discoverClusterSlots(jedis);
} catch (JedisConnectionException e) {
renewSlotCache();
}
}
#discoverClusterSlots
public void discoverClusterSlots(Jedis jedis) {
w.lock();
try {
this.slots.clear();
List<Object> slots = jedis.clusterSlots();
for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
if (slotInfo.size() <= 2) {
continue;
}
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
// hostInfos
List<Object> hostInfos = (List<Object>) slotInfo.get(2);
if (hostInfos.size() <= 0) {
continue;
}
// at this time, we just use master, discard slave information
HostAndPort targetNode = generateHostAndPort(hostInfos);
setNodeIfNotExist(targetNode);
assignSlotsToNode(slotNums, targetNode);
}
} finally {
w.unlock();
}
}
#assignSlotsToNode
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
}
} finally {
w.unlock();
}
}
四、故障转移与集群运维
主观下线
1)节点a发送ping消息给节点b,如果通信正常将接收到pong消息,节点a更新最近一次与节点b的通信时间。
2)如果节点a与节点b通信出现问题则断开连接,下次会进行重连。如果一直通信失败,则节点a记录的与节点b最后通信时间将无法更新。
3)节点a内的定时任务检测到与节点b最后通信时间超高cluster-node-timeout时,更新本地对节点b的状态为主观下线(pfail)。
客观下线
1)当消息体内含有其他节点的pfail状态会判断发送节点的状态,如果发送节点是主节点则对报告的pfail状态处理,从节点则忽略。
2)找到pfail对应的节点结构,更新clusterNode内部下线报告链表。
3)根据更新后的下线报告链表告尝试进行客观下线。
image.png
尝试客观下线
1)首先统计有效的下线报告数量,如果小于集群内持有槽的主节点总数的一半则退出。
2)当下线报告大于槽主节点数量一半时,标记对应故障节点为客观下线状态。
3)向集群广播一条fail消息,通知所有的节点将故障节点标记为客观下线,fail消息的消息体只包含故障节点的ID。
image.png
集群中使用管道
http://www.bubuko.com/infodetail-1106789.html
https://blog.csdn.net/youaremoon/article/details/51751991