八、RocketMQ实践方案
1、Broker 的最佳实践
Broker Role
Broker Role有ASYNC_MASTER,SYNC_MASTER或SLAVE。如果您无法容忍消息丢失,我们建议您部署SYNC_MASTER并为其附加SLAVE。如果您容忍一少部分消息丢失,但希望Broker始终可用,则可以使用SLAVE部署ASYNC_MASTER。如果你只是想让它变得简单,你可能只需要一个没有SLAVE的ASYNC_MASTER。
FlushDiskType 刷新磁盘的类型
建议使用ASYNC_FLUSH,因为SYNC_FLUSH价格昂贵且会导致性能损失过大。如果您需要可靠性,我们建议您使用带有SLAVE的SYNC_MASTER。
2、NameServer的最佳实践
在Apache RocketMQ中,NameServer旨在协调分布式系统的每个组件,协调主要通过管理主题路由信息来实现。
管理由两部分组成:
- broker定期更新每个NameServer中保存的元数据。
- NameServer为客户端提供服务,包括生产者,消费者和命令行客户端以及最新的路由信息。
因此,在启动broker和client之前,我们需要告诉他们NameServer地址来访问 NameServer。在Apache RocketMQ中,这可以通过四种方式完成。
Programmatic Way
对于代理,我们可以namesrvAddr=name-server-ip1:port;name-server-ip2:port
在代理配置文件中指定。
对于生产者和消费者,我们可以向他们提供NameServer地址列表,如下所示:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
如果从shell使用admin命令行,也可以这样指定:
sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION
一个简单的例子是: sh mqadmin -n localhost:9876 clusterList
假设在NameServer节点上查询集群信息。
如果您已将管理工具集成到自己的仪表板中,则可以:
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt("please_rename_unique_group_name");
defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
Java Options
NameServer地址列表也可以通过rocketmq.namesrv.addr
在启动之前指定续集java选项来提供给您的应用程序 。
Environment Variable
您可以导出NAMESRV_ADDR
环境变量。如果设置,broker和client将检查并使用其值。
HTTP Endpoint
如果您未使用前面提到的方法指定名称服务器地址列表,Apache RocketMQ将访问以下HTTP端点,每两分钟获取并更新名称服务器地址列表,初始延迟为10秒。
默认情况下,结束点是:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
您可以jmenv.tbsite.net
使用此Java选项覆盖:rocketmq.namesrv.domain
,您也可以nsaddr
使用此Java选项覆盖part:rocketmq.namesrv.domain.subgroup
如果您正在生产中运行Apache RocketMQ,建议使用此方法,因为它为您提供了最大的灵活性 - 您可以根据名称服务器的系统负载动态添加或删除名称服务器节点,而无需重新启动代理和客户端。
优先
首先介绍的方法优先于后者:
Programmatic Way > Java Options > Environment Variable > HTTP Endpoint
3、Producer最佳实践
SendStatus
发送消息时,您将获得包含SendStatus的SendResult。首先,我们假设Message的isWaitStoreMsgOK = true(默认为true)。如果没有,如果没有抛出异常,我们将始终获得SEND_OK。以下是每个状态的说明列表:
FLUSH_DISK_TIMEOUT
如果Broker设置MessageStoreConfig的FlushDiskType = SYNC_FLUSH(默认为ASYNC_FLUSH),并且Broker没有在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成刷新磁盘,您将获得此状态。
FLUSH_SLAVE_TIMEOUT
如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),并且从属Broker未在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,则您将获得此状态。
SLAVE_NOT_AVAILABLE
如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),但没有配置slave Broker,您将获得此状态。
SEND_OK
SEND_OK并不意味着它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。
复制或丢失
如果你得到FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT并且Broker正好关闭,你可以找到你的消息丢失。此时,您有两个选择,一个是放手,这可能会导致此消息丢失; 另一种方法是重新发送消息,这可能会使消息重复。通常我们建议重新发送并找到一种方法来处理消费时的重复删除。除非你觉得某些消息丢失并不重要。但请记住,当您获得SLAVE_NOT_AVAILABLE时,重新发送是无用的。如果发生这种情况,您应该保留场景并提醒群集管理器。
超时
客户端向Broker发送请求,并等待响应,但如果最大等待时间已过,并且未返回任何响应,则客户端将抛出RemotingTimeoutException。默认等待时间为3秒。您还可以使用send(msg,timeout)而不是send(msg)传递超时参数。请注意,我们不建议等待时间太小,因为Broker需要一些时间来刷新磁盘或与从站同步。如果该值超过syncFlushTimeout,则该值可能影响不大,因为Broker可能会在超时之前返回FLUSH_SLAVE_TIMEOUT或FLUSH_SLAVE_TIMEOUT的响应。
消息大小
我们建议消息的大小不应超过512K。
异步发送
默认发送(msg)将阻塞,直到返回响应。因此,如果您关心性能,我们建议您使用send(msg,回调),它将以异步方式运行。
Producer Group
通常,生产者组没有任何影响。但是如果存在事务,你应该注意它。默认情况下,您只能在同一个JVM中创建一个具有相同生产者组的生产者,这通常就足够了。
线程安全
生产者是线程安全的,您可以在业务解决方案中使用它。
性能
如果您希望在一个JVM中有多个生产者进行大数据处理,我们建议:
- use async sending with a few producers (3~5 is enough)
- 每个生产者的设置不同的实例名称
4、Consumer最佳实践
Consumer群体和订阅
您应该注意的第一件事是,不同的消费者组可以独立地使用相同的主题,并且每个消费者组都有自己的消费抵消。请确保同一组内的每个消费者订阅相同的主题。
消息监听
Orderly有顺序地
消费者将锁定每个MessageQueue以确保它按顺序逐个使用。这会导致性能下降,但是当您关心消息的顺序时它会很有用。不建议抛出异常,您可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。
Concurrently同时
顾名思义,Consumer将同时使用这些消息。建议使用它以获得良好的性能。不建议抛出异常,您可以返回ConsumeConcurrentlyStatus.RECONSUME_LATER。
Consume Status消费状况
对于MessageListenerConcurrently,您可以返回RECONSUME_LATER以告诉消费者您现在不能使用它并希望稍后重新生成它。然后,您可以继续使用其他消息。对于MessageListenerOrderly,因为您关心订单,所以不能跳过邮件,但是您可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT来告诉消费者等待片刻。
Blocking闭塞
不建议阻止监听器,因为它会阻塞线程池,最终可能会停止使用过程。
Thread Number线程号
使用者使用ThreadPoolExecutor在内部处理消费,因此您可以通过设置setConsumeThreadMin或setConsumeThreadMax来更改它。
ConsumeFromWhere
建立新的消费者群体时,需要决定是否需要消费已经存在于经纪人中的历史消息。CONSUME_FROM_LAST_OFFSET将忽略历史消息,并消耗之后生成的任何内容。CONSUME_FROM_FIRST_OFFSET将使用Broker中存在的每条消息。您还可以使用CONSUME_FROM_TIMESTAMP来使用在指定时间戳之后生成的消息。
重复消费问题
许多情况都可能导致重复消费问题,例如:
- 生产者重新发送消息(即,在FLUSH_SLAVE_TIMEOUT的情况下)
- 消费者关闭,一些抵消未及时更新到broker。
因此,如果您的应用程序无法容忍重复,您可能需要执行一些外部工作来处理此问题。例如,您可以检查数据库的主键。