四、RocketMQ-Producer的Send方法
2019-04-23 本文已影响57人
ASD_92f7
一、概述
RocketMQ的producer默认有两个,一个是DefaultMQProducer,另一个是TransactionMQProducer,本文只对send方法做一个总结,其他的细节在其他章节介绍
二、DefaultMQProducer
一共定义了17种send方法,从4.x版本,事务消息被放到了TransactionMQProducer中,所以有15个send方法,这15个方法中,又有两个异步带超时时间的send方法被废弃了,所以有效的send方法有13个:
1、同步发送
/**
* 同步发送模式. 只有消息被成功接收并且被固化完成后才会收到反馈。
* 内置有重发机制, producer将会重试
* {@link #retryTimesWhenSendFailed,default=2} 次 ,然后才会报错.
* 因此,有一定的概率向broker发送重复的消息
* 使用者有责任去解决潜在的重复数据造成的影响
* @param msg 待发送数据
* @return {@link SendResult} 实体,来通知发送者发送状态等信息, 比如消息的ID
* {@link SendStatus} 指明 broker 存储/复制 的状态, 发送到了哪个队列等等
* @throws MQClientException 客户端异常
* @throws RemotingException 网络连接异常
* @throws MQBrokerException broker异常
* @throws InterruptedException 发送线程中断异常
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
2、同步发送,带超时时间
/**
* 与 {@link #send(Message)} 相同,只不过多了超时时间的指定.
*
* @param msg 待发送消息
* @param timeout 发送超时时间
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
* @throws MQClientException 客户端异常
* @throws RemotingException 网络连接异常
* @throws MQBrokerException broker异常
* @throws InterruptedException 发送线程中断异常
*/
@Override
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
3、异步发送
/**
* 异步发送消息
* 消息发送后,立即返回。broker处理返程后, 触发sendCallback回调方法
* 与上面一样,在给出发送失败标志前,会尝试2次,所以开发者要处理重复发送带来的问题
* @param msg 待发送消息
* @param sendCallback 回调函数
* @throws MQClientException 客户端异常
* @throws RemotingException 网络异常
* @throws InterruptedException 发送线程中断异常
*/
@Override
public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}
4、异步发送,带超时时间
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
5、单向发送,不等待broker回馈
/**
* 发送方法不会等待broker的反馈,只会一直发
* 所以有很高的吞吐量,但是有一定概率丢失消息
*
* @param msg 待发送消息
* @throws MQClientException 客户端异常
* @throws RemotingException 网络异常
* @throws InterruptedException 发送线程中断异常
*/
@Override
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg);
}
6、同步发送,指定队列
/**
* 同步发送,指定队列
* @param msg 待发送消息
* @param mq 指定的消息队列
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
*/
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
}
7、同步发送,指定队列,并附带超时时间
@Override
public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq, timeout);
}
8、异步发送,指定队列
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback);
}
9、异步发送,指定队列,附带超时时间
这个在4.4.0版本被设置为废弃,后续版本会给出
/**
* 因为在处超时异常存在问题,所以废弃
*/
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
}
10、单向发送,指定队列
@Override
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, mq);
}
11、同步发送,指定队列选择策略
官方的有序消息的DEMO就是基于队列选择器做的,让一些列有序的消息(相同ID)发送到同一个队列
/**
* 指定队列选择策略MessageQueueSelector
*
* @param msg 待发送消息
* @param selector 队列选择器
* @param arg 配合队列选择器选择队列的参数,一般可以是业务参数(ID等)
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
*/
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
12、同步发送消息,指定队列选择策略,并附带超时时间
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
}
13、异步发送消息,指定队列选择策略
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
}
14、异步发送消息,指定队列选择策略,并附带超时时间
这个方法在4.4.0版本废弃,后续提供
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
15、单向发送,指定队列选择策略
@Override
public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
}
三、TransactionMQProducer
1、发送事务消息
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}