MGET

2022-11-21  本文已影响0人  彳亍口巴

MGET源码

public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
        //获取分区slot和key的映射关系
        Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
 
        //如果分区数小于2也就是只有一个分区即所有key都落在一个分区就直接获取
        if (partitioned.size() < 2) {
            return super.mget(keys);
        }
 
        //每个key与slot映射关系
        Map<K, Integer> slots = SlotHash.getSlots(partitioned);
        Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>();
 
        //遍历分片信息,逐个发送
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<List<KeyValue<K, V>>> mget = super.mget(entry.getValue());
            executions.put(entry.getKey(), mget);
        }
 
        // restore order of key 恢复key的顺序
        return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
            List<KeyValue<K, V>> result = new ArrayList<>();
            for (K opKey : keys) {
                int slot = slots.get(opKey);
 
                int position = partitioned.get(slot).indexOf(opKey);
                RedisFuture<List<KeyValue<K, V>>> listRedisFuture = executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));
            }
 
            return result;
        });
}

整个mget操作其实分为了以下几步:

1、 获取分区slot和key的映射关系,遍历出所需key对应的每个分区slot。

2、 判定,slot个数是不是小于2,也就是是否所有的key都在同一分区,如果是,发起一次mget命令,直接获取。

3、 如果分区数量大于2,keys对应多个分区,那么遍历所有分区,分别向redis发起mget请求获取数据。

4、 等待所有请求执行结束,重新组装结果数据,保持跟入参key的顺序一致,然后返回结果。

可以看到,当使用mget方法获取多个key,并且这些key还存在于不同的slot分区中,那么一次mget操作其实会对redis发起多次mget命令的请求,有多少个slot,就发起多少次,然后在所有请求执行完毕之后,整个mget方法才会能够继续执行下去。看似一次mget方法调用,其实底层对应的是多次redis调用和多次io交互。

解决办法

方案1 - hashtag
hashtag 强制将key放在一个redis node上。这个方案,相当于将redis集群退化成了单机redis,系统的高可用,容灾能力就大打折扣了,只能尝试使用主从,哨兵等其他分布式架构来缓减,但是,既然选择了集群,肯定集群模式是相比于其他模式是最符合当前系统架构现状的,使用这种方案,可能会引发更大的问题。不推荐。

方案2 -并发调用
我们从上面的代码中可以看到,for循环内多次串行的redis调用,是导致执行rt上涨的原因,那么,自然而然可以想到,是否可以用并行替代底层串行的逻辑。也就是将mget中的keys,根据slot分片规则,先groupBy一下,然后用多线程的方式并行执行。
首先,这个方案中,用于并发调用提交redis mget任务的线程池的设计非常重要,各种参数的调校,势必需要非常充分的压测,这本身难度就比较大。其次,我们在日常使用中,一次mget的key基本上在几十到100,相比于redis 16384的固定槽位数量,是数量级上的差距,所以,我们一次请求的这些key,基本上是分布在不同的slot中的,换句话讲,如果按照这么拆分keys,大概率是相当于拆出了等于key数量的get请求。。也就丧失了mget的意义。

实践

func test16() {
    _, err := redisCli.Do("SET", "aa", "wd")
    if err != nil {
        fmt.Println("redis set error:", err)
        return
    }
    var keys []interface{}
    keys = append(keys, "dd", "aa")
    redisData, err := redis.Strings(redisCli.Do("MGET", keys...))
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(len(redisData))
    for _, info := range redisData {
        if info == "" {
            fmt.Println("空字符串")
            continue
        }
        fmt.Printf("info:%+v\n", info)
    }
}

1、set aa=wd
2、mget dd和aa
3、打印结果,发现结果数组和key的个数一致,也就是每个key的结果一定会返回

会不会有部分成功,部分失败的情况?
func Strings(reply interface{}, err error) ([]string, error) {
    var result []string
    err = sliceHelper(reply, err, "Strings", func(n int) { result = make([]string, n) }, func(i int, v interface{}) error {
        switch v := v.(type) {
        case string:
            result[i] = v
            return nil
        case []byte:
            result[i] = string(v)
            return nil
        default:
            return fmt.Errorf("redigo: unexpected element type for Strings, got type %T", v)
        }
    })
    return result, err
}

将所有返回结果转成字符串数组,如果redis返回的结果为空,则数组中默认为空字符串,若其中一个失败,则整个函数返回失败,可认为是全部成功或者全部失败

上一篇下一篇

猜你喜欢

热点阅读