Java

RPC入门与源码剖析

2020-01-17  本文已影响0人  Java天天


前言

RPC到底是什么?gRPC又是什么》与HTTP直接存在什么关系?

本文将讨论一下RPC相关的概念并以Python中自带的xmlrpc为例,简单剖析源码,理解它的实现原理,理解后,自己也可以轻松实现一个玩具RPC框架。

RPC概念

RPC简单定义

PRC(Remote Procedure Call)-远程过程调用,通过【网络通信】调用不同的服务,共同支撑一个软件系统,是分布式系统中的基石技术。

阅读完RPC定义,我内心的疑惑是:

1.服务之间通过API的方式走HTTP不也可以实现通过网罗通信调用不同的服务目的?

2.它与HTTP有啥差别?

3.相比于API调用的方式有何优势?

首先,RPC是一种技术思想而非一种规范或协议,RPC可以基于HTTP来实现,也可以基于其他方式,比如常见的TCP或自己定义协议通过Socker来实现。

gRPC是Google知名的RPC框架,RPC框架是对RPC技术思想的现实实现,类似于idea和相应的idea的project的关系,gRPC本身是构建于HTTP2.0上的,使用Google的Protobu协议来传输信息。

那相比API调用,它有什么优势呢?

系统内部API调用在系统间交互较少,借口不多的情况下,确实是一种有效的通信手段,它实现简单、直接,但一个大系统,子系统的交互可能很多,如果需要使用API,就需要定义非常多的接口,难以维护。

此外,使用RPC框架后,远程调用会与本地调用一样简单,底层网络传输的过程对用户而言是透明的。RPC框架会自动进行数据序列化、协议编码和网络传输等过程。

题外话:很多介绍RPC的博文会聊到系统简API调用走HTTP要进行3次握手、4次挥手,请求量大起来是很耗资源的,而RPC框架调用会维持长连接,从而减少网络开销。

这其实存在概念性错误,HTTP协议可以维持长连接,只需要在包头上添加上Connect:keep-alive,HTTP/1.0默认使用短连接,但HTTP/1.1起,默认使用的就是长连接了,HTTP是应用层协议,它的长连接和短连接,实质上是TCP协议的长连接和短连接。

gRPC使用HTTP/2.0默认就会使用长连接来传输数据。

常见RPC框架

gRPC:Google开源的RPC框架,基于HTTP/2.0实现,可支持常见的多种语言,底层使用了Netty框架。

Thrift:Facebook开源的RPC框架,一个跨语言服务开发框架。

Dubbo:阿里巴巴开源的RPC框架,协议和序列化框架都可以插拔。

RPC框架常用的通信协议:RMI、Socket、SOAP(HTTP XML)、REST(HTTP JSON)

通信框架:MINA和Netty

RPC核心功能

知名开源RPC框架代码比较复杂,具有很多功能,但RPC最核心的功能非常简单,所以这里去繁从简,只了解RPC框架最核心的功能,然后通过Python自带的xmplrpc来学习一下RPC代码层面的实现。

一个RPC框架其核心功能可以分成5个主要部分,分别是:客户端、客户端Stub、网络传输模块、服务端Stub、度无端等,之间的关系如图。

客户端(Client):服务调用方。

客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数数据信息打包成网络消息,再通过网络传输发送给服务端。

服务端存根(Serve Stub):接收客户端发送过来的请求消息并进行解包,然后再调用本地服务进行处理。

服务端(Serve):服务的真正提供者。

Network Servics:地层传输,可以是TCP或HTTP.

通过Python的xmlrpc简单使用一下。加深对上面概念的理解。

首先构建一个Server端。

from xmlrpc.server import SimpleXMLRPCServer

def is_even(n):

return n % 2 == 0

server = SimpleXMLRPCServer(('127.0.0.1', 5000))

print('Listening on port 5000....')

server.register_function(is_even, 'is_even') # 注册服务

server.serve_forever()

上述代码中,使用了SimpleXMLRPCServe类构建了RPC Serve实例,然后将Server端可以被远程调用的方法通过register_function方法进行注册,这里将is_even方法注册,客户端可以无感应is_even方法,最后调用serve_forever方法让Serve端一直轮训监听5000端口。

除了使用register_function方法注册,你还可以通过register_instance方法注册类实例,此时该类下的所有方法都可以被远程Client端调用,更多用法参考文档

接着构建一个Client端

import xmlrpc.client

with xmlrpc.client.ServerProxy('http://127.0.0.1:5000/') as proxy:

res1 = proxy.is_even(3) # 调用服务端的is_even方法

print('res1: ', res1)

res2 = proxy.is_even(100)

print('res2: ', res2)

Cliet端的代码更简单,通过with上下文机制管理xmlrpc.client.ServeProxy,然后直接调用Serve端的方法,看上去很神奇。

仔细观察Serve端的输出,可以发现Client端的两次调用,其实就是两次POST请求,走的是HTTP/1.1,这说明Python的xmlrpc基于HTTP/1.1作为网络传输协议。

xmlrpc源码剖析

Client端如同调用本地方法的形式实现对Serve端方法的调用,这效果非常棒,那它是怎么实现的呢?我自己可不可以写一个呢?

剖析源码前,要思考一下从哪里入手,其实从Client端看更易理解(我一开始看Server端,绕了半天...)。

仔细观察下面的代码

with xmlrpc.client.ServerProxy('http://127.0.0.1:5000/') as proxy:

res1 = proxy.is_even(3) # 调用服务端的is_even方法

print('res1: ', res1)

res2 = proxy.is_even(100)

print('res2: ', res2)

with来操作一个类,那么这个类肯定要实现_enter_与_exit_(因为with就是对它两进行调用)。

# Lib/xmlrpc/Client/ServerProxy

def __enter__(self):

return self

def __exit__(self, *args):

self.__close()

没啥东西,观察到proxy.is_even(3),proxy就是self即ServeProxy类实例,该类本身是不存在is_even,但代码运行没有报错,那肯定重写了_getatter_。

# Lib/xmlrpc/Client/ServerProxy

# 调用属性

def __getattr__(self, name):

# magic method dispatcher

return _Method(self.__request, name)

_Method类代码如下。

# Lib/xmlrpc/Client

class _Method:

# some magic to bind an XML-RPC method to an RPC server.

# supports "nested" methods (e.g. examples.getStateName)

def __init__(self, send, name):

self.__send = send

self.__name = name

def __getattr__(self, name):

return _Method(self.__send, "%s.%s" % (self.__name, name))

def __call__(self, *args):

return self.__send(self.__name, args)

看到_Method类的_call_方法,它的最终作用就是self._request(name,args),对应到proxy.is_even(3)_,就是self._request(is_even,3),重点在_request.

# Lib/xmlrpc/Client/ServerProxy

def __request(self, methodname, params):

# call a method on the remote server

# dumps方法最终会返回一个具有XML格式的字符串

request = dumps(params, methodname, encoding=self.__encoding,

allow_none=self.__allow_none).encode(self.__encoding, 'xmlcharrefreplace')

# 请求

response = self.__transport.request(

self.__host, # self.__host, self.__handler = urllib.parse.splithost(uri),在ServerProxy类实例化时获取了对应的值

self.__handler,

request, # xml格式数据

verbose=self.__verbose # 是否显示详细的debug信息,默认为False

)

if len(response) == 1:

response = response[0]

# 返回相应结果

return response

__request方法的关键就是调用了self.__transport.request方法进行请求,该方法的调用过程__transport.request --> __transport.single_request --> __transport.send_request。

# Lib/xmlrpc/Client/ServerProxy

# 构建请求头与请求体

def send_request(self, host, handler, request_body, debug):

connection = self.make_connection(host)

headers = self._extra_headers[:]

if debug:

connection.set_debuglevel(1)

if self.accept_gzip_encoding and gzip:

connection.putrequest("POST", handler, skip_accept_encoding=True)

headers.append(("Accept-Encoding", "gzip"))

else:

connection.putrequest("POST", handler)

headers.append(("Content-Type", "text/xml"))

headers.append(("User-Agent", self.user_agent))

self.send_headers(connection, headers)

self.send_content(connection, request_body)

return connection

最终的最终,会调用python中http库下client.py中的HTTPConnection.send方法,将数据以socket的形式发送出去。

阶段性总结一下,vmlrpc的Client端,其实就是将方法名与方法参数编码转为xml格式的数据通过socket传递给Serve端(走HTTP),并在同一次HTTP连接中获取Server端对应方法返回的结果。

接着来看Server端的代码

server = SimpleXMLRPCServer(('127.0.0.1', 5000))

print('Listening on port 5000....')

server.register_function(is_even, 'is_even') # 注册服务

server.serve_forever()

SimpleXMLRPCServer类继承了socketserver.TCPServer与SimpleXMLRPCDispatcher,采用了Mixin形式,TCPServer主要负责建立连接,而这里主要关注的是SimpleXMLRPCDispatcher,该类负责XMLRPC的调度,还有一个很重要的就是SimpleXMLRPCRequestHandler类,后面会提及。

class SimpleXMLRPCServer(socketserver.TCPServer,

SimpleXMLRPCDispatcher):

def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler,

logRequests=True, allow_none=False, encoding=None,

bind_and_activate=True, use_builtin_types=False):

# ... 省略

在使用SimpleXMLRPCServe时,调用了register_function方法进行Server端方法的注册,它的本质就是将注册的方法放到dict中。

# Lib/xmlrpc

class SimpleXMLRPCDispatcher:

def __init__(self, allow_none=False, encoding=None,

use_builtin_types=False):

self.funcs = {} # 注册的方法存在在dict中

self.instance = None

def register_function(self, function, name=None):

"""Registers a function to respond to XML-RPC requests.

The optional name argument can be used to set a Unicode name

for the function.

"""

if name is None:

name = function.__name__

self.funcs[name] = function

到这里都很简单,而要清理server.serve_forever方法的调用过程就很绕了,该方法中涉及了监听Client端传递信息到解析调用本地相关方法结果返回的逻辑,为了降低阅读难度以及方便你证实,我会给出该方法的完整调用链接,而重点只分析其中比较重要的调用方法。

SimpleXMLRPCServer.serve_forever方法 --> SimpleXMLRPCServer._handle_request_noblock方法 --> SimpleXMLRPCServer.process_request方法 --> SimpleXMLRPCServer.finish_request方法

finish_request方法代码如下。

def finish_request(self, request, client_address):

"""Finish one request by instantiating RequestHandlerClass."""

self.RequestHandlerClass(request, client_address, self)

self.RequestHandlerClass在SimpleXMLRPCServer过程中被定义,默认值为SimpleXMLRPCRequestHandler类,该类没有__init__方法,但继承的BaseRequestHandler类有,继续给出调用链。

BaseRequestHandler.__init__方法 --> BaseHTTPRequestHandler.handle方法 --> BaseHTTPRequestHandler.handle_one_request方法

handle_one_request方法很关键,其部分代码如下。

# Lib/http/server.py/BaseHTTPRequestHandler

def handle_one_request(self):

try:

# ... 省略了很多条件判断

mname = 'do_' + self.command

if not hasattr(self, mname):

self.send_error(

HTTPStatus.NOT_IMPLEMENTED,

"Unsupported method (%r)" % self.command)

return

method = getattr(self, mname)

method() # 调用 do_POST() 方法

# 将获取的数据从缓冲器强制推给Client端(相当于返回数据)

self.wfile.flush() #actually send the response if not already done.

except socket.timeout as e:

#a read or a write timed out. Discard this connection

self.log_error("Request timed out: %r", e)

self.close_connection = True

return

其中最关键的就是调用了method方法,它的本质其实是调用了SimpleXMLRPCRequestHandler类的do_POST方法,该方法部分代码如下

def do_POST(self):

'''处理了POST请求'''

try:

#... 省略

# 解析请求的xml数据,其中就包含这 is_even 方法名和 3 这个参数

data = self.decode_request_content(data)

if data is None:

return #response has been sent

# 调用最关键的方法,该方法会调用Client端请求的方法并将结果返回

response = self.server._marshaled_dispatch(

data, getattr(self, '_dispatch', None), self.path

)

except Exception as e: # This should only happen if the module is buggy

self.send_header("Content-length", "0")

self.end_headers()

# ... 省略代码

最终回到了simplexmlRPCServer类的_marshaled_dispatch方法进行调度

def _marshaled_dispatch(self, data, dispatch_method = None, path = None):

try:

params, method = loads(data, use_builtin_types=self.use_builtin_types)

# generate response

if dispatch_method is not None:

response = dispatch_method(method, params)

else:

# 调用方法,即调用 is_even 方法

response = self._dispatch(method, params)

# wrap response in a singleton tuple

response = (response,)

response = dumps(response, methodresponse=1,

allow_none=self.allow_none, encoding=self.encoding)

except Fault as fault:

response = dumps(fault, allow_none=self.allow_none,

encoding=self.encoding)

except:

# ... 省略代码

return response.encode(self.encoding, 'xmlcharrefreplace')

对is_even方法的调用发送在SimpleXMLRPCDispatcher._dispatch方法中,该方法部分代码如下。

# Lib/xmlrpc/server.py/SimpleXMLRPCDispatcher

def _dispatch(self, method, params):

try:

# call the matching registered function

func = self.funcs[method]

except KeyError:

pass

else:

if func is not None:

return func(*params)

raise Exception('method "%s" is not supported' % method)

调用完Server端是is_even方法后,再将结果层层返回,最终将结果通过socket回传。

同样总结一下,Server端会以Poll机制轮训监听5000端口,有数据后,会解析到对应类的do_POST方法,然后再执行相应的调度逻辑,最终从funcs字典中找到对应的方法,然后执行获得结果,获得的结果会通过socket返回。

前期我也和很多小伙伴一样,到处收集了很多资料,后面发现很多重复的!下面都是自己整理好的!现在BAT梦想成真,我就把资料贡献出来给有需要的人!

顺便求一波关注,哈哈~各位小伙伴关注我后私信【Java】就可以免费领取哒!

出处链接:https://juejin.im/post/5e1eae506fb9a02fc90e3162

上一篇下一篇

猜你喜欢

热点阅读