7.MIT 6.824 LAB 4A(分布式shard data

2019-05-14  本文已影响0人  西部小笼包

第一步 阅读文档

https://pdos.csail.mit.edu/6.824/labs/lab-shard.html
一直读到4B之前,理解他的每个段落的意思。我是读到第三遍 才全部读清楚。

第二步 整理这个DB的架构思路

lab4 的架构是典型的 M/S 架构(a configuration service and a set of replica groups),不过实现十分基础,很多功能没有实现:1) shards 之间的传递很慢并且不允许 concurrent client acess;2) 每个 raft group 中的 member 不会改变。

configuration service

  1. 由若干 shardmaster 利用 raft 协议保证一致性的集群;
  2. 管理 configurations 的顺序:每个 configuration 描述 replica group 以及每个 group 分别负责存储哪些 shards;
  3. 响应 Join/Leave/Move/Query 请求,并且对 configuration 做出相应的改变;

replica group

  1. 由若干 shardkv 利用 raft 协议保证一致性的集群;
  2. 负责具体数据的存储(一部分),组合所有 group 的数据即为整个 database 的数据;
  3. 响应对应 shards 的 Get/PutAppend 请求,并保证 linearized;
  4. 周期性向 shardmaster 进行 query 获取 configuration,并且进行 migration 和 update;
image.png

Sharemaster 主要负责根据 Client 提供的分区规则,将数据储存在不同的replica group 中

Sharemaster 有多台机器组成,他们之间使用 Raft 协议来保证一致性。

每一个 replica group由多台机器组成,他们之间也是通过 Raft 协议来保证一致性。

第三步 理解代码结构

CLIENT 就不说了,和3A的套路是一样的。
SERVER大多要自己实现。
这边主要要弄清楚的是COMMON


image.png

一个CONFIG 里面包含了这个CONFIG 的版本号。哪个分区归哪个REPLICA GROUP 管
这个REPLICA GROUP 里面包含了哪些SERVER
然后MASTER SERVER 会有一组CONFIG,序号递增。

configs []Config // indexed by config num

下面看下4个API。

JOIN 会给一组GID -> SERVER的映射。其实就是把这些GID 组,加到MASTER的管理范围里来。那么有新的GROUP来了。每台机器可以匀一些SHARD过去

LEAVE 是给一组GID,表示这组GID的SERVER机器们要走。那么他们管的SHARD又要匀给还没走的GROUP

MOVE 是指定某个SHARD 归这个GID管

QUERY就是根据CONFIG NUM来找到对应的CONFIG里的SHARD 规则是如何

其实整个MASTER端的架子和LAB 3A差不多。

与3A不同的核心点,就是REBALANCE,也就是GROUP 加加减减 如何让SHARD 分布的尽可能均匀,同时移动量最小。

第四步 实现CLIENT代码

所有思路都复用3A的代码
也是呼应HINT

Start with a stripped-down copy of your kvraft server.

You should implement duplicate client request detection for RPCs to the shard master. The shardmaster tests don't test this, but the shardkv tests will later use your shardmaster on an unreliable network; you may have trouble passing the shardkv tests if your shardmaster doesn't filter out duplicate RPCs.

Client

package shardmaster

//
// Shardmaster clerk.
//

import (
    "labrpc"
)
import "time"
import "crypto/rand"
import "math/big"

const RetryInterval = time.Duration(100 * time.Millisecond)

type Clerk struct {
    servers    []*labrpc.ClientEnd
    id   int64
    seqNum int
    lastLeader   int
}

func Nrand() int64 {
    max := big.NewInt(int64(1) << 62)
    bigx, _ := rand.Int(rand.Reader, max)
    x := bigx.Int64()
    return x
}


func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.servers = servers
    ck.id = Nrand()
    ck.seqNum = 0
    ck.lastLeader = 0
    return ck
}

func (ck *Clerk) Query(num int) Config {
    args := QueryArgs{Num: num}
    for {
        var reply QueryReply
        if ck.servers[ck.lastLeader].Call("ShardMaster.Query", &args, &reply) && !reply.WrongLeader {
            return reply.Config
        }
        ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
        time.Sleep(RetryInterval)
    }
}

func (ck *Clerk) Join(servers map[int][]string) {
    args := JoinArgs{Servers: servers, Cid:ck.id, SeqNum:ck.seqNum}
    ck.seqNum++
    for {
        var reply JoinReply
        if ck.servers[ck.lastLeader].Call("ShardMaster.Join", &args, &reply) && !reply.WrongLeader {
            return
        }
        ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
        time.Sleep(RetryInterval)
    }
}

func (ck *Clerk) Leave(gids []int) {
    args := LeaveArgs{GIDs: gids, Cid:ck.id, SeqNum:ck.seqNum}
    ck.seqNum++
    for {
        var reply LeaveReply
        if ck.servers[ck.lastLeader].Call("ShardMaster.Leave", &args, &reply) && !reply.WrongLeader {
            return
        }
        ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
        time.Sleep(RetryInterval)
    }
}

func (ck *Clerk) Move(shard int, gid int) {
    args := MoveArgs{Shard: shard, GID: gid, Cid:ck.id, SeqNum:ck.seqNum}
    ck.seqNum++
    for {
        var reply MoveReply
        if ck.servers[ck.lastLeader].Call("ShardMaster.Move", &args, &reply) && !reply.WrongLeader {
            return
        }
        ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
        time.Sleep(RetryInterval)
    }
}

Common

image.png

第五步 搭建SERVER架子

基本就是把KV SERVER可以搬的代码都搬过来。
留一些特别的函数是要为SM SERVER写的。


image.png
image.png
image.png image.png

下面有2处是不一样的,在KV SERVER里,只要简单的更新STRING STRING MAP 就可以。
但是在SM SERVER,不同操作要做的事是不一样的。所以我封装在updateConfig里。这里面的JOIN 和 LEAVE需要用到REBALANCE

还有一处不一样的是QUERY,如果QUERY的返回成功,则需要去GET CONFIG然后传回来。

第6步 实现QUERY HANDLER

根据文档描述

The Query RPC's argument is a configuration number. The shardmaster replies with the configuration that has that number. If the number is -1 or bigger than the biggest known configuration number, the shardmaster should reply with the latest configuration. The result of Query(-1) should reflect every Join, Leave, or Move RPC that the shardmaster finished handling before it received the Query(-1) RPC.

如果QUERY的时候,不是-1,或者不是超过LEN(CONFIG),就取LEN -1的CONFIG的。和kv server的GET一样, 用RAFT来保证线性一致性。

image.png

第7步 实现updateConfig的框架

因为只有MOVE ,JOIN ,LEAVE需要去UPDATE CONFIG。

这个所谓的UPDATE,也是先基于最新的CONFIG,复制一份,随后在这个复制的CONFIG上面做更新。

其中MOVE最简单,只需要对CONFIG里Shards最更新。

随后JOIN 是要把一组新的GID 到SERVER的MAPING给加进新的CONFIG

LEAVE是把一组GID给从新的CONFIG里抽走。

基于上述思路有了如下代码。


image.png

第8步 实现rebalance

后来思考了下 要整体做REBALANCE
首先计算每个 replica group 分配多少个 shards, 比如10,4个GROUP,就是3,3,2,2
随后看当前的情况,把SHARD 从多的GROUP 分到少的group.
比如现在的情况是4,3,3; 新来一个GROUP,就把4先抽出去一个,3,3,3,1 再从任意一个3抽一个过去。变成3,3,2,2
如果本来是4,3,3 现在成了2个GROUP。 就需要遍历
Shards [NShards]int // shard -> gid, 看哪个SHARD的GROUP不在了,然后把这个SHARD 加到最少SHARD的那个GROUP里.
上述2个做法的逻辑没法统一,可能需要单独写。

按照上面的模式,要最大又要最小,在JAVA里用TREEMAP很好。可是GOLANG没有自带的。
基于这个考虑,我下面写的算法的时间复杂度,是moveElements * gidNumber
用了TREEMAP,可以达到moveElements * log(gidNumber)
为了代码简洁(因为这个LAB注重正确性,效率低一些),我实现复杂度高的

大概算法思路是

算出之前CONFIG,每个GID 有几个SHARD。然后JOIN的话找最大的,移到新加进来的那个的。直到达到平均值(向下取整)

如果是LEAVE的话找最小的,把LEAVE的给最小的。随后再找最小的,一直到LEAVE的GROUP的SHARD没有了


image.png
image.png image.png

测试

BUG 1

image.png image.png

BUG 2 OP is NIL

image.png

经过研究发现,需要向LABGOB 注册任何自定义的STRUCT,不然没法传输解析。


image.png
image.png

用脚本 测500次 PASS

SERVER 代码

package shardmaster

import (
    "log"
    "math"
    "raft"
    "time"
)
import "labrpc"
import "sync"
import "labgob"

type ShardMaster struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg
    // Your data here.
    configs []Config // indexed by config num
    chMap   map[int]chan Op
    cid2Seq map[int64]int
    killCh  chan bool
}

type Op struct {
    OpType  string "operation type(eg. join/leave/move/query)"
    Args    interface{} // could be JoinArgs, LeaveArgs, MoveArgs and QueryArgs, in reply it could be config
    Cid     int64
    SeqNum  int
}

func (sm *ShardMaster) Join(args *JoinArgs, reply *JoinReply) {
    originOp := Op{"Join",*args,args.Cid,args.SeqNum}
    reply.WrongLeader = sm.templateHandler(originOp)
}

func (sm *ShardMaster) Leave(args *LeaveArgs, reply *LeaveReply) {
    originOp := Op{"Leave",*args,args.Cid,args.SeqNum}
    reply.WrongLeader = sm.templateHandler(originOp)
}

func (sm *ShardMaster) Move(args *MoveArgs, reply *MoveReply) {
    originOp := Op{"Move",*args,args.Cid,args.SeqNum}
    reply.WrongLeader = sm.templateHandler(originOp)
}

func (sm *ShardMaster) Query(args *QueryArgs, reply *QueryReply) {
    reply.WrongLeader = true;
    originOp := Op{"Query",*args,Nrand(),-1}
    reply.WrongLeader = sm.templateHandler(originOp)
    if !reply.WrongLeader {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        if args.Num >= 0 && args.Num < len(sm.configs) {
            reply.Config = sm.configs[args.Num]
        } else {
            reply.Config = sm.configs[len(sm.configs) - 1]
        }
    }
}

func (sm *ShardMaster) templateHandler(originOp Op) bool {
    wrongLeader := true
    index,_,isLeader := sm.rf.Start(originOp)
    if !isLeader {return wrongLeader}
    ch := sm.getCh(index,true)
    op := sm.beNotified(ch,index)
    if equalOp(op,originOp) {
        wrongLeader = false
    }
    return wrongLeader
}

func (sm *ShardMaster) beNotified(ch chan Op, index int) Op {
    select {
    case notifyArg := <- ch :
        close(ch)
        sm.mu.Lock()
        delete(sm.chMap,index)
        sm.mu.Unlock()
        return notifyArg
    case <- time.After(time.Duration(600)*time.Millisecond):
        return Op{}
    }
}

func equalOp(a Op, b Op) bool{
    return a.SeqNum == b.SeqNum && a.Cid == b.Cid && a.OpType == b.OpType
}

func (sm *ShardMaster) Kill() {
    sm.rf.Kill()
    sm.killCh <- true
}
// needed by shardkv tester
func (sm *ShardMaster) Raft() *raft.Raft {
    return sm.rf
}

func (sm *ShardMaster) getCh(idx int, createIfNotExists bool) chan Op{
    sm.mu.Lock()
    defer sm.mu.Unlock()
    if _, ok := sm.chMap[idx]; !ok {
        if !createIfNotExists {return nil}
        sm.chMap[idx] = make(chan Op,1)
    }
    return sm.chMap[idx]
}

func (sm *ShardMaster) updateConfig(op string, arg interface{}) {
    cfg := sm.createNextConfig()
    if op == "Move" {
        moveArg := arg.(MoveArgs)
        if _,exists := cfg.Groups[moveArg.GID]; exists {
            cfg.Shards[moveArg.Shard] = moveArg.GID
        } else {return}
    }else if op == "Join" {
        joinArg := arg.(JoinArgs)
        for gid,servers := range joinArg.Servers {
            newServers := make([]string, len(servers))
            copy(newServers, servers)
            cfg.Groups[gid] = newServers
            sm.rebalance(&cfg,op,gid)
        }
    } else if op == "Leave"{
        leaveArg := arg.(LeaveArgs)
        for _,gid := range leaveArg.GIDs {
            delete(cfg.Groups,gid)
            sm.rebalance(&cfg,op,gid)
        }
    } else {
        log.Fatal("invalid area",op)
    }
    sm.configs = append(sm.configs,cfg)
}

func (sm *ShardMaster) createNextConfig() Config {
    lastCfg := sm.configs[len(sm.configs)-1]
    nextCfg := Config{Num: lastCfg.Num + 1, Shards: lastCfg.Shards, Groups: make(map[int][]string)}
    for gid, servers := range lastCfg.Groups {
        nextCfg.Groups[gid] = append([]string{}, servers...)
    }
    return nextCfg
}

func (sm *ShardMaster) rebalance(cfg *Config, request string, gid int) {
    shardsCount := sm.groupByGid(cfg) // gid -> shards
    switch request {
    case "Join":
        avg := NShards / len(cfg.Groups)
        for i := 0; i < avg; i++ {
            maxGid := sm.getMaxShardGid(shardsCount)
            cfg.Shards[shardsCount[maxGid][0]] = gid
            shardsCount[maxGid] = shardsCount[maxGid][1:]
        }
    case "Leave":
        shardsArray,exists := shardsCount[gid]
        if !exists {return}
        delete(shardsCount,gid)
        if len(cfg.Groups) == 0 { // remove all gid
            cfg.Shards = [NShards]int{}
            return
        }
        for _,v := range shardsArray {
            minGid := sm.getMinShardGid(shardsCount)
            cfg.Shards[v] = minGid
            shardsCount[minGid] = append(shardsCount[minGid], v)
        }
    }
}
func (sm *ShardMaster) groupByGid(cfg *Config) map[int][]int {
    shardsCount := map[int][]int{}
    for k,_ := range cfg.Groups {
        shardsCount[k] = []int{}
    }
    for k, v := range cfg.Shards {
        shardsCount[v] = append(shardsCount[v], k)
    }
    return shardsCount
}
func (sm *ShardMaster) getMaxShardGid(shardsCount map[int][]int) int {
    max := -1
    var gid int
    for k, v := range shardsCount {
        if max < len(v) {
            max = len(v)
            gid = k
        }
    }
    return gid
}
func (sm *ShardMaster) getMinShardGid(shardsCount map[int][]int) int {
    min := math.MaxInt32
    var gid int
    for k, v := range shardsCount {
        if min > len(v) {
            min = len(v)
            gid = k
        }
    }
    return gid
}
func send(notifyCh chan Op,op Op) {
    notifyCh <- op
}
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardMaster {
    sm := new(ShardMaster)
    sm.me = me
    sm.configs = make([]Config, 1)
    sm.configs[0].Groups = map[int][]string{}
    labgob.Register(Op{})
    labgob.Register(JoinArgs{})
    labgob.Register(LeaveArgs{})
    labgob.Register(MoveArgs{})
    labgob.Register(QueryArgs{})
    sm.applyCh = make(chan raft.ApplyMsg)
    sm.rf = raft.Make(servers, me, persister, sm.applyCh)
    // Your code here.
    sm.chMap = make(map[int]chan Op)
    sm.cid2Seq = make(map[int64]int)
    sm.killCh = make(chan bool,1)
    go func() {
        for {
            select {
            case <-sm.killCh:
                return
            case applyMsg := <-sm.applyCh:
                if !applyMsg.CommandValid {continue}
                op := applyMsg.Command.(Op)
                sm.mu.Lock()
                maxSeq,found := sm.cid2Seq[op.Cid]
                if op.SeqNum >= 0 && (!found || op.SeqNum > maxSeq) {
                    sm.updateConfig(op.OpType,op.Args)
                    sm.cid2Seq[op.Cid] = op.SeqNum
                }
                sm.mu.Unlock()
                if notifyCh := sm.getCh(applyMsg.CommandIndex,false); notifyCh != nil {
                    send(notifyCh,op)
                }
            }
        }
    }()
    return sm
}

上一篇下一篇

猜你喜欢

热点阅读