从零实现简单的 Go 微服务治理框架

2019-12-06  本文已影响0人  炎炎daddy

成熟的微服务框架很多,让人眼花缭乱:光是学会使用就要花不少功夫,更不用说理解其中的精髓了。思考良久,最终决定自己从零实现一套,主要是为了亲身体验设计框架时的种种权衡,从而加深对微服务相关概念的理解。特在此记录其实现过程,以供查阅。

RPC 框架的设计

  1. 服务端
// Args carries the two integer operands passed to an Arith method.
type Args struct {
    A, B int
}

// Reply carries the result of an Arith method; Mul stores the product in C.
type Reply struct {
    C int
}

// Arith is the example service type. main registers an instance of it with
// the server under the name "Arith".
type Arith int

// Mul multiplies args.A by args.B and stores the product in reply.C.
// ctx is part of the method signature but is not used here.
// It always returns nil.
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    product := args.A * args.B
    reply.C = product
    return nil
}
// main wires up the example: it creates a server, registers an Arith
// instance under the service name "Arith", and serves on TCP address ":8972".
func main() {
    srv := server.NewServer()
    arith := new(Arith)
    srv.RegisterName("Arith", arith) // register the service instance
    srv.Serve("tcp", ":8972")        // start listening
}
        // Method needs four ins: receiver, context.Context, *args, *reply.
        // NumIn counts the receiver, hence four rather than three.
        if mtype.NumIn() != 4 {
            if reportErr {
                log.Errorf("rpc.Register: method %q has %d input parameters; needs exactly four\n", mname, mtype.NumIn())
            }
            continue
        }

        // The first parameter after the receiver must implement the context
        // interface; otherwise the method is skipped like any other mismatch.
        ctxType := mtype.In(1)
        if !ctxType.Implements(contextType) {
            if reportErr {
                log.Errorf("rpc.Register: context type of method %q is not implemented: %q\n", mname, ctxType)
            }
            continue
        }

        // The args type need not be a pointer, but it must be exported or builtin.
        argType := mtype.In(2)
        if !isExportedOrBuiltinType(argType) {
            if reportErr {
                log.Errorf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
            }
            continue
        }
        // The reply type must be a pointer.
        replyType := mtype.In(3)
        if replyType.Kind() != reflect.Ptr {
            if reportErr {
                log.Errorf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
            }
            continue
        }
        // The reply type must be exported or builtin.
        if !isExportedOrBuiltinType(replyType) {
            if reportErr {
                log.Errorf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
            }
            continue
        }
// Serve listens on the given network and address and then hands the listener
// to server.serve, which runs the accept loop.
//
// A listen failure is reported through log.Fatal (presumably terminating the
// process — confirm against the logger in use). The error returned by
// server.serve is discarded.
func (server *Server) Serve(network, address string) {
    ln, err := net.Listen(network, address)   
    if err != nil {
        log.Fatal("listen error:", err)
    }
    // The article elides some code at this point.
     ........
    server.serve(ln)
}

// serve runs the accept loop on l, starting one goroutine per accepted
// connection.
//
// When Accept fails, it returns ErrServerClosed if the channel from
// getDoneChan is ready. Otherwise a temporary network error is retried after
// a sleep that starts at 5ms, doubles on each consecutive failure and is
// capped at 1s; any other error is returned to the caller.
func (server *Server) serve(l net.Listener) error {
    const (
        initialBackoff = 5 * time.Millisecond
        maxBackoff     = 1 * time.Second
    )
    var backoff time.Duration // current sleep after a temporary accept failure

    for {
        conn, acceptErr := l.Accept() // accept the next connection
        if acceptErr != nil {
            select {
            case <-server.getDoneChan():
                return ErrServerClosed
            default:
            }

            netErr, isNetErr := acceptErr.(net.Error)
            if !isNetErr || !netErr.Temporary() {
                return acceptErr
            }

            // Exponential backoff: the first failure sleeps initialBackoff,
            // later ones double the previous delay up to maxBackoff.
            backoff *= 2
            if backoff == 0 {
                backoff = initialBackoff
            }
            if backoff > maxBackoff {
                backoff = maxBackoff
            }
            time.Sleep(backoff)
            continue
        }

        // A successful accept resets the backoff.
        backoff = 0

        if tcpConn, isTCP := conn.(*net.TCPConn); isTCP {
            tcpConn.SetKeepAlive(true)
            tcpConn.SetKeepAlivePeriod(3 * time.Minute)
        }

        ec := newEasyConn(server, conn) // create the connection object
        go ec.serveConn()               // one goroutine per connection
    }
}

// serveConn reads requests from the connection until a read fails.
//
// Before each read it sets a read deadline of ec.maxIdleTime seconds from
// now. Heartbeat requests are answered in place by echoing the message back
// as a response; every other request is wrapped in a workerJob and sent to
// the server's jobChan. On exit — including after a panic, which is
// recovered and logged together with the goroutine's stack — the underlying
// connection is closed.
func (ec *easyConn) serveConn() {
    defer func() {
        log.Infof("serveConn exit")
        if err := recover(); err != nil {
            // Log the panic with this goroutine's stack instead of silently
            // swallowing it. runtime.Stack never writes more than len(buf).
            const size = 64 << 10
            buf := make([]byte, size)
            buf = buf[:runtime.Stack(buf, false)]
            log.Errorf("serveConn panic: %v, stack:\n%s", err, buf)
        }
        ec.rwc.Close()
    }()

    r := bufio.NewReaderSize(ec.rwc, 1024)

    for {
        ec.rwc.SetReadDeadline(time.Now().Add(time.Duration(ec.maxIdleTime) * time.Second))
        req, err := ec.readRequest(r)
        if err != nil {
            // io.EOF is not logged; any other read error is.
            if err != io.EOF {
                log.Errorf("readRequest error %v", err)
            }
            return
        }

        if req.IsHeartbeat() {
            // Reply to a heartbeat by sending the same message back, marked
            // as a response, then release it.
            req.SetMessageType(protocol.Response)
            ec.writeResponse(req)
            protocol.FreeMsg(req)
            continue
        }

        // Attach the connection to the context under ConnDataKey{} and hand
        // the request to the server's job channel.
        ctx := context.WithValue(context.Background(), ConnDataKey{}, ec)
        ec.server.jobChan <- &workerJob{
            ctx:  ctx,
            conn: ec,
            req:  req,
        }
    }
}

2.客户端

// RPCClient is a client bound to a single service on a single server
// connection.
type RPCClient struct {
    // Target of the connection and the service path used for calls.
    network, address, servicePath string

    reconnectTryNums int
    conn             net.Conn // underlying connection that requests are written to

    // Heartbeat settings. NOTE(review): units are not visible here — confirm.
    heartBeatTryNums, heartBeatTimeout int
    heartBeatInterval                  int64

    mutex    sync.Mutex       // protects the fields below
    seq      uint64           // sequence number of the most recent request
    pending  map[uint64]*Call // in-flight calls keyed by sequence number
    lastSend int64            // Unix time (seconds) of the most recent send
    status   ConnStatus       // connection state; calls are sent only when ConnAvailable
    suicide  bool             // when set, input exits on a read error instead of reconnecting

    DialTimeout time.Duration
    doneChan    chan struct{}
}
// NewRPCClient dials address over the given network and returns a client for
// servicePath.
//
// network must be "tcp" or "http"; any other value yields an error. A dial
// failure is returned as-is. The service path is stored lower-cased. On
// success two goroutines are started on the client: input and keepalive.
func NewRPCClient(network, address, servicePath string, dialTimeout time.Duration) (*RPCClient, error) {
    log.Infof("create rpc client for netword %s address %s service %s", network, address, servicePath)

    var (
        conn net.Conn
        err  error
    )
    switch network {
    case "tcp":
        conn, err = Dial(network, address, dialTimeout)
    case "http":
        conn, err = DialHTTP(network, address, dialTimeout)
    default:
        return nil, fmt.Errorf("unsupport network %s", network)
    }
    if err != nil {
        return nil, err
    }

    c := &RPCClient{
        network:           network,
        address:           address,
        servicePath:       strings.ToLower(servicePath),
        conn:              conn,
        pending:           make(map[uint64]*Call),
        heartBeatInterval: defHeatBeatInterval,
        doneChan:          make(chan struct{}),
        DialTimeout:       dialTimeout,
    }

    go c.input()
    go c.keepalive()
    return c, nil
}

3.客户端请求和响应

// Go builds a Call for serviceMethod on the client's service path, passes it
// to send, and returns it without waiting for the reply.
//
// Options flagged as "after" are appended to the call's AfterCalls; all
// others are applied to the call immediately, before it is sent. The call
// uses MsgPack serialization unless an option changes it.
//
// If done is nil, a channel with a buffer of 10 is allocated. A non-nil done
// must be buffered; an unbuffered channel is rejected via log.Panic.
func (client *RPCClient) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call, options ...BeforeOrAfterCallOption) *Call {
    call := new(Call)
    call.ServicePath, call.ServiceMethod = client.servicePath, serviceMethod
    call.ctx = ctx
    call.Args, call.Reply = args, reply
    call.serializeType = protocol.MsgPack

    for i := range options {
        opt := options[i]
        if !opt.after {
            opt.option(call)
            continue
        }
        call.AfterCalls = append(call.AfterCalls, opt.option)
    }

    switch {
    case done == nil:
        done = make(chan *Call, 10) // buffered.
    case cap(done) == 0:
        // The caller must size done for the number of simultaneous RPCs that
        // share it; an unbuffered channel is refused outright.
        log.Panic("rpc: done channel is unbuffered")
    }
    call.Done = done

    client.send(call)
    return call
}



// send registers call under a fresh sequence number and writes the encoded
// request to the connection.
//
// If the client is not in the ConnAvailable state, the call is completed at
// once with ErrShutdown. If building or writing the request fails, the call
// is removed from the pending map and completed with that error.
func (client *RPCClient) send(call *Call) {
    // Register this call.
    client.mutex.Lock()
    if client.status != ConnAvailable {
        client.mutex.Unlock()
        call.Error = ErrShutdown
        call.done()
        return
    }
    seq := atomic.AddUint64(&client.seq, 1)
    client.pending[seq] = call
    client.lastSend = time.Now().Unix()
    conn := client.conn
    client.mutex.Unlock()

    // fail unregisters the call and completes it with cause. The call is
    // looked up again because it may already have been removed from pending.
    fail := func(cause error) {
        client.mutex.Lock()
        registered := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        if registered != nil {
            registered.Error = cause
            registered.done()
        }
    }

    req, err := client.createRequest(call, seq)
    if err != nil {
        fail(err)
        return
    }

    if _, err = conn.Write(req.Encode()); err != nil {
        fail(err)
    }
}

// input is the client's receive loop: it reads responses, matches each one to
// its pending call by sequence number, and completes that call.
//
// On a read error it exits if client.suicide is set; otherwise it tries to
// reconnect and exits only if reconnecting fails. A response with no
// matching pending call is dropped, whether it is a heartbeat or a normal
// reply. For a normal reply, the call fails if the reply carries an error,
// if no codec is registered for its serialize type, or if decoding the
// payload fails; otherwise metadata is copied, AfterCalls run, and the call
// completes. After the loop, the client is closed if its status is
// ConnReconnectFail.
//
// NOTE(review): client.suicide is read here without holding client.mutex
// although the struct lists it among the mutex-protected fields — confirm
// whether that is intentional.
func (client *RPCClient) input() {
    var err error
    resp := protocol.NewMessage()

    for {
        _, err = client.readResponse(resp)
        if err != nil {
            if client.suicide {
                break
            }
            log.Errorf("readResponse error %+v client %+v", err, client)
            err = client.reconnect()
            if err != nil {
                break
            }
            continue
        }

        seq := resp.Seq()
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        // The map lookup can miss (nothing pending for this sequence number).
        // Guard before any use of call so a heartbeat response with no
        // pending entry cannot dereference a nil *Call.
        if call == nil {
            continue
        }

        if resp.IsHeartbeat() {
            call.done()
            continue
        }

        err = checkReplyError(resp)
        if err != nil {
            call.Error = err
            call.done()
            continue
        }

        codec := share.Codecs[resp.SerializeType()]
        if codec == nil {
            err = fmt.Errorf("can not find codec for %d", resp.SerializeType())
            call.Error = err
            call.done()
            continue
        }

        err = codec.Decode(resp.Payload, call.Reply)
        if err != nil {
            call.Error = err
            call.done()
            continue
        }

        if resp.Metadata != nil {
            call.Metadata = resp.Metadata
        }
        for _, afterCall := range call.AfterCalls {
            afterCall(call)
        }
        call.done()
    }

    client.mutex.Lock()
    if client.status == ConnReconnectFail {
        client.close(err, false)
    }
    client.mutex.Unlock()
    log.Infof("client input goroutine exit")
}

(未完待续...)
源码地址:https://github.com/masonchen2014/easymicro

上一篇 下一篇

猜你喜欢

热点阅读