Go RPC(一)
RPC(Remote Procedure Call,远程过程调用)是一种计算机通信协议,用于使不同计算机间的程序能够相互调用,就像调用本地程序一样。它的基本原理是将远程调用封装成本地调用的方式,使得远程的调用过程对于调用方来说是透明的。
在 RPC 中,客户端通过发送一个请求消息给服务端,服务端接收到请求后执行相应的操作,并返回一个响应消息给客户端。通常情况下,RPC 协议会对请求和响应消息进行序列化和反序列化,以便在网络上传输。
net/rpc
在go中,net/rpc
是这么说的,服务端注册一个对象使其作为具有对象类型名称的服务可见。该对象的可导出的方法可以支持远程调用,在同一个服务端可以注册多个不同类型的对象,但不能注册多个同样的对象。一个对象的方法要支持远程访问,必须满足以下几个标准:
- the method's type is exported. // 方法类型是可导出的
- the method is exported. // 方法必须是可导出的
- the method has two arguments, both exported (or builtin) types. // 必须有2个参数,且类型是可导出的或者内置的类型
- the method's second argument is a pointer. // 第二个参数必须是指针,因为要接收返回值。
- the method has return type error. // 返回一个error
例如:
// 第一个参数表示客户端调用传递的参数,第二个参数是返回值
func (t *T) MethodName(argType T1, replyType *T2) error
go中的rpc官方给了个例子:
服务端
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func server() {
arith := new(Arith)
s := rpc.NewServer()
// s.RegisterName 和 s.Register比较,多了个自定义的命名空间,比如这里是`chujiu.Arith`,那客户端调用就是`chujiu.Arith.xxxx`,如果是Register则调用时服务名称就是`Arith.xxxx`
s.RegisterName("chujiu.Arith", arith)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go s.Accept(l)
}
对象 Arith
的两个方法都是可以导出的,且有2个参数,第一个作为客户端调用传递过来的参数,第二个是指针作为返回值,返回一个error。
new一个新的server后,然后将对象Arith
注册到该server上,然后监听1234端口,然后等待连接。
客户端
func client() {
c, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
args := &Args{7, 8}
var reply int
err = c.Call("chujiu.Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
}
客户端先链接上服务端,然后通过call调用远程服务的方法,看起来就向调用本地函数一样,第一个参数是服务名称,第二个是参数,第三个是返回值,还有一个异步调用的方法,返回一个call对象,然后通过对call对象的done channel读取返回值
quotient := new(Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
原理
服务端
rpc的实现原理很简单,就是在服务端启动的时候,通过一个map将注册的服务保存起来,然后客户端调用时,通过名称从服务端保存的map中找到真正的对象,然后调用对象的方法。
register
保存了注册的对象,第一个参数是注册的对象,第二个是服务的名称,第三个参数 是否使用自定义的name。server对象中保存了请求的参数以及注册的服务的map
go/src/net/rpc/server.go:239
// Server represents an RPC Server.
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
func (server *Server) register(rcvr any, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := name
// 这里可以看到如果不是自定义的,将直接用注册的对象名称作为服务名称
if !useName {
sname = reflect.Indirect(s.rcvr).Type().Name()
}
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Print(s)
return errors.New(s)
}
// 这里检查了对象是否是导出的,在go中就看首字母是否大写
if !useName && !token.IsExported(sname) {
s := "rpc.Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
s.name = sname
// Install the methods
// 这里对对象的方法以及参数等做了校验
s.method = suitableMethods(s.typ, logRegisterError)
if len(s.method) == 0 {
str := ""
// To help the user, see if a pointer receiver would work.
// 这里第二次调用只是为了确保安装的方法都符合上面几个规范,提高注册方法的安全性和可靠性。
// 比如需要导出的方法因为参数不符合规范而被第一次调研过滤掉了,看起来有点多余
// 但第二次是为了确保被注册的所有方法都符合规范,而不是为了重新获取可导出的方法,这也是第二次导出的并不需要赋值的原因
method := suitableMethods(reflect.PointerTo(s.typ), false)
if len(method) != 0 {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
}
log.Print(str)
return errors.New(str)
}
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
可以看到先是通过suitableMethods
方法做了检查后,将注册的对象都存储到server.serviceMap
中了,值为服务名称,值为service,而service里存储了对象的名称,类型,方法等。
go/src/net/rpc/server.go:161
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
go/src/net/rpc/server.go:284
func suitableMethods(typ reflect.Type, logErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if !method.IsExported() {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
if logErr {
log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
}
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
if logErr {
log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
}
continue
}
// Second arg must be a pointer.
replyType := mtype.In(2)
if replyType.Kind() != reflect.Pointer {
if logErr {
log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
}
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
if logErr {
log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
}
continue
}
// Method needs one out.
if mtype.NumOut() != 1 {
if logErr {
log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
}
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
if logErr {
log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
}
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
该方法里主要是做了对方法是否导出,以及参数是否否和标准做了检查,最后将对象的方法存储到service.methods
这个map中返回key是方法名称,value是一个methed的结构体,包含了可远程调用的方法名称,参数以及返回值。
注册对象之后就是监听端口,l, e := net.Listen("tcp", ":1234")
等待客户端链接调用。
go/src/net/rpc/server.go:628
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
}
}
这里可以看到如果有链接进来,则启一个新的go程去处理。ServeConn使用的是gob这种二进制的编码解码方式, gob是go内置的二进制编码方式,相比较binary,它存储了一些额外的信息,不需要知道长度,就能解码,缺点就是占用的字节数会很多,儿binary需要自己定义好对象的长度,在解码时才知道读到哪些长度的字节。
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
ServeCodec
方法的参数是一个接口 ServerCodec
,
ServerCodec为RPC会话的服务器端实现RPC请求的读取和RPC响应的写入。而gobServerCodec实现了它。还有json编码的比如go/src/net/rpc/jsonrpc/server.go:17::serverCodec
,都是实现了ServerCodec接口。
go/src/net/rpc/server.go:459
// ServeCodec is like ServeConn but uses the specified codec to// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
wg.Add(1)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
// We've seen that there are no more requests.
// Wait for responses to be sent before closing codec. wg.Wait()
codec.Close()
}
这里面其实就实现了客户端调用后,服务端处理的过程。server.readRequest
获取了客户端请求的参数以及存储服务方法的service。紧接着在下面 server.call
,调用了真实对象的方法。
go/src/net/rpc/server.go:585
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
。。。。
// argv guaranteed to be a pointer now.
if err = codec.ReadRequestBody(argv.Interface()); err != nil {
return
}
。。。
}
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
err = codec.ReadRequestHeader(req)
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
err = errors.New("rpc: server cannot decode request: " + err.Error())
return
}
// We read the header successfully. If we see an error now,
// we can still recover and move on to the next request.
keepReading = true
dot := strings.LastIndex(req.ServiceMethod, ".")
if dot < 0 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
return
}
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
// Look up the request.
svci, ok := server.serviceMap.Load(serviceName)
if !ok {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
return
}
svc = svci.(*service)
mtype = svc.method[methodName]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
}
return
}
server.getRequest()
获取到request后,通过codec.ReadRequestHeader(req)
解码,这里因为例子中是gob编码,所以会调用gobServerCodec::ReadRequestHeader
解码,然后获取到客户端调用服务的对象和方法名。接下来获取客户端调用的参数,最后调用call方法实现对象方法的调用。
/go/src/net/rpc/server.go:373
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done()
}
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
这里通过反射call方法调用了真正的方法,返回值如果非nil,则返回一个错误的字符串,然后写入response。
客户端
客户端总共有2个方法调用call和go,call方法底层也是调用的go方法,go方法会把结果返回到call结构体的done channel上
go/src/net/rpc/client.go:298
type Client struct {
codec ClientCodec //客户端也有一个编码的接口描述,和服务端的serverCodec是一样的,请求用什么编码,解码就用什么解
reqMutex sync.Mutex // protects following
request Request
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call // 这是待处理请求的map,key是序号seq,值是call对象,call里保存了本次请求的参数等
closing bool // user has called Close
shutdown bool // server has told us to stop
}
// request 规定了service的格式
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args any // The argument to the function (*struct).
Reply any // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Receives *Call when Go is complete.}
func (client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
func (client *Client) Call(serviceMethod string, args any, reply any) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
这里done的容量在同步调用时done的长度设置为了1,这里不用无缓冲的channel。因为无缓冲要求发送值的时候,需要设置接收者,否则就会一直阻塞住,会有性能问题,而有缓冲的就可以缓存一个值,可以立即返回,且可以保证有序性,就是因为同一时间只能缓存一个值。另一个好处就是可以避免死锁的问题,假设没有接收者或者发送方和接收方都处于阻塞中,无缓冲的很可能立马就dead lock了。
client.send
方法是真正的调用服务的方法,看下代码
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
// 序号自增,然后存入pending中
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
func (c *gobClientCodec) WriteRequest(r *Request, body any) (err error) {
if err = c.enc.Encode(r); err != nil {
return
}
if err = c.enc.Encode(body); err != nil {
return
}
return c.encBuf.Flush()
}
client通过codec调用WriteRequest方法,WriteRequest方法就是用gob编码然后写入到底层io.writer接口等待服务端读取。
上面说到 codec 是实现了接口 ClientCodec,NewClient默认是通过gob编码的。
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
这里在创建一个新的客户端的同时,启了一个新的go程接收客户端的响应。
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
可以看到服务端处理后发送请求,ReadResponseBody
客户端这边接收到后写入call.Reply中,然后从将call结构体写入call.done
中,为了防止done阻塞,select 给了一个空的default。最后客户端处理返回的结果。