Rpcx源码之Client
一、概述
在Rpcx框架源码中,存在Client的角色,用来完成承担Client stub;相对来说Server,通过从注册中心获取已注册暴露对外提供功能的Service,本将已注册的service存放到本地,减少频繁的读取registry中心的记录,同时也通过watch机制获取registry中注册服务的变更(采用channel来监听service的变更),来同步本地缓存的记录,通过RPCClient与server完成通信采用同步或异步的通信方式。对应的Clinet提供了如下的功能:
func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string,
args interface{}, reply interface{}) error // 同步调用
func (client *Client) Close() error // 释放RPCClient相关资源
func (c *Client) Connect(network, address string) error // 连接server
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string,
args interface{}, reply interface{}, done chan *Call) *Call // 异步
func (client *Client) IsClosing() bool // 本地RPCClient是否已关闭
func (client *Client) IsShutdown() bool // RPCClient是否停止
func (client *Client) SendRaw(ctx context.Context, r *protocol.Message)
(map[string]string, []byte, error) //原始信息发送
对应的XClinet提供了如下的功能:(XClient是在Client的基础上增加路由规则、失败模式、服务发现治理等功能)
func (c *xClient) SetSelector(s Selector) // 设置路由规则
func (c *xClient) SetPlugins(plugins PluginContainer) // 设置插件 增强client特性
func (c *xClient) ConfigGeoSelector(latitude, longitude float64) // 就近路由规则
func (client *Client) Call(ctx context.Context, servicePath,serviceMethod string,
args interface{},reply interface{}) error // 同步调用
func (client *Client) Go(ctx context.Context, servicePath,serviceMethod string,
args interface{}, reply interface{}, done chan *Call) *Call // 异步执行调用
func (client *Client) SendRaw(ctx context.Context, r *protocol.Message)
(map[string]string, []byte, error) // 发送原始信息
func (c *xClient) Broadcast(ctx context.Context, serviceMethod string,
args interface{}, reply interface{}) error // 广播请求模式
func (c *xClient) Fork(ctx context.Context, serviceMethod string,
args interface{}, reply interface{}) error // Fock 模式请求
接下来详细剖析对应的源码
二、源码
本文源码主要位于:rpcx\client包中client.go 和 xclient.go
Client结构
// 支持同步、异步的调用模式;一个Client就代表一个RPCClient
type Client struct {
option Option // 配置选项
Conn net.Conn // connection
r *bufio.Reader // 读
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call // 挂起的调用
closing bool // user has called Close client关闭
shutdown bool // server has told us to stop client停止
pluginClosed bool // the plugin has been called plugin禁用
Plugins PluginContainer // 插件容器
ServerMessageChan chan<- *protocol.Message // 接收server的通知channel
}
从上面的Client结构体的源码可以看到提供了Client只是具备了简单RPC通信功能:同步和异步,并以Plugin方式提供Client的额外特性、Client权限凭证等操作;具体的提供的函数如下:
1.1 新建Client
// 新建Client
// 目前支持Option:服务分组、重试、TLS配置、连接timeout、读取timeout、
// 写入timeout、调用备份失败有效期、断路器、协议序列化、协议压缩方式、心跳等相关参数设置
// 默认已提供DefaultOption满足基本参数设置需求,可以在此基础上按需修改.
func NewClient(option Option) *Client {
return &Client{
option: option,
}
}
1.2 连接Server
// 连接server并准备发送client的请求:client是需要指定的network来连接server的
// 本部分源码位于:client包下的connection.go文件中
//
func (c *Client) Connect(network, address string) error {
var conn net.Conn
var err error
switch network {
case "http":
conn, err = newDirectHTTPConn(c, network, address)
case "kcp":
conn, err = newDirectKCPConn(c, network, address)
case "quic":
conn, err = newDirectQuicConn(c, network, address)
case "unix":
conn, err = newDirectConn(c, network, address)
default: // 默认TCP
fn := makeConnMap[network]
if fn != nil {
conn, err = fn(c, network, address)
} else { // TLS方式连接
conn, err = newDirectConn(c, network, address)
}
}
if err == nil && conn != nil { // 设置connection的client端相关参数
if c.option.ReadTimeout != 0 { // 读超时
conn.SetReadDeadline(time.Now().Add(c.option.ReadTimeout))
}
if c.option.WriteTimeout != 0 { // 写超时
conn.SetWriteDeadline(time.Now().Add(c.option.WriteTimeout))
}
c.Conn = conn
c.r = bufio.NewReaderSize(conn, ReaderBuffsize) // 设置buffer
//c.w = bufio.NewWriterSize(conn, WriterBuffsize)
// start reading and writing since connected
go c.input() // 等待connected后 执行read或write操作
// 额外开启goroutine来执行heartbeat
if c.option.Heartbeat && c.option.HeartbeatInterval > 0 {
go c.heartbeat()
}
}
return err
}
等待连接正常进行read或write操作的入口
// 读取server的响应结果:
// 1、本地缓存不存在对应调用对象call记录
// 2、调用对象响应结果返回:a、出现了Error b、正常
func (client *Client) input() { //
var err error
var res = protocol.NewMessage() // 请求消息
// for循环根据用户定义的内容来构建成发送给server端的protocol.Message信息
// 读取server返回给客户端的protocol.Message信息
// client相关的timeou参数设置
for err == nil {
if client.option.ReadTimeout != 0 { // 设置读取有效期
client.Conn.SetReadDeadline(time.Now().Add(client.option.ReadTimeout))
}
// 完成将用户定义的request转为Message
err = res.Decode(client.r) // 解码
//res, err = protocol.Read(client.r)
if err != nil {
break
}
// 验证Message的合法性
seq := res.Seq()
var call *Call
isServerMessage := (res.MessageType() == protocol.Request
&& !res.IsHeartbeat() && res.IsOneway()) // 是否为server消息
// 当前Message属于request的时 获取其关联的pending中对应call:
// pending中记录的是待执行的call
if !isServerMessage {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
}
switch {
case call == nil: // 当调用对象call不存在本地缓存中
if isServerMessage { //
if client.ServerMessageChan != nil {
// 新建go协程单独处理response结果
go client.handleServerRequest(res)
res = protocol.NewMessage()
}
continue
}
case res.MessageStatusType() == protocol.Error: // 消息状态 = Error
// We've got an error response. Give this to the request
// 当server端输出一个错误响应 给到对应的请求
if len(res.Metadata) > 0 { // 提取响应内容中meta data数据
meta := make(map[string]string, len(res.Metadata))
for k, v := range res.Metadata {
meta[k] = v
}
call.ResMetadata = meta
call.Error = ServiceError(meta[protocol.ServiceError])
}
if call.Raw { // 调用使用的raw形式 需要将响应内容进行转换
call.Metadata, call.Reply, _ = convertRes2Raw(res)
call.Metadata[XErrorMessage] = call.Error.Error()
}
call.done() // 触发结果通道channel
default: // 响应处理
if call.Raw { // Raw形式单独提取出来 完成对应的转换:获取metadata、reply
call.Metadata, call.Reply, _ = convertRes2Raw(res)
} else {
data := res.Payload // 提取响应体
if len(data) > 0 {
// 根据对应的解码来进行解码
codec := share.Codecs[res.SerializeType()]
if codec == nil {
call.Error = ServiceError(ErrUnsupportedCodec.Error())
} else {
// 将对应的返回结果赋给本次rpc的Call中的Reply
err = codec.Decode(data, call.Reply)
if err != nil {
call.Error = ServiceError(err.Error())
}
}
}
if len(res.Metadata) > 0 { // 提取metadata
meta := make(map[string]string, len(res.Metadata))
for k, v := range res.Metadata {
meta[k] = v
}
// 赋予本次call对应的响应元数据ResMetadata
call.ResMetadata = res.Metadata
}
}
call.done() // 通知结果通道
}
res.Reset() // 重置Message对象 便于下一次新的请求Message处理
}
...省略代码
}
每次进行的RPC过程会被封装成Call对象,通过Done的channel来监控每次Call完成情况
// Call represents an active RPC.
type Call struct {
ServicePath string // RPC调用的Service名字及其对应的Method.
ServiceMethod string //
Metadata map[string]string //metadata
ResMetadata map[string]string // 响应结果的metadata
Args interface{} // 函数的参数(*struct).
Reply interface{} // 函数的应答结果 (*struct).
Error error // 一次Call完成时对应的Error状态
Done chan *Call // 存放每次完成的Call.
Raw bool // 是否采用原始数据信息方式
}
发起远程调用(同步调用)
func (client *Client) call(ctx context.Context, servicePath, serviceMethod string,
args interface{}, reply interface{}) error {
seq := new(uint64) // 请求ID
// 将seq添加到context中便于在Server端时获取
ctx = context.WithValue(ctx, seqKey{}, seq)
Done := client.Go(ctx, servicePath, serviceMethod, args,
reply, make(chan *Call, 1)).Done
// 异步请求
var err error
select {
case <-ctx.Done(): //cancel by context 取消
client.mutex.Lock()
// 使用互斥锁 剔除处于pending状态的call;
// 处于pending状态的call存放在client端的本地内存中map[uint64]*Call
call := client.pending[*seq]
delete(client.pending, *seq)
client.mutex.Unlock()
if call != nil { // 将本次Call投递到channel中
call.Error = ctx.Err()
call.done() // 执行Call投递到channel
}
return ctx.Err() // 输出本次request的错误状态
case call := <-Done: // 从Done的channel获取Call:代表Call已完成
err = call.Error
meta := ctx.Value(share.ResMetaDataKey) // 获取response内容
if meta != nil && len(call.ResMetadata) > 0 {
resMeta := meta.(map[string]string)
for k, v := range call.ResMetadata {
resMeta[k] = v
}
}
}
return err
目前rpcx 支持许多服务发现机制,同时也可以自定义服务发现:
- [Peer to Peer] 客户端直连每个服务节点。
- [Peer to Multiple]客户端可以连接多个服务。服务可以被编程式配置。
- [Zookeeper] 通过 zookeeper 寻找服务。
- [Etcd] 通过 etcd 寻找服务。
- [Consul] 通过 consul 寻找服务。
- [mDNS] 通过 mDNS 寻找服务(支持本地服务发现)。
- [In process]在同一进程寻找服务。客户端通过进程调用服务,不走TCP或UDP,方便调试使用。
rpcx 支持 故障模式:
- [Failfast]:如果调用失败,立即返回错误
- [Failover]:选择其他节点,直到达到最大重试次数
- [Failtry]:选择相同节点并重试,直到达到最大重试次数
- [Failbackup]:选择两个节点,当一个节点未能返回结果时,则请求第二个节点来获取结果
rpcx 支持 路由模式:
[Random]: 随机选择节点
[Roundrobin]: 用 [roundrobin] (https://zh.wikipedia.org/wiki/%E5%BE%AA%E7%92%B0%E5%88%B6) 算法选择节点
[Consistent hashing]: 如果服务路径、方法和参数一致,就选择同一个节点, 使用了非常快的jump consistent hash算法
[Weighted]: 根据元数据里配置好的权重(weight=xxx
)来选择节点
[Network quality]: 根据ping
的结果来选择节点。网络质量越好,该节点被选择的几率越大
[Geography]: 如果有多个数据中心,客户端趋向于连接同一个数据机房的节点
[Customized Selector]: 如果以上的选择器都不适合你,你可以自己定制选择器
rpcx特殊的两种请求方式
- Broadcast 表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。此时FailMode 和 SelectMode的设置是无效的。请设置超时来避免阻塞。
- Fork 表示向所有服务器发送请求,只要任意一台服务器正确返回就成功。此时FailMode 和 SelectMode的设置是无效的。
三、使用
实例server端代码
import (
"context"
"flag"
"github.com/smallnest/rpcx/server"
"log"
"net"
"rpcx/examples/models"
)
var (
addr = flag.String("addr","localhost:8972","server address")
)
func main() {
flag.Parse()
s := server.NewServer()
//s.Plugins.Add(&ConnectionListerPlugin{})
s.Register(new(models.Arith),"") // 只提供rcvr不指定servicePath和method以及对应的name
s.RegisterName("PBArith", new(models.PBArith),"") // 提供rcvr及其name
s.RegisterFunction("PB-Mul",models.Mul,"") // 注入函数(对应的函数不需要提供调用方) 提供servicePath和method
s.RegisterFunctionName("PMul","mul",models.Mul,"") // 提供servicePath和method及其name
s.Serve("tcp", *addr)
//log.Println(" Server Address= " + s.Address().String())
}
type ConnectionListerPlugin struct {
}
func (clis *ConnectionListerPlugin) HandleLister(conn net.Conn) (net.Conn, bool){
log.Printf("Server Listener Address %v \n", conn.LocalAddr().String())
return conn,true
}
func mul(ctx context.Context, args *models.Args, reply *models.Reply) error {
reply.C = args.A * args.B
return nil
}
实例client端代码
package main
import (
"context"
"flag"
"rpcx/client"
"rpcx/examples/models"
"log"
"time"
)
var (
addr = flag.String("addr","localhost:8972","server address")
)
func main() {
flag.Parse()
//
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &models.Args{
A: 10,
B: 20,
}
for{
reply := &models.Reply{}
err := xclient.Call(context.Background(),"Mul", args, reply)
if err != nil{
log.Fatalf("failed to call:%v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
time.Sleep(1e9)
}
}
辅助代码
package models
import (
"context"
"fmt"
"rpcx/_testutils"
)
// 参数
type Args struct {
A int
B int
}
// 回复结果
type Reply struct {
C int
}
// 服务
type Arith int
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
return nil
}
func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A + args.B
fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C)
return nil
}
func (t *Arith) Say(ctx context.Context, args *string, reply *string) error {
*reply = "hello " + *args
return nil
}
type PBArith int
func (t *PBArith) Mul(ctx context.Context, args *testutils.ProtoArgs, reply *testutils.ProtoReply) error {
reply.C = args.A * args.B
return nil
}
func (t *Arith) ThriftMul(ctx context.Context, args *testutils.ThriftArgs_, reply *testutils.ThriftReply) error {
reply.C = args.A * args.B
return nil
}
func Mul(ctx context.Context, args *testutils.ProtoArgs, reply *testutils.ProtoReply) error {
reply.C = args.A * args.B
return nil
}
四、其他
Client分析源码
Client执行链路如下: