Redis 一致性Hash客户端实现
2017-06-28 本文已影响0人
Cajesse佳泽
Redis 一致性Hash客户端实现
Redis 分区有一种实现方案就是使用支持一致性Hash的客户端
Java好像没有这类型的客户端,于是我想自己实现一遍。
原理
一致性hash的原理,我简述一下,具体可以看这里
- 首先我们将我们的N个实例通过 hash() 散落在一个周长为 2^32 -1 长度的环上
我的做法是直接为每个实例生成一个随机字符串作为这个实例的编号,然后获取该编号的hashCode() 作为在环上的位置。 假设我们N个实例的N个位置为[P1,P2,...,Pn] - 每次set一个(key,value)的时候,
- 将key 通过 hash() 同样得到一个对应到环上位置P
- 寻找P的后继实例Px
- 将值set进 Px实例
- 每次get一个key的时候,
- 将key 通过 hash() 得到一个对应到环上的位置P
- 寻找P的后继实例Px
- 在Px实例获取值
- 增加实例的时候
- 参照第1点得到一个位置Pm
- 寻找P的后继实例Px
- get Px实例上的所有数据,清空Px,对这些数据进行重新set
- 删除实例的时候
- 将要删除的实例的所有数据填入该实例的后继实Px例中。
- 实例宕机的时候
- 在没有开启复制功能的时候该实例的数据将会丢失,但是整个集群是可用的。
- 如果开启复制功能,假设复制因子为2的时候,每次set会把数据写入两个实例中,即把数据写入当前命中的实例Px的时候,同时会写入Px的后继实例Px+1.这样能够容忍两个不连续节点宕机的情况。但是同时写入效率会降低
定义 后继实例:在[P1,P2,...,Pn]寻找在所有比P大于等于的集合中最小的那个 Px,若P已经是最大,则 Px=min{P1,P2,...,Pn}
低配版代码(没有复制功能)
public class ConsistentHashClient {
private TreeMap<Integer, Node> mapped;
public ConsistentHashClient(String... args) {
mapped = new TreeMap<>();
//初始化一开始的客户端
for (String host : args) {
String hostName = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
Node node = new Node(hostName, port);
int index = mapToNode(node);
mapped.put(index, node);
}
}
public void set(String key, String value) {
Node redis = getTrulyNode(key);
log.info("set key = {} , value = {} into {}",key,value,redis.hostName+":"+redis.port);
redis.set(key, value);
}
public String get(String key) {
Node redis = getTrulyNode(key);
String value = redis.get(key);
log.info("get key = {} , value = {} get from {}",key, value, redis.hostName + ":" + redis.port);
return value;
}
public int addNode(Node newNode) {
log.info("start to addNode, hostName = {} , port = {} ",newNode.hostName,newNode.port);
int index = mapToNode(newNode);
if (mapped.get(index) != null) {
throw new IllegalArgumentException("对应的位置已经有redis实例对应了");
} else {
mapped.put(index, newNode);
}
Node next = getNextNode(index);
if (next != null) {
Map<String, String> cache = next.keys();
next.empty();
//数据迁移
for (Map.Entry<String, String> entry : cache.entrySet()) {
set(entry.getKey(), entry.getValue());
}
}
log.info("finish to addNode, hostName = {} , port = {} ",newNode.hostName,newNode.port);
return index;
}
public Node deleteNode(String hostName, int port) {
log.info("start to delete node, hostName = {},port = {}",hostName,port);
for (Map.Entry<Integer, Node> node : mapped.entrySet()) {
if (hostName.equals(node.getValue().hostName) && port == node.getValue().port) {
return deleteNode(node.getKey());
}
}
throw new IllegalArgumentException("要删除的Ip和端口不存在");
}
private Node getNextNode(int index) {
Map.Entry<Integer, Node> next = mapped.higherEntry(index);
if (next == null) {
next = mapped.higherEntry(Integer.MIN_VALUE);
}
if (next == null) throw new IllegalStateException("客户端没有对应的Redis实例");
return next.getValue();
}
private String info(){
StringBuilder sb = new StringBuilder("\r\n**********************************\r\n");
for(Map.Entry<Integer,Node> entry : mapped.entrySet()) {
sb.append(entry.getValue().info()).append("**********************************\r\n");
}
return sb.toString();
}
private Node deleteNode(int index) {
Node cur = mapped.get(index);
if (cur != null) {
Node next = getNextNode(index);
if (next != null) {
for (Map.Entry<String, String> entry : cur.keys().entrySet()) {
next.set(entry.getKey(), entry.getValue());
}
} else {
log.info("data all lost");
}
mapped.remove(index);
log.info("start to delete node, hostName = {},port = {}",cur.hostName,cur.port);
}
return cur;
}
private Node getTrulyNode(String key) {
int index = mapToNode(key);
return getNextNode(index - 1);
}
private int mapToNode(Object key) {
return key.hashCode();
}
private static class Node {
Node(String hostName, int port) {
this.hostName = hostName;
this.port = port;
cache = new Jedis(hostName, port);
this.no = UUID.randomUUID().toString();
}
private String hostName = "localhost";
private int port = 6379;
private String no;
private Jedis cache;
public void set(String key, String value) {
cache.set(key, value);
}
public String get(String key) {
return cache.get(key);
}
@Override
public int hashCode() {
return no.hashCode();
}
public Map<String, String> keys() {
Map<String, String> all = new HashMap<>();
Set<String> keySet = cache.keys("*");
for (String key : keySet) {
all.put(key, cache.get(key));
}
return all;
}
public void empty() {
cache.flushDB();
}
public String info(){
return cache.info();
}
}
private static final String TEST_HOST = "172.18.100.75";
public static void main(String[] args) {
ConsistentHashClient client = new ConsistentHashClient(TEST_HOST+":6381",TEST_HOST + ":6379",TEST_HOST+":6380");
List<String> keys = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
String key = UUID.randomUUID().toString();
keys.add(key);
client.set(key, key);
}
for (int i = 0; i < 1000; i++) {
if (!client.get(keys.get(i)).equals(keys.get(i))) throw new IllegalStateException("数据不一致");
}
log.info("客户端连接了{}个客户端,每个客户端信息如下:{}",client.mapped.size(),client.info());
}
}
总结
- 以前觉得一致性hash算法很万能,但是今天测试了之后,发现把节点均匀散落在环上面是不太可能,毕竟实例的个数相对 2^32 确实太小了, 目前还没有想到更好地办法解决这个问题,这个问题会不会就是RedisCluster不使用一致性算法的原因?
- 参照 Kafka 的
replication-factor
我自己想出了这个复制功能,目前还没实现,因为第一点让我没了冲动,大家有好的方法可以提供给我。 823382133@qq.com