Golang

Rpcx源码之Client

2019-02-22  本文已影响12人  神奇的考拉

一、概述

在Rpcx框架源码中,存在Client的角色,用来完成承担Client stub;相对来说Server,通过从注册中心获取已注册暴露对外提供功能的Service,本将已注册的service存放到本地,减少频繁的读取registry中心的记录,同时也通过watch机制获取registry中注册服务的变更(采用channel来监听service的变更),来同步本地缓存的记录,通过RPCClient与server完成通信采用同步或异步的通信方式。对应的Clinet提供了如下的功能:

func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string,
                        args interface{}, reply interface{}) error // 同步调用
func (client *Client) Close() error // 释放RPCClient相关资源
func (c *Client) Connect(network, address string) error // 连接server
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string,
           args interface{}, reply interface{}, done chan *Call) *Call // 异步
func (client *Client) IsClosing() bool  // 本地RPCClient是否已关闭
func (client *Client) IsShutdown() bool // RPCClient是否停止

func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) 
          (map[string]string, []byte, error) //原始信息发送

对应的XClinet提供了如下的功能:(XClient是在Client的基础上增加路由规则、失败模式、服务发现治理等功能)

func (c *xClient) SetSelector(s Selector)  // 设置路由规则
func (c *xClient) SetPlugins(plugins PluginContainer) // 设置插件 增强client特性
func (c *xClient) ConfigGeoSelector(latitude, longitude float64) // 就近路由规则
func (client *Client) Call(ctx context.Context, servicePath,serviceMethod string, 
      args interface{},reply interface{}) error // 同步调用
func (client *Client) Go(ctx context.Context, servicePath,serviceMethod string, 
      args interface{}, reply interface{}, done chan *Call) *Call // 异步执行调用
func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) 
         (map[string]string, []byte, error)  // 发送原始信息
func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, 
      args interface{},  reply interface{}) error  // 广播请求模式
func (c *xClient) Fork(ctx context.Context, serviceMethod string, 
    args interface{},  reply interface{}) error // Fock 模式请求

接下来详细剖析对应的源码

二、源码

本文源码主要位于:rpcx\client包中client.go 和 xclient.go
Client结构

// 支持同步、异步的调用模式;一个Client就代表一个RPCClient
type Client struct {
    option Option // 配置选项

    Conn net.Conn      // connection
    r    *bufio.Reader   // 读

    mutex        sync.Mutex // protects following
    seq          uint64
    pending      map[uint64]*Call    // 挂起的调用
    closing      bool      // user has called Close  client关闭
    shutdown     bool    // server has told us to stop  client停止
    pluginClosed bool  // the plugin has been called plugin禁用

    Plugins PluginContainer         // 插件容器

    ServerMessageChan chan<- *protocol.Message  // 接收server的通知channel
}

从上面的Client结构体的源码可以看到提供了Client只是具备了简单RPC通信功能:同步和异步,并以Plugin方式提供Client的额外特性、Client权限凭证等操作;具体的提供的函数如下:

1.1 新建Client

// 新建Client
// 目前支持Option:服务分组、重试、TLS配置、连接timeout、读取timeout、
// 写入timeout、调用备份失败有效期、断路器、协议序列化、协议压缩方式、心跳等相关参数设置
// 默认已提供DefaultOption满足基本参数设置需求,可以在此基础上按需修改.
func NewClient(option Option) *Client {
    return &Client{
        option: option,
    }
}

1.2 连接Server

// 连接server并准备发送client的请求:client是需要指定的network来连接server的
// 本部分源码位于:client包下的connection.go文件中
// 
func (c *Client) Connect(network, address string) error {
    var conn net.Conn
    var err error

    switch network {
    case "http":   
        conn, err = newDirectHTTPConn(c, network, address)
    case "kcp":
        conn, err = newDirectKCPConn(c, network, address)
    case "quic":
        conn, err = newDirectQuicConn(c, network, address)
    case "unix":
        conn, err = newDirectConn(c, network, address)
    default:  // 默认TCP
        fn := makeConnMap[network]
        if fn != nil {   
            conn, err = fn(c, network, address)
        } else {  // TLS方式连接 
            conn, err = newDirectConn(c, network, address)
        }
    }

    if err == nil && conn != nil {  // 设置connection的client端相关参数
        if c.option.ReadTimeout != 0 { // 读超时
            conn.SetReadDeadline(time.Now().Add(c.option.ReadTimeout))
        }
        if c.option.WriteTimeout != 0 { // 写超时
            conn.SetWriteDeadline(time.Now().Add(c.option.WriteTimeout))
        }

        c.Conn = conn
        c.r = bufio.NewReaderSize(conn, ReaderBuffsize)  // 设置buffer
        //c.w = bufio.NewWriterSize(conn, WriterBuffsize)

        // start reading and writing since connected
        go c.input()   // 等待connected后 执行read或write操作

                // 额外开启goroutine来执行heartbeat
        if c.option.Heartbeat && c.option.HeartbeatInterval > 0 { 
            go c.heartbeat()
        }

    }

    return err
}

等待连接正常进行read或write操作的入口

// 读取server的响应结果:
// 1、本地缓存不存在对应调用对象call记录
// 2、调用对象响应结果返回:a、出现了Error b、正常
func (client *Client) input() { //
    var err error
    var res = protocol.NewMessage() // 请求消息

        // for循环根据用户定义的内容来构建成发送给server端的protocol.Message信息
       //              读取server返回给客户端的protocol.Message信息
       // client相关的timeou参数设置
    for err == nil {
        if client.option.ReadTimeout != 0 { // 设置读取有效期
            client.Conn.SetReadDeadline(time.Now().Add(client.option.ReadTimeout))
        }

        // 完成将用户定义的request转为Message
        err = res.Decode(client.r) // 解码
        //res, err = protocol.Read(client.r)

        if err != nil {
            break
        }
        // 验证Message的合法性
        seq := res.Seq()
        var call *Call
        isServerMessage := (res.MessageType() == protocol.Request 
                   && !res.IsHeartbeat() && res.IsOneway()) // 是否为server消息
// 当前Message属于request的时  获取其关联的pending中对应call:
//     pending中记录的是待执行的call
        if !isServerMessage {  
            client.mutex.Lock()
            call = client.pending[seq]
            delete(client.pending, seq)
            client.mutex.Unlock()
        }

        switch {
        case call == nil: // 当调用对象call不存在本地缓存中
            if isServerMessage {  // 
                if client.ServerMessageChan != nil {
                                       // 新建go协程单独处理response结果
                    go client.handleServerRequest(res) 
                    res = protocol.NewMessage()
                }
                continue
            }
        case res.MessageStatusType() == protocol.Error: // 消息状态 = Error
            // We've got an error response. Give this to the request
            // 当server端输出一个错误响应 给到对应的请求
            if len(res.Metadata) > 0 { // 提取响应内容中meta data数据
                meta := make(map[string]string, len(res.Metadata))
                for k, v := range res.Metadata {
                    meta[k] = v
                }
                call.ResMetadata = meta
                call.Error = ServiceError(meta[protocol.ServiceError])
            }

            if call.Raw { // 调用使用的raw形式  需要将响应内容进行转换
                call.Metadata, call.Reply, _ = convertRes2Raw(res)
                call.Metadata[XErrorMessage] = call.Error.Error()
            }
            call.done() // 触发结果通道channel
        default: // 响应处理
            if call.Raw { // Raw形式单独提取出来 完成对应的转换:获取metadata、reply
                call.Metadata, call.Reply, _ = convertRes2Raw(res)
            } else {
                data := res.Payload // 提取响应体
                if len(data) > 0 {
                                       // 根据对应的解码来进行解码
                    codec := share.Codecs[res.SerializeType()] 
                    if codec == nil {
                        call.Error = ServiceError(ErrUnsupportedCodec.Error())
                    } else {
                  // 将对应的返回结果赋给本次rpc的Call中的Reply
                        err = codec.Decode(data, call.Reply) 
                        if err != nil {
                            call.Error = ServiceError(err.Error())
                        }
                    }
                }
                if len(res.Metadata) > 0 { // 提取metadata
                    meta := make(map[string]string, len(res.Metadata))
                    for k, v := range res.Metadata {
                        meta[k] = v
                    }
// 赋予本次call对应的响应元数据ResMetadata
                    call.ResMetadata = res.Metadata 
                }

            }

            call.done() // 通知结果通道
        }

        res.Reset() // 重置Message对象 便于下一次新的请求Message处理
    }

    ...省略代码
}

每次进行的RPC过程会被封装成Call对象,通过Done的channel来监控每次Call完成情况

// Call represents an active RPC.
type Call struct {
    ServicePath   string               // RPC调用的Service名字及其对应的Method.
    ServiceMethod string             // 
    Metadata      map[string]string  //metadata
    ResMetadata   map[string]string  // 响应结果的metadata
    Args          interface{} // 函数的参数(*struct).
    Reply         interface{} // 函数的应答结果 (*struct).
    Error         error       // 一次Call完成时对应的Error状态
    Done          chan *Call  // 存放每次完成的Call.
    Raw           bool        // 是否采用原始数据信息方式
}

发起远程调用(同步调用)

func (client *Client) call(ctx context.Context, servicePath, serviceMethod string,
     args interface{}, reply interface{}) error {
        seq := new(uint64)                          // 请求ID
     // 将seq添加到context中便于在Server端时获取
    ctx = context.WithValue(ctx, seqKey{}, seq) 
    Done := client.Go(ctx, servicePath, serviceMethod, args, 
            reply, make(chan *Call, 1)).Done
        // 异步请求
    var err error
    select {
    case <-ctx.Done(): //cancel by context  取消
        client.mutex.Lock()  
    // 使用互斥锁 剔除处于pending状态的call; 
    //  处于pending状态的call存放在client端的本地内存中map[uint64]*Call
        call := client.pending[*seq]
        delete(client.pending, *seq)
        client.mutex.Unlock()
        if call != nil { //  将本次Call投递到channel中
            call.Error = ctx.Err()
            call.done() // 执行Call投递到channel
        }

        return ctx.Err() // 输出本次request的错误状态
    case call := <-Done: // 从Done的channel获取Call:代表Call已完成
        err = call.Error
        meta := ctx.Value(share.ResMetaDataKey) // 获取response内容
        if meta != nil && len(call.ResMetadata) > 0 {
            resMeta := meta.(map[string]string)
            for k, v := range call.ResMetadata {
                resMeta[k] = v
            }
        }
    }

    return err

目前rpcx 支持许多服务发现机制,同时也可以自定义服务发现:

rpcx 支持 故障模式:
rpcx 支持 路由模式:

[Random]: 随机选择节点
[Roundrobin]: 用 [roundrobin] (https://zh.wikipedia.org/wiki/%E5%BE%AA%E7%92%B0%E5%88%B6) 算法选择节点
[Consistent hashing]: 如果服务路径、方法和参数一致,就选择同一个节点, 使用了非常快的jump consistent hash算法
[Weighted]: 根据元数据里配置好的权重(weight=xxx)来选择节点
[Network quality]: 根据ping的结果来选择节点。网络质量越好,该节点被选择的几率越大
[Geography]: 如果有多个数据中心,客户端趋向于连接同一个数据机房的节点
[Customized Selector]: 如果以上的选择器都不适合你,你可以自己定制选择器

rpcx特殊的两种请求方式

三、使用

实例server端代码

import (
    "context"
    "flag"
    "github.com/smallnest/rpcx/server"
    "log"
    "net"
    "rpcx/examples/models"
    )

var (
    addr = flag.String("addr","localhost:8972","server address")
)

func main() {
    flag.Parse()

    s := server.NewServer()
    //s.Plugins.Add(&ConnectionListerPlugin{})

    s.Register(new(models.Arith),"")  // 只提供rcvr不指定servicePath和method以及对应的name
    s.RegisterName("PBArith", new(models.PBArith),"") // 提供rcvr及其name
    s.RegisterFunction("PB-Mul",models.Mul,"") // 注入函数(对应的函数不需要提供调用方) 提供servicePath和method
    s.RegisterFunctionName("PMul","mul",models.Mul,"") // 提供servicePath和method及其name

    s.Serve("tcp", *addr)
    //log.Println(" Server Address= " + s.Address().String())
}

type ConnectionListerPlugin struct {
}

func (clis *ConnectionListerPlugin) HandleLister(conn net.Conn) (net.Conn, bool){
    log.Printf("Server Listener Address %v \n", conn.LocalAddr().String())
    return conn,true
}

func mul(ctx context.Context, args *models.Args, reply *models.Reply) error {
    reply.C = args.A * args.B
    return nil
}

实例client端代码

package main

import (
    "context"
    "flag"
    "rpcx/client"
    "rpcx/examples/models"
    "log"
    "time"
)

var  (
    addr = flag.String("addr","localhost:8972","server address")
)

func main() {
    flag.Parse()
    //
    d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &models.Args{
        A: 10,
        B: 20,
    }

    for{
        reply := &models.Reply{}
        err := xclient.Call(context.Background(),"Mul", args, reply)
        if err != nil{
            log.Fatalf("failed to call:%v", err)
        }
        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(1e9)
    }
}

辅助代码

package models

import (
    "context"
    "fmt"
    "rpcx/_testutils"
)

// 参数
type Args struct {
    A int
    B int
}

// 回复结果
type Reply struct {
    C int
}

// 服务
type Arith int

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    reply.C = args.A * args.B
    fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
    return nil
}

func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error {
    reply.C = args.A + args.B
    fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C)
    return nil
}

func (t *Arith) Say(ctx context.Context, args *string, reply *string) error {
    *reply = "hello " + *args
    return nil
}

type PBArith int

func (t *PBArith) Mul(ctx context.Context, args *testutils.ProtoArgs, reply *testutils.ProtoReply) error {
    reply.C = args.A * args.B
    return nil
}

func (t *Arith) ThriftMul(ctx context.Context, args *testutils.ThriftArgs_, reply *testutils.ThriftReply) error {
    reply.C = args.A * args.B
    return nil
}

func Mul(ctx context.Context, args *testutils.ProtoArgs, reply *testutils.ProtoReply) error {
    reply.C = args.A * args.B
    return nil
}

四、其他

Client分析源码
Client执行链路如下:

上一篇下一篇

猜你喜欢

热点阅读