brpc: Message Processing Flow
It has been a while since I last dug into the brpc source, so this post picks up with how the server handles a request after the client sends one, and how the client handles the server's response. The content is not exhaustive; the goal is to trace the main ideas, which are well worth learning from. The socket/networking layer, the server startup process, and the work done there are not covered here and will be analyzed separately later.
The analysis below uses the baidu protocol (baidu_std) as the example.
Initialization
When the client or server starts, GlobalInitializeOrDie is called. As the name suggests, it initializes global facilities, and pthread_once guarantees the initialization runs only once.
609 void GlobalInitializeOrDie() {
610     if (pthread_once(&register_extensions_once,
611 GlobalInitializeOrDieImpl) != 0) {
612 //more code...
614 }
615 }
The implementation covers signal handling, logging, OpenSSL setup, naming services, load balancers, compression handlers, protocol handlers, and more.
67 struct Protocol {
82 typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket,
83 bool read_eof, const void *arg);
84 Parse parse;
91 typedef void (*SerializeRequest)(
92 butil::IOBuf* request_buf,
93 Controller* cntl,
94 const google::protobuf::Message* request);
95 SerializeRequest serialize_request;
96
102 typedef void (*PackRequest)(
103 butil::IOBuf* iobuf_out,
104 SocketMessage** user_message_out,
105 uint64_t correlation_id,
106 const google::protobuf::MethodDescriptor* method,
107 Controller* controller,
108 const butil::IOBuf& request_buf,
109 const Authenticator* auth);
110 PackRequest pack_request;
111
118 typedef void (*ProcessRequest)(InputMessageBase* msg);
119 ProcessRequest process_request;
127 typedef void (*ProcessResponse)(InputMessageBase* msg);
128 ProcessResponse process_response;
129
135 typedef bool (*Verify)(const InputMessageBase* msg);
136 Verify verify;
137 //more code...
The struct above is the handler interface that every protocol implements; each member does what its declaration and comment say, and the concrete implementations are analyzed below.
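For the baidu protocol, GlobalInitializeOrDieImpl fills in such a Protocol and registers it into a global table. The sketch below shows the idea field by field; it is a simplified reconstruction of what global.cpp does (the verify function name and the omitted fields are my assumption), not a verbatim copy:
// Sketch: how the baidu protocol is wired into the global protocol table
// (simplified reconstruction, some fields omitted).
Protocol baidu_protocol;
baidu_protocol.parse             = ParseRpcMessage;         // cut one message out of the read buffer
baidu_protocol.serialize_request = SerializeRequestDefault; // pb request -> IOBuf body
baidu_protocol.pack_request      = PackRpcRequest;          // prepend 12-byte header + RpcMeta
baidu_protocol.process_request   = ProcessRpcRequest;       // server side
baidu_protocol.process_response  = ProcessRpcResponse;      // client side
baidu_protocol.verify            = VerifyRpcRequest;
baidu_protocol.name              = "baidu_std";
if (RegisterProtocol(PROTOCOL_BAIDU_STD, baidu_protocol) != 0) {
    exit(1);
}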
573 std::vector<Protocol> protocols;
574 ListProtocols(&protocols);
575 for (size_t i = 0; i < protocols.size(); ++i) {
576 if (protocols[i].process_response) {
577 InputMessageHandler handler;
578 // `process_response' is required at client side
579 handler.parse = protocols[i].parse;
580 handler.process = protocols[i].process_response;
581 // No need to verify at client side
582 handler.verify = NULL;
583 handler.arg = NULL;
584 handler.name = protocols[i].name;
585 if (get_or_new_client_side_messenger()->AddHandler(handler) != 0) {
586 exit(1);
587 }
588 }
589 }
561 InputMessageHandler handler;
562 std::vector<Protocol> protocols;
563 ListProtocols(&protocols);
564 for (size_t i = 0; i < protocols.size(); ++i) {
565 if (protocols[i].process_request == NULL) {
566 // The protocol does not support server-side.
567 continue;
568 }
569 //more code...
576 // `process_request' is required at server side
577 handler.parse = protocols[i].parse;
578 handler.process = protocols[i].process_request;
579 handler.verify = protocols[i].verify;
580 handler.arg = this;
581 handler.name = protocols[i].name;
582 if (acceptor->AddHandler(handler) != 0) {
585 delete acceptor;
586 return NULL;
587 }
588 }
The two snippets above register the message handlers on the client side and the server side respectively: a protocol is registered on the client only if it provides process_response, while on the server the requirement is process_request.
31 struct InputMessageHandler {
45 typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket,
46 bool read_eof, const void *arg);
47 Parse parse;
54 typedef void (*Process)(InputMessageBase* msg);
55 Process process;
56
61 typedef bool (*Verify)(const InputMessageBase* msg);
62 Verify verify;
63
64 //more code...
69 };
352 int InputMessenger::AddHandler(const InputMessageHandler& handler) {
358 BAIDU_SCOPED_LOCK(_add_handler_mutex);
359 if (NULL == _handlers) {
360 _handlers = new (std::nothrow) InputMessageHandler[_capacity];
361 if (NULL == _handlers) {
363 return -1;
364 }
365 memset(_handlers, 0, sizeof(*_handlers) * _capacity);
366 _non_protocol = false;
367 }
382 if (_handlers[index].parse == NULL) {
383 // The same protocol might be added more than twice
384 _handlers[index] = handler;
385 } else if (_handlers[index].parse != handler.parse
386 || _handlers[index].process != handler.process) {
387 //more code...
390 }
The code above adds a handler to the InputMessenger (minor code omitted).
The client sends a request
When the client issues an RPC call, _serialize_request(&cntl->_request_buf, cntl, request) is invoked to serialize the body; for the baidu protocol this is SerializeRequestDefault:
126 void SerializeRequestDefault(butil::IOBuf* buf,
127 Controller* cntl,
128 const google::protobuf::Message* request) {
129 // Check sanity of request.
130 if (!request) {
131 //error
132 }
133 if (request->GetDescriptor() == SerializedRequest::descriptor()) {
134 buf->append(((SerializedRequest*)request)->serialized_data());
135 return;
136 }
137 if (!request->IsInitialized()) {
138 //error
141 }
142     if (!SerializeAsCompressedData(*request, buf, cntl->request_compress_type())) { // compression type
143 //error...
146 }
147 }
94 bool SerializeAsCompressedData(const google::protobuf::Message& msg,
95 butil::IOBuf* buf, CompressType compress_type) {
96 if (compress_type == COMPRESS_TYPE_NONE) {
97 butil::IOBufAsZeroCopyOutputStream wrapper(buf);
98 return msg.SerializeToZeroCopyStream(&wrapper);
99 }
100 const CompressHandler* handler = FindCompressHandler(compress_type);
101 if (NULL != handler) {
102 return handler->Compress(msg, buf);
103 }
104 return false;
105 }
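FindCompressHandler looks up a codec that was registered during global initialization. The registration looks roughly like the sketch below (reconstructed from memory of global.cpp and compress.h; treat the exact names as approximate):
// Sketch: registering a compression codec so FindCompressHandler can find it.
// GzipCompress/GzipDecompress live under brpc/policy; names are approximate.
const CompressHandler gzip_compress = { GzipCompress, GzipDecompress, "gzip" };
if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
    exit(1);
}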
The concrete compression implementations can be read in the source. Interestingly, nothing here explicitly writes a protocol identifier into the request, so how does the server parse the request correctly and dispatch it to the right handler? That is answered further below.
Next comes IssueRPC, which packs the request via _pack_request, i.e. PackRpcRequest for the baidu protocol:
624 void PackRpcRequest(butil::IOBuf* req_buf,
625 SocketMessage**,
626 uint64_t correlation_id,
627 const google::protobuf::MethodDescriptor* method,
628 Controller* cntl,
629 const butil::IOBuf& request_body,
630 const Authenticator* auth) {
631 RpcMeta meta;
638 ControllerPrivateAccessor accessor(cntl);
639 RpcRequestMeta* request_meta = meta.mutable_request();
640 if (method) {
641 request_meta->set_service_name(FLAGS_baidu_protocol_use_fullname ?
642 method->service()->full_name() :
643 method->service()->name());
644 request_meta->set_method_name(method->name());
645 meta.set_compress_type(cntl->request_compress_type());
646 }
657 meta.set_correlation_id(correlation_id);
669 // Don't use res->ByteSize() since it may be compressed
670 const size_t req_size = request_body.length();
671 const size_t attached_size = cntl->request_attachment().length();
672 if (attached_size) {
673 meta.set_attachment_size(attached_size);
674 }
681
682 SerializeRpcHeaderAndMeta(req_buf, meta, req_size + attached_size);
683 req_buf->append(request_body);
684 if (attached_size) {
685 req_buf->append(cntl->request_attachment());
686 }
687 }
The code above puts the service name, method name, compression type and the correlation id (which ties the call back to its bthread) into the meta, then serializes the header and meta with SerializeRpcHeaderAndMeta. The wire format is a 12-byte header: [PRPC][body_size][meta_size]. With that, the client's work is essentially done: the buffer is sent with Write (analyzed later), and the caller then blocks in Join(correlation_id).
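To make the header concrete, here is a minimal sketch of what SerializeRpcHeaderAndMeta has to write, assuming body_size = meta_size + payload size (which the parse code later confirms); WriteRpcHeader is a hypothetical helper, not the brpc function:
#include <arpa/inet.h>   // htonl
#include <cstring>       // memcpy
#include <butil/iobuf.h>

// Hypothetical helper illustrating the 12-byte baidu_std header:
//   bytes 0-3  : "PRPC"
//   bytes 4-7  : body_size = meta_size + payload_size (big endian)
//   bytes 8-11 : meta_size (big endian)
static void WriteRpcHeader(butil::IOBuf* out, uint32_t meta_size, uint32_t payload_size) {
    char header[12];
    memcpy(header, "PRPC", 4);
    uint32_t be = htonl(meta_size + payload_size);
    memcpy(header + 4, &be, 4);
    be = htonl(meta_size);
    memcpy(header + 8, &be, 4);
    out->append(header, sizeof(header));
}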
26 message RpcMeta {
27 optional RpcRequestMeta request = 1;
28 optional RpcResponseMeta response = 2;
29 optional int32 compress_type = 3;
30 optional int64 correlation_id = 4;
31 optional int32 attachment_size = 5;
35 }
37 message RpcRequestMeta {
38 required string service_name = 1;
39 required string method_name = 2;
40 optional int64 log_id = 3;
44 }
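As a worked example (numbers purely illustrative): if the serialized RpcMeta is 39 bytes and the (possibly compressed) request body is 20 bytes with no attachment, the header carries body_size = 39 + 20 = 59 and meta_size = 39, so the whole frame occupies 12 + 59 = 71 bytes on the wire.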
The server receives the request
On the server side, OnNewConnections is invoked for a new connection, and OnNewMessages is invoked when data arrives on it. It is one big loop; the comments are kept below because they explain the design:
169 void InputMessenger::OnNewMessages(Socket* m) {
170 // Notes:
171 // - If the socket has only one message, the message will be parsed and
172 // processed in this bthread. nova-pbrpc and http works in this way.
173 // - If the socket has several messages, all messages will be parsed (
174 // meaning cutting from butil::IOBuf. serializing from protobuf is part of
175 // "process") in this bthread. All messages except the last one will be
176 // processed in separate bthreads. To minimize the overhead, scheduling
177 // is batched(notice the BTHREAD_NOSIGNAL and bthread_flush).
178 // - Verify will always be called in this bthread at most once and before
179 // any process.
180 InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
181 const InputMessageHandler* handlers = messenger->_handlers;
182 int progress = Socket::PROGRESS_INIT;
183
184 // Notice that all *return* no matter successful or not will run last
185 // message, even if the socket is about to be closed. This should be
186 // OK in most cases.
187 std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
188 bool read_eof = false;
189 while (!read_eof) {
190 //more code...
201 // Read.
202 const ssize_t nr = m->DoRead(once_read);
233 while (1) {
234 size_t index = 8888;
235 ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
285 // This unique_ptr prevents msg to be lost before transfering
286 // ownership to last_msg
287 DestroyingPtr<InputMessageBase> msg(pr.message());
288 QueueMessage(last_msg.release(), &num_bthread_created,
289 m->_keytable_pool);
290 if (handlers[index].process == NULL) {
292 continue;
293 }
294 m->ReAddress(&msg->_socket);
295 m->PostponeEOF();
296 msg->_process = handlers[index].process;
297 msg->_arg = handlers[index].arg;
318 if (!m->is_read_progressive()) {
319 // Transfer ownership to last_msg
320 last_msg.reset(msg.release());
321 } else {
322 QueueMessage(msg.release(), &num_bthread_created,
323 m->_keytable_pool);
324 bthread_flush();
325 num_bthread_created = 0;
326 }
327 //more code...
336 }
Since this logic is fairly involved, only the core implementation is shown, together with the design rationale from io.md on GitHub:
Receiving messages
A "message" is a bounded binary string read from a connection; it may be a request from an upstream client or a response from a downstream server. brpc uses one or more EventDispatchers (EDISP for short) to wait for events on any fd. Unlike the common "IO thread" model, an EDISP does not do the reading itself. The problem with IO threads is that one thread can read only one fd at a time: when several busy fds gather in one IO thread, some reads get delayed, and features such as multi-tenancy, complex traffic-splitting algorithms and Streaming RPC make it worse. Under high load, an occasional slow read stalls every fd in that IO thread, which hurts availability considerably.
Because of an epoll bug (still present when brpc was developed) and the relatively high cost of epoll_ctl, EDISP runs in edge-triggered mode. When an event arrives, EDISP increments an atomic variable and starts a bthread to process the fd's data only if the value before the increment was 0. Behind the scenes, EDISP hands its pthread over to the newly created bthread so that the latter gets better cache locality and can read the fd as soon as possible, while the bthread EDISP was running in is stolen by another pthread and continues there; this is bthread's work-stealing scheduling. To understand exactly how that atomic variable works, read "atomic instructions" first and then Socket::StartInputEvent. These techniques make the contention of reading the same fd wait-free in brpc.
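The "increment, and only start a bthread when the previous value was 0" idea can be sketched as follows (a simplified illustration of the pattern behind Socket::StartInputEvent, not the actual brpc code):
#include <atomic>

std::atomic<int> nevent(0);

void OnEpollIn(/* fd became readable */) {
    // Every event bumps the counter; only the caller that bumps it from 0
    // starts the consumer, so concurrent events never race to create it.
    if (nevent.fetch_add(1, std::memory_order_acq_rel) == 0) {
        // start a bthread that keeps reading and parsing the fd until it
        // drains the pending events (this is what OnNewMessages does)
    }
}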
Back to the message-processing flow above: after DoRead reads a chunk of data, CutInputMessage is called to cut complete messages out of the read buffer:
724 // last chosen index of the protocol as a heuristic value to avoid
725 // iterating all protocol handlers each time.
726 //int _preferred_index;
63 ParseResult InputMessenger::CutInputMessage(
64 Socket* m, size_t* index, bool read_eof) {
65 const int preferred = m->preferred_index();
66 const int max_index = (int)_max_index.load(butil::memory_order_acquire);
67 // Try preferred handler first. The preferred_index is set on last
68 // selection or by client.
69 if (preferred >= 0 && preferred <= max_index
70 && _handlers[preferred].parse != NULL) {
71 ParseResult result =
72 _handlers[preferred].parse(&m->_read_buf, m, read_eof, _handlers[preferred].arg);
73 if (result.is_ok() ||
74 result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
75 *index = preferred;
76 return result;
77 }
103 for (int i = 0; i <= max_index; ++i) {
104 if (i == preferred || _handlers[i].parse == NULL) {
105 // Don't try preferred handler(already tried) or invalid handler
106 continue;
107 }
108 ParseResult result = _handlers[i].parse(&m->_read_buf, m, read_eof, _handlers[i].arg);
109 if (result.is_ok() ||
110 result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
111 m->set_preferred_index(i);
112 *index = i;
113 return result;
114 }
132 }
Part of the code is shown above. preferred starts at -1; on the first cut, the handlers are tried in turn and the one that succeeds is recorded. Since the message format on a single connection is normally fixed, later parses no longer need the for-loop over every protocol. A side question: wouldn't it also work to carry the protocol type inside every request? Presumably this design saves a field. In any case, the connection here is identified as the baidu protocol.
The parse implementation is as follows:
94 ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket,
95 bool /*read_eof*/, const void*) {
96 char header_buf[12];
97 const size_t n = source->copy_to(header_buf, sizeof(header_buf));
98 if (n >= 4) {
99 void* dummy = header_buf;
100 if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") {
101 return MakeParseError(PARSE_ERROR_TRY_OTHERS);
102 }
103 } else {
104 if (memcmp(header_buf, "PRPC", n) != 0) {
105 return MakeParseError(PARSE_ERROR_TRY_OTHERS);
106 }
107 }
108 //more code...
130 source->pop_front(sizeof(header_buf));
131 MostCommonMessage* msg = MostCommonMessage::Get();
132 source->cutn(&msg->meta, meta_size);
133 source->cutn(&msg->payload, body_size - meta_size);
134 return MakeMessage(msg);
135 }
The code above parses the 12-byte header and extracts the meta and body sizes; the exact meaning of each field differs from protocol to protocol, so it is not analyzed further here. If parsing succeeds, the connection is using the baidu protocol and the preferred index is marked.
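Continuing the worked example from the client side: with body_size = 59 and meta_size = 39 in the header, the parser cuts 39 bytes into msg->meta and 59 - 39 = 20 bytes into msg->payload, matching the cutn calls above.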
After a message has been cut out, a new bthread is started to process it:
285 // This unique_ptr prevents msg to be lost before transfering
286 // ownership to last_msg
287 DestroyingPtr<InputMessageBase> msg(pr.message());
288 QueueMessage(last_msg.release(), &num_bthread_created,
289 m->_keytable_pool);
296 msg->_process = handlers[index].process;
297 msg->_arg = handlers[index].arg;
328 if (num_bthread_created) {
329 bthread_flush();
330 }
146 static void QueueMessage(InputMessageBase* to_run_msg,
147 int* num_bthread_created,
148 bthread_keytable_pool_t* keytable_pool) {
149 if (!to_run_msg) {
150 return;
151 }
152 // Create bthread for last_msg. The bthread is not scheduled
153 // until bthread_flush() is called (in the worse case).
154
155 // TODO(gejun): Join threads.
156 bthread_t th;
157 bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
158 BTHREAD_ATTR_PTHREAD :
159 BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
160 tmp.keytable_pool = keytable_pool;
161 if (bthread_start_background(
162 &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
163 ++*num_bthread_created;
164 } else {
165 ProcessInputMessage(to_run_msg);
166 }
167 }
134 void* ProcessInputMessage(void* void_arg) {
135 InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
136 msg->_process(msg);
137 return NULL;
138 }
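The batching trick in QueueMessage (BTHREAD_NOSIGNAL plus a single bthread_flush) can be isolated into a small sketch (illustrative only; StartBatch is a made-up helper, not part of brpc):
#include <bthread/bthread.h>

// Start a batch of bthreads without waking any worker, then wake them all
// with one bthread_flush(), mirroring the pattern used by QueueMessage.
void StartBatch(void* (*fn)(void*), void* args[], int n) {
    int started = 0;
    for (int i = 0; i < n; ++i) {
        bthread_t th;
        bthread_attr_t attr = BTHREAD_ATTR_NORMAL | BTHREAD_NOSIGNAL;
        if (bthread_start_background(&th, &attr, fn, args[i]) == 0) {
            ++started;
        } else {
            fn(args[i]);  // fall back to running inline, as QueueMessage does
        }
    }
    if (started) {
        bthread_flush();  // one wake-up signal for the whole batch
    }
}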
This eventually lands in ProcessRpcRequest. Its request-handling logic is rather long, so only the key parts are shown:
304 void ProcessRpcRequest(InputMessageBase* msg_base) {
306 DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
309 const Server* server = static_cast<const Server*>(msg_base->arg());
312 RpcMeta meta;
313 if (!ParsePbFromIOBuf(&meta, msg->meta)) {
314 //error
318 }
319 const RpcRequestMeta &request_meta = meta.request();
333 std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);
348 cntl->set_request_compress_type((CompressType)meta.compress_type());
411 butil::StringPiece svc_name(request_meta.service_name());
412 if (svc_name.find('.') == butil::StringPiece::npos) {
413 const Server::ServiceProperty* sp =
414 server_accessor.FindServicePropertyByName(svc_name);
415 if (NULL == sp) {
416 //error
418 break;
419 }
420 svc_name = sp->service->GetDescriptor()->full_name();
421 }
422 const Server::MethodProperty* mp =
423 server_accessor.FindMethodPropertyByFullName(
424 svc_name, request_meta.method_name());
425 if (NULL == mp) {
426 //error
429 break;
430 } else if (mp->service->GetDescriptor()
431 == BadMethodService::descriptor()) {
432 //error
436 break;
437 }
471 CompressType req_cmp_type = (CompressType)meta.compress_type();
472 req.reset(svc->GetRequestPrototype(method).New());
473 if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {
474 //error
477 break;
478 }
482 google::protobuf::Closure* done = ::brpc::NewCallback<
483 int64_t, Controller*, const google::protobuf::Message*,
484 const google::protobuf::Message*, const Server*,
485 MethodStatus*, int64_t>(
486 &SendRpcResponse, meta.correlation_id(), cntl.get(),
487 req.get(), res.get(), server,
488 method_status, msg->received_us());
503 svc->CallMethod(method, cntl.release(),
504 req.release(), res.release(), done);
519 }
The above is roughly the reverse of the client side: ParsePbFromIOBuf extracts the meta from msg; a new Controller is created; a number of parameters are set (omitted here), including the bthread-local data of the worker thread; FindMethodPropertyByFullName locates the service and method that should handle the call; and ParseFromCompressedData then decompresses the request data:
81 bool ParseFromCompressedData(const butil::IOBuf& data,
82 google::protobuf::Message* msg,
83 CompressType compress_type) {
84 if (compress_type == COMPRESS_TYPE_NONE) {
85 return ParsePbFromIOBuf(msg, data);
86 }
87 const CompressHandler* handler = FindCompressHandler(compress_type);
88 if (NULL != handler) {
89 return handler->Decompress(data, msg);
90 }
91 return false;
92 }
Next a done callback is built (it does a few more things, analyzed later), and CallMethod is invoked, for example in example/echo_c++/echo.pb.cc:
703 void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
704 ::google::protobuf::RpcController* controller,
705 const ::google::protobuf::Message* request,
706 ::google::protobuf::Message* response,
707 ::google::protobuf::Closure* done) {
708 GOOGLE_DCHECK_EQ(method->service(), protobuf_echo_2eproto::file_level_service_descriptors[0]);
709 switch(method->index()) {
710 case 0:
711 Echo(controller,
712 ::google::protobuf::down_cast<const ::example::EchoRequest*>(request),
713 ::google::protobuf::down_cast< ::example::EchoResponse*>(response),
714 done);
715 break;
716 default:
718 break;
719 }
720 }
36 class EchoServiceImpl : public EchoService {
37 public:
38 EchoServiceImpl() {};
39 virtual ~EchoServiceImpl() {};
40 virtual void Echo(google::protobuf::RpcController* cntl_base,
41 const EchoRequest* request,
42 EchoResponse* response,
43 google::protobuf::Closure* done) {
44 // This object helps you to call done->Run() in RAII style. If you need
45 // to process the request asynchronously, pass done_guard.release().
46 brpc::ClosureGuard done_guard(done);
47
48 brpc::Controller* cntl =
49 static_cast<brpc::Controller*>(cntl_base);
73 }
74 };
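For completeness, the service only shows up in the lookup table used by FindMethodPropertyByFullName because it was added to the server beforehand; standard brpc usage (as in the echo example's server.cpp) looks roughly like this:
brpc::Server server;
example::EchoServiceImpl echo_service_impl;
// Register the service; the server indexes its methods by full name so that
// ProcessRpcRequest can find them later.
if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
    LOG(ERROR) << "Fail to add service";
    return -1;
}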
ClosureGuard is an RAII helper: when it goes out of scope its destructor runs and calls _done->Run(), whose implementation is essentially:
194 void Run() {
195 bool needs_delete = self_deleting_; // read in case callback deletes
196 (get_pointer(object_)->*method_)(arg1_);
197 if (needs_delete) delete this;
198 }
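The idea behind brpc::ClosureGuard can be captured in a few lines (a simplified sketch, not the actual implementation; release() is what you call to keep the closure alive for asynchronous processing):
#include <google/protobuf/stubs/callback.h>  // google::protobuf::Closure

class ClosureGuardSketch {
public:
    explicit ClosureGuardSketch(google::protobuf::Closure* done) : _done(done) {}
    ~ClosureGuardSketch() { if (_done) _done->Run(); }   // reply on scope exit
    google::protobuf::Closure* release() {               // opt out for async use
        google::protobuf::Closure* d = _done;
        _done = NULL;
        return d;
    }
private:
    google::protobuf::Closure* _done;
};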
In this case Run() ends up calling SendRpcResponse, i.e. replying to the request.
The server sends the response
138 void SendRpcResponse(int64_t correlation_id,
139 Controller* cntl,
140 const google::protobuf::Message* req,
141 const google::protobuf::Message* res,
142 const Server* server,
143 MethodStatus* method_status,
144 int64_t received_us) {
164 butil::IOBuf res_body;
168 CompressType type = cntl->response_compress_type();
169 if (res != NULL && !cntl->Failed()) {
170 if (!res->IsInitialized()) {
171 //error
174 } else if (!SerializeAsCompressedData(*res, &res_body, type)) {
175 //error
177 } else {
178 append_body = true;
179 }
180 }
196 RpcMeta meta;
197 RpcResponseMeta* response_meta = meta.mutable_response();
204 meta.set_correlation_id(correlation_id);
205 meta.set_compress_type(cntl->response_compress_type());
221 butil::IOBuf res_buf;
222 SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
223 if (append_body) {
224 res_buf.append(res_body.movable());
225 if (attached_size) {
226 res_buf.append(cntl->response_attachment().movable());
227 }
228 }
253 Socket::WriteOptions wopt;
254 wopt.ignore_eovercrowded = true;
255 if (sock->Write(&res_buf, &wopt) != 0) {
256 //error
260 return;
261 }
268 }
The code above replies to the client: the response body is serialized (and possibly compressed) by SerializeAsCompressedData, the meta and header are prepended exactly as on the request path, and the buffer is finally written out with Write, which will be analyzed later.
The client receives the response
Symmetric to the server side, when the socket becomes readable a message has arrived, and processing eventually reaches process_response, i.e. ProcessRpcResponse:
545 void ProcessRpcResponse(InputMessageBase* msg_base) {
547 DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
548 RpcMeta meta;
549 if (!ParsePbFromIOBuf(&meta, msg->meta)) {
550 //error
552 }
553
554 const bthread_id_t cid = { static_cast<uint64_t>(meta.correlation_id()) };
556 Controller* cntl = NULL;
557 const int rc = bthread_id_lock(cid, (void**)&cntl);
558 if (rc != 0) {
559 //error
564 return;
565 }
579 const RpcResponseMeta &response_meta = meta.response();
606 const CompressType res_cmp_type = (CompressType)meta.compress_type();
607 cntl->set_response_compress_type(res_cmp_type);
608 if (cntl->response()) {
609 if (!ParseFromCompressedData(
610 *res_buf_ptr, cntl->response(), res_cmp_type)) {
611 //error
615 }
616 }
46 void OnResponse(CallId id, int saved_error) {
47 const Controller::CompletionInfo info = { id, true };
48 _cntl->OnVersionedRPCReturned(info, false, saved_error);
49 }
bthread_id was analyzed in an earlier post. Here, after the response arrives, the bthread calls bthread_id_lock to gain exclusive access to that particular RPC, processes the response, and calls OnVersionedRPCReturned, which among other things wakes up the bthread that originally issued the RPC on the client side.
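The way correlation_id stitches the two ends together boils down to the bthread_id wait/wake pattern, sketched standalone below (illustrative and simplified; error handling omitted, not brpc's actual code):
#include <bthread/id.h>
#include <cstdio>

struct CallData { int response_code; };

// Error callback; destroying the id wakes anyone joining it.
static int OnIdError(bthread_id_t id, void* /*data*/, int /*error_code*/) {
    return bthread_id_unlock_and_destroy(id);
}

void Caller() {                       // client side of the RPC
    CallData data = { 0 };
    bthread_id_t cid;
    bthread_id_create(&cid, &data, OnIdError);  // the "correlation id"
    // ... send the request carrying cid.value, then wait:
    bthread_id_join(cid);             // returns once the id is destroyed
    printf("response_code=%d\n", data.response_code);
}

void OnResponseArrived(bthread_id_t cid) {      // response side
    CallData* data = NULL;
    if (bthread_id_lock(cid, (void**)&data) == 0) {  // exclusive access
        data->response_code = 200;                   // fill in the response
        bthread_id_unlock_and_destroy(cid);          // wake the joiner
    }
}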
Many details have been skipped above; there are simply too many to dive into without getting lost, so this post only traces the overall flow and how a message travels. The remaining implementations will be analyzed bit by bit later; there is still plenty to cover. Finally, the official flow diagram:
