六、RocketMQ-Producer-Send深入一丢丢
2019-04-25 本文已影响10人
ASD_92f7
一、概述
本篇主要跟踪下producer的发送流程,以SYNC同步模式为例,假定producer已经start了
二、主线流程图
主线流程图三、流程深入一丢丢
再次说明下,这里是以SYNC为例子
只对逻辑相对较多的几个方法做讲解
方法2:
增加了一个timeout,发送超时时间,默认时间 3 秒
SendResult send(Message msg,long timeout)
方法3:sendDefaultImpl()
方法流程图方法内,会根据策略获取待发送的队列,然后调用sendKernelImpl发送消息,如果发送失败,会尝试 1 + 重试次数(默认为2) = 3次
方法4 sendKernelImpl()
sendKernelImpl- 首先为消息添加主键,格式如下:
UNIQ_KEY : 0BCDF1716BEC18B4AAC27F26B89A0000 - 压缩消息
- 执行hook的before方法(如果有的话)
- 组织requestHeader作为下个方法的参数
方法6 invokeSync
这个方法在调用 invokeSyncImpl 的前后,分别调用了doBeforeRpcHooks及doAfterRpcHooks的hooks方法,切入RPC调用
方法7 invokeSyncImpl
这个是最终和broker通讯的代码,通过netty的channel.writeAndFlush(request)方法将消息发送给broker,并通过ChannelFutureListener回调函数获取broker的反馈
通过下面的代码让阻塞线程,其实内部就是一个length=1的CountDownLatch
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
然后在ChannelFutureListener回调函数的putResponse方法中释放,latch - 1,保证获取到回馈再返回
具体的源代码如下:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// 在这里阻塞 等待响应
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}