DeFiBus

DeFiBus的Rpc调用实现原理

2021-07-25  本文已影响0人  晴天哥_王志

DeFiBus的Rpc调用

Rpc调用原理图

整个调用过程包含两条消息(请求消息和响应消息)各自的生产和消费过程。

DeFiBus的Producer

public class DeFiBusProducerImpl {

    public Message request(Message requestMsg, final SendCallback sendCallback, RRCallback rrCallback, long timeout)
        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        boolean isAsyncRR = (rrCallback != null);

        final String uniqueRequestId = DeFiBusRequestIDUtil.createUniqueName("w");
        DefaultMQProducer producer = deFiBusProducer.getDefaultMQProducer();
        requestMsg.putUserProperty(DeFiBusConstant.KEY, DeFiBusConstant.PERSISTENT);
        requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID, uniqueRequestId);
        // 在请求的消息中增加了PROPERTY_MESSAGE_REPLY_TO属性
        requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO, producer.buildMQClientId());
        requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));

        final RRResponseFuture responseFurture = new RRResponseFuture(rrCallback, timeout);

        String topic = requestMsg.getTopic()
        ResponseTable.getRrResponseFurtureConcurrentHashMap().put(uniqueRequestId, responseFurture);
        if (isAsyncRR) {
          // 省略代码
        } else {
            publish(requestMsg, new SendCallback() {
               // 省略相关代码
            }, timeout);
            Message retMessage = responseFurture.waitResponse(timeout);
            ResponseTable.getRrResponseFurtureConcurrentHashMap().remove(uniqueRequestId);
            return retMessage;
        }
    }


    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        // 携带producer所在的IP地址
        sb.append(this.getClientIP());
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }
}

DeFiBus的Consumer

/**
 * Client-side helpers for DeFiBus request/reply messaging.
 */
public class DeFiBusClientUtil {
    public static final Logger LOGGER = LoggerFactory.getLogger(DeFiBusClientUtil.class);

    /**
     * Creates the reply message for a received request.
     *
     * <p>The reply topic is {@code DeFiBusConstant.RR_REPLY_TOPIC}, prefixed with
     * {@code "<cluster>-"} when the request carries a non-empty cluster property. The
     * reply-to and request-id properties are copied from the request, and the source
     * broker property is copied when it is non-empty.
     *
     * @param sourceMsg the request message being answered
     * @param content   the reply body
     * @return the reply message
     */
    public static Message createReplyMessage(MessageExt sourceMsg, byte[] content) {
        final String clusterName = sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_CLUSTER);
        final String topic = StringUtils.isEmpty(clusterName)
            ? DeFiBusConstant.RR_REPLY_TOPIC
            : clusterName + "-" + DeFiBusConstant.RR_REPLY_TOPIC;

        final Message reply = new Message();
        reply.setTopic(topic); // reply (return-trip) topic
        reply.setBody(content); // reply payload

        // Who the reply goes back to.
        final String replyTo = sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO);
        reply.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO, replyTo);

        // The unique id of the original request.
        final String requestId = sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID);
        reply.putUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID, requestId);

        final String originBroker = sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER);
        if (StringUtils.isEmpty(originBroker)) {
            return reply;
        }

        // Record which broker the request came from.
        reply.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER, originBroker);
        return reply;
    }
}

DeFiBus的Broker

public class DeFiReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {


    private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx, //
        final RemotingCommand request, //
        final SendMessageContext sendMessageContext, //
        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        if (msgInner.getProperties() != null && DeFiBusConstant.REPLY.equals(msgInner.getProperties().get(DeFiBusConstant.KEY))) {
            // 获取发送者的消息Id
            String senderId = msgInner.getProperties().get(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO);
            if (senderId == null) {
                // 省略相关代码
            } else {
                // 查找senderId对应的Chennel信息
                ClientChannelInfo clientChannelInfo = this.deFiBrokerController.getProducerManager().getClientChannel(senderId);
                if (clientChannelInfo == null || clientChannelInfo.getChannel() == null || !clientChannelInfo.getChannel().isActive()) {
                    // 省略相关代码
                } else {
                    Map<String, String> map = MessageDecoder.string2messageProperties(replyMessageRequestHeader.getProperties());
                    map.put(DeFiBusMessageConst.LEAVE_TIME, String.valueOf(System.currentTimeMillis()));
                    replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(map));

                    try {
                        this.deFiBrokerController.getPushReplyMessageExecutor().submit(new Runnable() {
                            @Override public void run() {
                                boolean isPushSuccess = deFiBrokerController.getDeFiBusBroker2Client()
                                   .pushRRReplyMessageToClient(clientChannelInfo.getChannel(), replyMessageRequestHeader, msgInner);
                                } 
                            }
                        });
                    } catch (RejectedExecutionException e) {
                    }
                }
            }
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读