RocketMQ

四、RocketMQ-Producer的Send方法

2019-04-23  本文已影响57人  ASD_92f7

一、概述

RocketMQ的producer默认有两个,一个是DefaultMQProducer,另一个是TransactionMQProducer,本文只对send方法做一个总结,其他的细节在其他章节介绍

二、DefaultMQProducer

一共定义了17种send方法,从4.x版本,事务消息被放到了TransactionMQProducer中,所以有15个send方法,这15个方法中,又有两个异步带超时时间的send方法被废弃了,所以有效的send方法有13个:

1、同步发送

/**
     * 同步发送模式. 只有消息被成功接收并且被固化完成后才会收到反馈。
     * 内置有重发机制, producer将会重试
     * {@link #retryTimesWhenSendFailed,default=2} 次 ,然后才会报错. 
     * 因此,有一定的概率向broker发送重复的消息
     * 使用者有责任去解决潜在的重复数据造成的影响
     * @param msg 待发送数据
     * @return {@link SendResult} 实体,来通知发送者发送状态等信息, 比如消息的ID
     * {@link SendStatus} 指明 broker 存储/复制 的状态, 发送到了哪个队列等等
     * @throws MQClientException 客户端异常
     * @throws RemotingException 网络连接异常
     * @throws MQBrokerException broker异常
     * @throws InterruptedException 发送线程中断异常
     */
    @Override
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }

2、同步发送,带超时时间

/**
     * 与 {@link #send(Message)} 相同,只不过多了超时时间的指定.
     *
     * @param msg 待发送消息
     * @param timeout 发送超时时间
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     * @throws MQClientException 客户端异常
     * @throws RemotingException 网络连接异常
     * @throws MQBrokerException broker异常
     * @throws InterruptedException 发送线程中断异常
     */
    @Override
    public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, timeout);
    }

3、异步发送

/**
     * 异步发送消息
     * 消息发送后,立即返回。broker处理返程后, 触发sendCallback回调方法
     * 与上面一样,在给出发送失败标志前,会尝试2次,所以开发者要处理重复发送带来的问题
     * @param msg 待发送消息
     * @param sendCallback 回调函数
     * @throws MQClientException 客户端异常
     * @throws RemotingException 网络异常
     * @throws InterruptedException 发送线程中断异常
     */
    @Override
    public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }

4、异步发送,带超时时间

@Override
    public void send(Message msg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
    }

5、单向发送,不等待broker回馈

/**
     * 发送方法不会等待broker的反馈,只会一直发
     * 所以有很高的吞吐量,但是有一定概率丢失消息
     *
     * @param msg 待发送消息
     * @throws MQClientException 客户端异常
     * @throws RemotingException 网络异常
     * @throws InterruptedException 发送线程中断异常
     */
    @Override
    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg);
    }

6、同步发送,指定队列

/**
     * 同步发送,指定队列
     * @param msg 待发送消息
     * @param mq 指定的消息队列
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     */
    @Override
    public SendResult send(Message msg, MessageQueue mq)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq);
    }

7、同步发送,指定队列,并附带超时时间

@Override
    public SendResult send(Message msg, MessageQueue mq, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq, timeout);
    }

8、异步发送,指定队列

@Override
    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, mq, sendCallback);
    }

9、异步发送,指定队列,附带超时时间

这个在4.4.0版本被设置为废弃,后续版本会给出

/**
  * 因为在处超时异常存在问题,所以废弃
 */
@Override
    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
    }

10、单向发送,指定队列

@Override
    public void sendOneway(Message msg,
        MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg, mq);
    }

11、同步发送,指定队列选择策略

官方的有序消息的DEMO就是基于队列选择器做的,让一些列有序的消息(相同ID)发送到同一个队列

/**
     * 指定队列选择策略MessageQueueSelector 
     *
     * @param msg 待发送消息
     * @param selector 队列选择器
     * @param arg 配合队列选择器选择队列的参数,一般可以是业务参数(ID等)
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     */
@Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, selector, arg);
    }

12、同步发送消息,指定队列选择策略,并附带超时时间

@Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
    }

13、异步发送消息,指定队列选择策略

@Override
    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
    }

14、异步发送消息,指定队列选择策略,并附带超时时间

这个方法在4.4.0版本废弃,后续提供

@Override
    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
    }

15、单向发送,指定队列选择策略

 @Override
    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
    }

三、TransactionMQProducer

1、发送事务消息

@Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
上一篇下一篇

猜你喜欢

热点阅读