实时数据相关我爱编程Framwork

RocketMQ学习-消息发布和订阅

2018-03-31  本文已影响1998人  程序熊大

前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。

一、RocketMQ消息模型

屏幕快照 2018-03-31 14.50.41.png

在部署RocketMQ的时候,先启动name server,再启动broker,这时候broker会将自己注册到name server。应用程序中的producer启动的时候,首先连接一台name server,获取broker的地址列表;然后再和broker建立连接,接下来就可以发送消息了。其中:一个producer只与一个name server连接,一个producer会跟所有broker建立连接,每个连接都会有心跳检测机制。

producer会轮询向指定topic的mq集合发送消息。

consumer有两种消费模式:集群消费和广播消费。集群消费:多个consumer平均消费该topic下所有mq的消息,即某个消息在某个message queue中被一个consumer消费后,其他消费者就不会消费到它;广播消费:所有consumer可以消费到发到这个topic下的所有消息。

consumer有两种获取消息的模式:推模式和拉模式,在RocketMQ中,从技术实现角度看,推模式也是在拉模式上做了一层封装。

二、消息发送

生产者Demo

首先给出代码,

package com.javadu.chapter8rocketmq.message;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

import javax.annotation.PostConstruct;

/**
 * 作用: 同步发送消息
 * User: duqi
 * Date: 2018/3/29
 * Time: 13:52
 */
@Component
public class ProducerDemo {

    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);

        defaultMQProducer.setNamesrvAddr(namesrvAddr);

        try {
            defaultMQProducer.start();

            Message message = new Message("TopicTest", "TagA",
                                          "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

            for (int i = 0; i < 100; i++) {
                SendResult sendResult = defaultMQProducer.send(message);
                System.out.println("发送消息结果, msgId:" + sendResult.getMsgId() +
                                   ", 发送状态:" + sendResult.getSendStatus());
            }

        } catch (MQClientException | UnsupportedEncodingException | InterruptedException
            | RemotingException | MQBrokerException e) {
            e.printStackTrace();
        } finally {
            defaultMQProducer.shutdown();
        }
    }

}

生产者中有两个属性:

RocketMQ中的消息,使用Message表示,代码定义如下:

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;

    public Message() {
    }
    //省略了getter和setter方法
}

每个消息发送完后,会得到一个SendResult对象,看下该对象的结构:

public class SendResult {
    //发送状态
    private SendStatus sendStatus;
    //消息ID,用于消息去重、消息跟踪
    private String msgId;
    private MessageQueue messageQueue;
    private long queueOffset;
    //事务ID
    private String transactionId;
    private String offsetMsgId;
    private String regionId;
    //是否需要跟踪
    private boolean traceOn = true;

    public SendResult() {
    }
    //省略了构造函数、getter和setter等一系列方法
}

在这个demo中,我们是将消息内容和消息状态一并打印到控制台。

消息发送源码分析

在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。producer和consumer包下分别定义了生产者和消费者的接口,将具体的实现放在impl包中。

屏幕快照 2018-03-31 11.51.36.png

首先关注producer包里的内容,几个主要的类如下:DefaultMQProducer是生产者的默认实现、MQAdmin用于定义一些管理接口、MQProducer用于定义一些生产者特有的接口。

MQProducer.png

在ProducerDemo中,通过`defaultMQProducer.start();启动生产者,接下来看下start()方法的过程:

 public void start(final boolean startFactory) throws MQClientException {
        //根据当前的服务状态决定接下来的动作
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                //创建一个客户端工厂
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //将生产者注册到指定producer group
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                
                //填充topicPublishInfoTable
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        //给该producer连接的所有broker发送心跳消息
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

顺着mQClientFactory.start()往下跟,可以进一步了解生产者的细节,主要步骤有:

  public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

生产者启动后,接下来看下消息的发送过程,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。

屏幕快照 2018-03-31 12.26.32.png

这里我们只看最简单的send(Message message)方法,最终在DefaultMQProducerImpl中实现:

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //确认生产者状态正常
        this.makeSureStateOK();
        //检查消息的合法性
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //获取消息的目的地:Topic信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            //计算出消息的投递次数,如果是同步投递,则是1+重试次数,如果不是同步投递,则只需要投递一次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            //一个broker集群有不同的broker节点,lastBrokerName记录了上次投递的broker节点,每个broker节点
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //选择一个要发送的消息队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        //投递消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        //根据消息发送模式,对消息发送结果做不同的处理
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

发送消息的主要过程如下:

public class TopicPublishInfo {
    //是否顺序消息
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    //维护该topic下用于的消息队列列表
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //计算下一次该投递的队列,这里应用ThreadLocal,即使是同一台机器中,每个producer实例都有自己的队列
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    //省略了getter和setter方法
    
    //选择指定lastBrokerName上的下一个mq
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    //选择当前broker节点的下一个mq
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
}

二、消息消费

消费者Demo

消费者里有个属性需要看下:

消费者Demo里做了以下几个事情:

package com.javadu.chapter8rocketmq.message;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 作用:
 * User: duqi
 * Date: 2018/3/29
 * Time: 14:00
 */
@Component
public class ConsumerDemo {

    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.consumerGroup}")
    private String consumerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("TopicTest", "TagA");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        //输出消息内容
                        System.out.println("messageExt: " + messageExt);

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        //输出消息内容
                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //稍后再试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者源码分析

前面分析过了,RocketMQ中的client模块统一提供了生产者和消费者客户端,这块我们看下消费者的几个主要的类。前面提到过,RocketMQ实际上都是拉模式,这里的DefaultMQPushConsumer实现了推模式,也只是对拉消息服务做了一层封装,即拉到消息的时候触发业务消费者注册到这里的callback,而具体拉消息的服务是由PullMessageService实现的,这个细节后续再研究。

MQConsumer.png

在ConsumerDemo中,设置好配置信息后,会进行topic订阅,调用了DefaultMQPushConsumer的subscribe方法,源码如下:

    /**
     * Subscribe a topic to consuming subscription.
     *
     * @param topic topic to subscribe.
     * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
     * if null or * expression,meaning subscribe all
     * @throws MQClientException if there is any client error.
     */
    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
    }

第一个参数是topic信息,第二个参数用于用于消息过滤tag字段。真正的订阅发生在DefaultMQPushConsumerImpl中,代码如下:

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            //构建包含订阅信息的对象,并放入负载平衡组件维护的map中,以topic为key
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            //如果已经跟broker集群建立连接,则给所有的broker节点发送心跳消息
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

在ConsumerDemo中,接下里会设置消费者首次启动时消费消息的起始位置,这涉及到DefaultMQPushConsumer中的一个属性——consumeFromWhere,这个值有三个可能的值

ConsumerDemo接下来会注册一个callback,当消息到达的时候就处理消息(最新的消息监听者支持并发消费):

    /**
     * Register a callback to execute on message arrival for concurrent consuming.
     *
     * @param messageListener message handling callback.
     */
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }

最后,我们看下ConsumerDemo的启动过程,即DefaultMQPushConsumerImpl的start方法,主要做了下面几件事:

参考资料

  1. 分布式开放消息系统(RocketMQ)的原理与实践
  2. 买好车提供的rocketmq-spring-boot-starter
  3. Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控
上一篇下一篇

猜你喜欢

热点阅读