RocketMQ

六、RocketMQ-Producer-Send深入一丢丢

2019-04-25  本文已影响10人  ASD_92f7

一、概述

本篇主要跟踪下producer的发送流程,以SYNC同步模式为例,假定producer已经start了

二、主线流程图

主线流程图

三、流程深入一丢丢

再次说明下,这里是以SYNC为例子
只对逻辑相对较多的几个方法做讲解

方法2:

增加了一个timeout,发送超时时间,默认时间 3 秒

SendResult send(Message msg,long timeout)

方法3:sendDefaultImpl()

方法流程图
方法内,会根据策略获取待发送的队列,然后调用sendKernelImpl发送消息,如果发送失败,会尝试 1 + 重试次数(默认为2) = 3次

方法4 sendKernelImpl()

sendKernelImpl

方法6 invokeSync

这个方法在调用 invokeSyncImpl 的前后,分别调用了doBeforeRpcHooksdoAfterRpcHooks的hooks方法,切入RPC调用

方法7 invokeSyncImpl

这个是最终和broker通讯的代码,通过netty的channel.writeAndFlush(request)方法将消息发送给broker,并通过ChannelFutureListener回调函数获取broker的反馈
通过下面的代码让阻塞线程,其实内部就是一个length=1的CountDownLatch

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

然后在ChannelFutureListener回调函数的putResponse方法中释放,latch - 1,保证获取到回馈再返回
具体的源代码如下:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // 在这里阻塞 等待响应
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
上一篇下一篇

猜你喜欢

热点阅读