rocketmq源码5-客户端-生产者
2019-01-27 本文已影响0人
modou1618
一 DefaultMQProducer
- 生产客户端标识
producerGroup
,事务消息中同一group的生产实例作为一个整体处理 - 默认topic
String createTopicKey = MixAll.DEFAULT_TOPIC;
String DEFAULT_TOPIC = "TBW102";
- topic创建的队列数量
int defaultTopicQueueNums=4
- 发包超时时间
int sendMsgTimeout = 3000;
- 压缩阈值,超过则压缩报文
int compressMsgBodyOverHowmuch = 1024 * 4;
- 同步发送失败,重试次数
int retryTimesWhenSendFailed = 2;
- 异步发送失败,重试次数
int retryTimesWhenSendAsyncFailed = 2;
- 发送失败时,是否更换broker
boolean retryAnotherBrokerWhenNotStoreOK = false;
- 报文长度限制
int maxMessageSize = 1024 * 1024 * 4;
二 DefaultMQProducerImpl
- 生产配置
DefaultMQProducer defaultMQProducer;
- 客户端状态
ServiceState serviceState = ServiceState.CREATE_JUST;
- 下层代理的客户端
MQClientInstance mQClientFactory;
- 发送请求和收包钩子
RPCHook rpcHook;
- 报文压缩级别,报文长度超过压缩阈值时,按压缩级别进行压缩。
int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private boolean tryToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) {
//batch dose not support compressing right now
return false;
}
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = UtilAll.compress(body, zipCompressLevel);
if (data != null) {
msg.setBody(data);
return true;
}
} catch (IOException e) {
log.error("tryToCompressMessage exception", e);
log.warn(msg.toString());
}
}
}
return false;
}
- 禁止发包校验钩子函数CheckForbiddenHook
ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
- 发包前调用禁止发包校验钩子函数CheckForbiddenHook
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
- 发包前后钩子函数SendMessageHook
ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
public interface SendMessageHook {
String hookName();
void sendMessageBefore(final SendMessageContext context);
void sendMessageAfter(final SendMessageContext context);
}
- 发包前钩子函数调用
SendMessageHook.executeSendMessageHookBefore
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
- 发包后或发包产生异常时钩子函数调用
SendMessageHook.executeSendMessageHookBefore
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
- 事务校验线程池,见#三 TransactionMQProducer
BlockingQueue<Runnable> checkRequestQueue;
ExecutorService checkExecutor;
- 事务校验线程池执行的事务校验任务。获取事务校验回调接口,执行事务校验,获取校验结果。根据校验结果进行返回响应。
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
if (transactionCheckListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
- 错误处理策略
MQFaultStrategy mqFaultStrategy
- 存储topic的存储的mq列表信息。客户端从namesrv获取并更新topic路由信息时更新。
ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable
三 TransactionMQProducer
- 事务消息生产者
- 事务校验线程池核心线程数量
int checkThreadPoolMinSize = 1;
- 事务校验线程池最大线程数量
int checkThreadPoolMaxSize = 1
- 事务校验线程池等待任务最大容量
int checkRequestHoldMax = 2000;
- 初始化事务校验线程池
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
- 事务校验回调函数,校验事务消息,返回校验结果LocalTransactionState。
事务校验线程池中调用的事务校验任务中调用。
TransactionCheckListener transactionCheckListener;
public interface TransactionCheckListener {
LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}
public enum LocalTransactionState {
COMMIT_MESSAGE,//事务提交
ROLLBACK_MESSAGE,//事务回滚
UNKNOW,
}
四 MQFaultStrategy
- sendLatencyFaultEnable为true表示brokerName发送失败或超时后,需要等一段时间才能继续往该brokerName发送
boolean sendLatencyFaultEnable = false;
- latencyMax超时时间,notAvailableDuration表示broker暂停时间。使用computeNotAvailableDuration基于超时时间计算broker暂停时间。
如: 服务超时时间在550-1000ms之间,则broker暂停30000ms。
服务超时时间在50-550ms之间,则broker暂停0ms。
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
- 暂停服务的broker存储结构
LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
4.4 LatencyFaultToleranceImpl
- 存储暂停服务的broker
ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
class FaultItem implements Comparable<FaultItem> {
private final String name;//broker名称
private volatile long currentLatency;//超时时间
private volatile long startTimestamp;//暂停服务时间
}
- 通过FaultItem.compareTo()选择最优的broker
public int compareTo(final FaultItem other) {
//有效的更优,即暂停服务时间已经过期
if (this.isAvailable() != other.isAvailable()) {
if (this.isAvailable())
return -1;
if (other.isAvailable())
return 1;
}
//超时时间短的更优
if (this.currentLatency < other.currentLatency)
return -1;
else if (this.currentLatency > other.currentLatency) {
return 1;
}
//暂停服务时间短的更优
if (this.startTimestamp < other.startTimestamp)
return -1;
else if (this.startTimestamp > other.startTimestamp) {
return 1;
}
return 0;
}
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
五 发包路由信息TopicPublishInfo
public class TopicPublishInfo {
private boolean orderTopic = false;//是否为顺序消息topic
private boolean haveTopicRouterInfo = false;//
//可用的mq
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//当前使用的mq的索引
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
}
- 轮询方式选择发送的mq
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
- 排除上次发送失败的lastBrokerName后,轮询选择发送的mq
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
五 消息生产
5.1 普通消息
sendDefaultImpl()
private SendResult sendDefaultImpl(
Message msg,//消息
final CommunicationMode communicationMode,//通信模式
final SendCallback sendCallback,//异步回调
final long timeout//超时时间
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
//根据topic获取topic发送路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//同步发送有多次失败重试,异步只发送一次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
//纪录本次发送的broker,成功则优先选择上次发送的broker傻姑娘的mq,失败则下次重试发送时避开
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//根据失败抑制策略,选择一个broker上的MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
//发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
//根据发送消息耗时,计算本次目标broker的抑制时间区间,抑制时间内不再向该broker发送消息。
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
//同步模式,若支持发送失败重试,则重试发送
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
//异常,则对broker进行失败抑制
//中断异常,则对broker进行超时抑制
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
//根据异常,设置返回错误码
MQClientException mqClientException = new MQClientException(info, exception);
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
//无namesrv配置错误
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
//无消息所属目标topic路由配置信息
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
5.2 顺序生产
-
sendSelectImpl()
,指定mq选择器,每次发送到固定的mq
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,//发送的MessageQueue选择器
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//选择器选择目标消息队列
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
//发送消息
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
//无topic路由信息异常
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
5.2.1 随机mq选择器
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}
5.2.2 基于hash的mq选择器
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
5.3 实际发送接口
sendKernelImpl()
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//从topic路由信息中根据broker名字获取broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
//更新topic路由信息,重新获取broker地址
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
//vip使能则端口号-2获取实际端口,否则是原端口号
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
int sysFlag = 0;
//消息压缩标记
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}
//事务消息预处理标记
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//钩子函数
...
//发送消息头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
//重试topic
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
//客户端接口发送消息
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
5.4 事务消息
sendMessageInTransaction()
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg)
throws MQClientException {
if (null == tranExecuter) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
//标记事务消息
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {//发送事务消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
//本地事务处理
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
//失败则设置事务状态为回滚
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {//本地事务处理结果发送
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//返回事务消息结果
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
5.4.1 endTransaction
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
//事务提交 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
//事务回滚 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
//未知类型 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
//发送事务结果消息到broker this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}