阿里云企业级互联网架构实践系列(2)

2020-08-05  本文已影响0人  技术与健康

一、MQ产品概述

消息队列 RocketMQ 是阿里巴巴集团自主研发的专业消息中间件,基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询以及定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。 消息队列 RocketMQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。

二、开发规范

2.1 命名规范

Topic:

  1. 只能包含字母,数字,短横线(-)和下划线(_)。至少包含一个字母或数字。

  2. 名称长度限制在3-64字节之间,长于64字节将被自动截取。

  3. 一旦创建后不能再修改Topic名称。

  4. 名称有意义,能够体现业务场景。比如用户topic:user/user_topic

  1. 以“PID_”或者“PID-”开头,只能包含字母,数字,短横线 (-) 和下划线 (_),长度限制在7-64字节之间。

  2. 名称有意义,能够体现业务场景。比如用户topic:user/user_topic的生产者命名为PID_user/PID_user_topic。

  1. 以“CID_”或者“CID-”开头,只能包含字母,数字,短横线 (-) 和下划线 (_),长度限制在7-64字节之间。

  2. 名称有意义,能够体现业务场景。比如用户topic:user/user_topic的消费者命名为CID_user_app1/CID_user_topic_app1。App1是应用名称,由于可以多个订阅者订阅同一个topic,所以可以增加应用名称来区别。

  1. 以“GID_”或者“GID-”开头,只能包含字母,数字,短横线 (-) 和下划线 (_),长度限制在7-64字节之间。

  2. 名称有意义,能够体现业务场景。比如用户topic:user/user_topic的group命名为GID_user_app1/GID_user_topic_app1。App1是应用名称,由于可以多个订阅者订阅同一个topic,所以可以增加应用名称来区别,也可以增加业务名来区别。

2.2 代码规范

1.生产者代码:

打印消息ID:

SendResult sendResult = sendMsgUtils.sendNormalMsg("TagA", "这是一条普通消息", "这是这条信息的messageKey1");

[LOGGER.info](http://logger.info/)("发送信息的Id:" + sendResult.getMessageId());

发送完成之后立即打印消息ID,日志是排查问题的最好办法。

2.消费者代码:

打印消息ID:

**public Action consume(Message message, ConsumeContext context) {**

**try {**

[LOGGER.info](http://logger.info/)("Receive MessageId: " + message.getMsgID());

消费到信息之后立即打印消息ID,日志是排查问题的最好办法。

订阅一个topic的多个tag:

多个tag用||分割,不可以拆分成多段代码:

**consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个 Tag**

**            public Action consume(Message message, ConsumeContext context) {**

**                System.out.println("Receive: " + message);**

**                return Action.CommitMessage;**

**            }**

**        });**

** 订阅多个topic:**

需要分成多段代码,代码中的topic名称和tag名称要对应上:

**consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个 Tag**

**            public Action consume(Message message, ConsumeContext context) {**

**                System.out.println("Receive: " + message);**

**                return Action.CommitMessage;**

**            }**

**        });**

**consumer.subscribe("AnotherTopicTestMQ", "TagA||TagC", new MessageListener() { //订阅多个 Tag**

**            public Action consume(Message message, ConsumeContext context) {**

**                System.out.println("Receive: " + message);**

**                return Action.CommitMessage;**

**            }**

**        });**

三、最佳实践

3.1 XX项目

此处填写项目简单介绍。

3.1.1 审批流程(场景)

  1. 用户提交工单;

  2. 工单中心通过业务平台将数据提交到营销中心;

  3. 营销中心每次对数据处理都会发送一个状态到工单中心;

  4. 工单中心会将状态发送到mq中,用户中心订阅这个状态进行展示;

  5. 刚开始使用的是普通消息,如果省侧营销中心最后两次处理时间间隔很短,分别发送一次处理中和处理完成;

  6. 用户中心会先消费到一次处理完成,随后有收到一条处理中的消息,然后将一条已经处理完成的工单更新为处理中,状态再也不会更新了;

  7. 当时出了两个方案,A) 用户中心消费消息根据状态更新数据,如果提前更新到了数据,可以将数据置为重试数据,等状态符合的时候再消费; B) 将MQ消息改为顺序消息。

  8. 最后选择将MQ普通消息换成了顺序消息,问题解决了。

3.1.2 多订阅(场景)

  1. A中心发的消息B中心需要

  2. C中心也需要A中心的消息,但是数据和B中心的不一样,A为了不让B中心改代码,就重新构造了一份C需要的数据,发送给C

  3. D也和C一样,A就接着改,接着增加

  4. 后来有6个中心都需要A的数据,这其中可能有两个中心用的数据是一样的,或者一份数据包含他们都想要的数据,也有每个中心都有自己的特殊需求的数据。

  5. 大家觉得上面的做法会有什么问题:5.1) 如果有新的中心需要A的数据,A就得再次开发;5.2)如果A需要发送一个事务消息,这样做,就不能使用MQ的事务消息。

  6. 开发人员这个时候一直在咨询我们是否可以保障一个数据库操作和多个MQ发送同时成功,就是异步事务。当时给他们出了两个解决方案:1)使用gts,gts使用场景是步骤不易过长,而这个操作,需要操作一次数据,再顺序发送6条MQ消息,不建议使用;2)引入redis,每次为每个中心分别生成一条消息,将消息发送的redis中,消息的key是中心的名称+一个随机码,每次发出的6条信息的随机码是相同的,每次只会把这个随机码发送的MQ中,每个中心消费到这个随机码之后,通过自己的中心名+消费到的随机码,从redis中获取自己的消息;3)A只用发送一份消息,尽量全量的消息,这样,如果有新的中心加入进来,可以直接订阅A的消息。但是,问题出现了,每个中心都要改代码,其他中心都不愿意,最后经过领导协调,改成了这个样子。

3.1.3 肖峰填谷(场景)

用户订阅了电子账单,那么每个月,消息中心会把电子账单发送到用户的邮箱中;

  1. 月末的时候,省侧营销中心生成电子账单,并把电子账单发送给消息中心;

  2. 消息中心获取电子账单的时候,由于请求量比较大,消息中心怕处理不过来,就将电子账单发送到MQ中;

  3. 消息中心自己订阅电子账单的消息;

  4. 将消费到电子账单发送给用户。

3.1.4 广播消费(场景)

  1. 使用极光推送,极光推送,需要一些初始化信息,比如Secret Access Key;

  2. 第一次推送的时候,先从redis中获取这些信息,如果没有,就从数据库中获取,然后缓存在redis中,调用极光推送;

  3. 以后推送的时候,先去redis获取极光推送的缓存信息,然后直接推送;

  4. 如果用户在控制台把极光推送基本信息改变了,后台程序会先将这些信息存入数据库,然后更新redis中的信息;

  5. 我们来看一下第一步和第四步,每次从redis中获取信息之后,会初始化一个推送对象,然后执行推送,大家会发现大部分时间构建的对象是相同的,我们是否可以使用单例模式,构建一个推送对象,一直推送;应该是可以的,但是发生极光推送的基本信息在后台修改了之后会出现什么情况呢?

  6. 假设,消息中心有三个节点,在初始化的时候,每个几点都会从数据库中获取极光推送的基本信息,然后初始化一个单例类,开始发送;

  7. 这个时候,有人通过后台系统,把极光推送的基本信息改变了,如果后台系统在着三个节点上的,那么,如果在节点1上更新的,更新完成之后,可以重新初始化节点1上的单例类;但是节点2-3上的单例类没有更新;可以确认,在修改极光基本消息的时候,都是在无推送发送的时候进行的;修改了极光基本信息之后,2-3节点并不知道,我们如何触发2-3节点,让其知晓现在的极光信息;我们可以设置一个定时器,定时去数据库同步信息;如果觉得这么做麻烦,可以在推送发送错误的时候,去数据库中同步消息;我们能想到一种优雅的方式,在信息更新的时候及时的通知到每个节点吗?

  8. 三个节点在启动的时候从数据库中加载极光信息,也可以在启动的时候后台系统从数据库中读取极光信息,然后发送广播消息;

  9. 当后台系统更新极光基本信息的时候,先更新数据库,让后再发送广播消息;

  10. 每个节点在消费到广播消息的时候,重新构造自己的单例类;

  11. 当有需要推送的消息过来的时候,每个节点上的单例类都是最新的消息,可以很好的给推送提供服务。

四、FAQ

4.1 开发问题

4.1.1 MQ发送消息失败

错误信息:com.aliyun.openservices.ons.api.exception.ONSClientException</u>: Can not find name server, May be your network problem.

原因:

排查步骤:

4.1.2 MQ发送消息失败

错误信息:com.aliyun.openservices.ons.api.exception.ONSClientException: Receive a broker exception, Topi=yyglpt-delay, msgId=null, com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException: signature validate by dauth failed., com.alibaba.ons.open.auth.service.DauthServiceImpl.signatureValidate(DauthServiceImpl.java:242)

See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&broker_response_exception<u> for further details.
at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.checkProd ucerException(ProducerImpl.java:198)2018-09-04 20:39:27.240</u>

原因:

排查步骤:

4.1.3 外网MQ发送消息失败

错误信息:Caused by: java.lang.NumberFormatException: For input string: at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:592)

at java.lang.Integer.valueOf(Integer.java:766)

at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.<init>(ProducerImpl.java:41)
atcom.aliyun.openservices.ons.api.impl.ONSFactoryImpl.createProducer(ONSFactoryImpl.java:30)
atcom.aliyun.openservices.ons.api.ONSFactory.createProducer(ONSFactory.java:89)
atcom.sgcc.tools.utils.producer.DelayProducerUtils.getSignleton(DelayProducerUtils.java:25)
at com.sgcc.tools.utils.SendMsgUtils.sendTimingMsg(SendMsgUtils.java:101)
at com.sgcc.goods.service.GoodsS.sendTimingMsg(GoodsS.java:174)</u>
原因:

排查步骤:

4.1.4 MQ消费消息失败

错误信息:subscription group [PID-gfy] does not existcom.aliyun.openservices.ons.api.exception.ONSClientException: Receive a broker exception, Topi=normal001, msgId=null, com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException: get user info by accesskey from PRIVATE failed., com.alibaba.ons.open.auth.service.DauthServiceImpl.signatureValidate(DauthServiceImpl.java:233)

See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&broker_response_exception for further details

at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.check ProducerException(ProducerImpl.java:198)
at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send (ProducerImpl.java:116)
at com.fansmac.sc(http://com.fansmac.sc/).utils.producer.NormalProducerUtils.send(NormalProducerUtils.java:45)
at com.fansmac.sc(http://com.fansmac.sc/).utils.SendMsgUtils.sendNormalMsg(SendMsgUtils.java:63)
at com.fansmac.sc(http://com.fansmac.sc/).utils.SendMsgUtils.sendNormalMsg(SendMsgUtils.java:49)
at com.fansmac.sc(http://com.fansmac.sc/).controller.SendMsgController.sendNormal(SendMsgController.java:35)

原因:

排查步骤:

4.1.5 MQ消费消息失败

错误信息:无

原因:

排查步骤:

4.1.6 MQ消费消息失败

错误信息:无

原因:

排查步骤:

4.1.7 MQ消费消息失败

错误信息:com.aliyun.openservices.ons.api.exception.ONSClientException: Receive a broker exception, Topi=normal001, msgId=null, com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException: get user info by accesskey from PRIVATE failed., com.alibaba.ons.open.auth.service.DauthServiceImpl.signatureValidate(DauthServiceImpl.java:233)
See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&broker_response_exceptionfor further details.
at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.checkProd ucerException(ProducerImpl.java:198)
at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send (ProducerImpl.java:116)
at com.fansmac.sc (http://com.fansmac.sc/).utils.producer. NormalProducerUtils.send(NormalProducerUtils.java:45)
at[com.fansmac.sc(http://com.fansmac.sc/).utils.SendMsgUtils.sendNormalMsg(SendMsgUtils.java:63)
at[com.fansmac.sc(http://com.fansmac.sc/).utils.SendMsgUtils.sendNormalMsg(SendMsgUtils.java:49)
at[com.fansmac.sc(http://com.fansmac.sc/).controller.SendMsgController.sendNormal(SendMsgController.java:35)

原因:

排查步骤:

4.1.8 MQ消费消息失败

错误信息:com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's subscription not exist[osg-sfan0001] [2018-09-14 16:38:13] See </u><u>http://rocketmq.apache.org/docs/faq/</u><u> for further details.[osg-sfan0001] [2018-09-14 16:38:13] at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:640) ~[rocketmq-client-4.1.3.jar!/:na][osg-sfan0001] [2018-09-14 16:38:13] at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.access200(MQClientAPIImpl.java:155) ~[rocketmq-client-4.1.3.jar!/:na][osg-sfan0001] [2018-09-14 16:38:13] at com.alibaba.rocketmq.client.impl.MQClientAPIImpl2.operationComplete(MQClientAPIImpl.java:592) ~[rocketmq-client-4.1.3.jar!/:na][osg-sfan0001] [2018-09-14 16:38:13] at com.alibaba.rocketmq.remoting.netty.NettyRemotingClientInvokeCallbackWrapper.operationComplete(NettyRemotingClient.java:685) [rocketmq-remoting-4.1.3.jar!/:na][osg-sfan0001] [2018-09-14 16:38:13] at com.alibaba.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51) [rocketmq-remoting-4.1.3.jar!/:na][osg-sfan0001] [2018-09-14 16:38:13] at com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract2.run(NettyRemotingAbstract.java:287) [rocketmq-remoting-4.1.3.jar!/:na][osg-sfan0001] [2018-09-14 16:38:13] at java.util.concurrent.ExecutorsRunnableAdapter.call(Executors.java:511) [na:1.8.0_65][osg-sfan0001] [2018-09-14 16:38:13] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65][osg-sfan0001] [2018-09-14 16:38:13] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65][osg-sfan0001] [2018-09-14 16:38:13] at java.util.concurrent.ThreadPoolExecutorWorker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65][osg-sfan0001] [2018-09-14 16:38:13] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65][osg-sfan0001] 2018-09-14T08:38:13.1626034Z [osg-sfan0001] [2018-09-14 16:38:13] 2018-09-14 16:38:13.162 WARN 498 --- [ublicExecutor_4] RocketmqClient : execute the pull request exception[osg-sfan0001] 2018-09-14T08:38:13.16261187Z</u>

原因:

排查步骤:

4.1.9 MQ消费消息失败

错误信息:2018-09-18 09:42:17.499 ERROR 7948 --- [ main] com.sgcc.controller.LogController : 保存日志出现异常!
com.aliyun.openservices.ons.api.exception.ONSClientException: Can not find name server, May be your network problem.

See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&namesrv_not_exist for further details.
at com.aliyun.openservices.ons.api.impl.rocketmq.ONSClientAbstract.<init>(ONSClientAbstract.java:76)
at com.aliyun.openservices.ons.api.impl.rocketmq.ONSConsumerAbstract.<init>(ONSConsumerAbstract.java:27)
at com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl.<init>(ConsumerImpl.java:26)
atcom.aliyun.openservices.ons.api.impl.ONSFactoryImpl.createConsumer(ONSFactoryImpl.java:36)
atcom.aliyun.openservices.ons.api.ONSFactory.createConsumer(ONSFactory.java:167)
atcom.sgcc.tools.utils.consumer.NoOrderConsumerUtils.getSignleton(NoOrderConsumerUtils.java:33)
atcom.sgcc.tools.utils.ConsumerMsgUtils.consumerNormalMsg(ConsumerMsgUtils.java:51) atcom.sgcc.tools.utils.ConsumerMsgUtils.consumerNormalMsg(ConsumerMsgUtils.java:36)
at [com.sgcc.mq(http://com.sgcc.mq/)NormalConsumerRunner.run(NormalConsumerRunner.java:31) atorg.springframework.boot.SpringApplication.callRunner(SpringApplication.java:800)
atorg.springframework.boot.SpringApplication.callRunners(SpringApplication.java:784)
atorg.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:771)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at com.sgcc.DspApplication.main(DspApplication.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.taobao.pandora.boot.loader.LaunchRunner.run(LaunchRunner.java:38)
at java.lang.Thread.run(Thread.java:745)

原因:

排查步骤:

4.1.10 MQ发送消息失败

错误信息:com.aliyun.openservices.ons.api.exception.ONSClientException: Can not find name server, May be your network problem. See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&namesrv_not_exist</u><u> for further details. at com.aliyun.openservices.ons.api.impl.rocketmq.ONSClientAbstract.<init>(ONSClientAbstract.java:76) at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.<init>(ProducerImpl.java:31) at com.aliyun.openservices.ons.api.impl.ONSFactoryImpl.createProducer(ONSFactoryImpl.java:30) at com.aliyun.openservices.ons.api.ONSFactory.createProducer(ONSFactory.java:89) at com.sgcc.tools.utils.producer.NormalProducerUtils.getSignleton(NormalProducerUtils.java:26) at com.sgcc.tools.utils.SendMsgUtils.sendNormalMsg(SendMsgUtils.java:66) at com.sgcc.service.register.RegisterService.SendMq(RegisterService.java:593)

原因:

排查步骤:

4.1.11 MQ接收不到消息

错误信息:无

原因:

排查步骤:

上一篇下一篇

猜你喜欢

热点阅读