『互联网架构』码农的世界

『互联网架构』软件架构-rocketmq之实践(62)

2019-05-27  本文已影响5人  IT人故事会

原创文章,欢迎转载。转载请注明:转载自IT人故事会,谢谢!
原文链接地址:『互联网架构』软件架构-rocketmq之实践(62)

上次完成rocketMq的安装,并测试了发送和接收,这次简单的利用rocketMq的源码连接rocketMq集群。
源码:https://github.com/limingios/netFuture/tree/master/jms

(一)broker的properties配置文件

上节直接用了我提供的properties文件,并没有详细解释每个字段的含义这次详细说下。

参数名 默认值 描述
listenPort 10911 broker的服务端口号,作为对producer和consumer使用服务的端口号
namesrvAddr null namesrv的ip地址。格式: ip:port;ip:port
brokerIP1 本机IP broker所在的机器ip,默认不用设置,如果机器有多个网卡,需要手动设置
brokerName 本机主机名 作用为一组master与slave通过brokerName是否相同来标示,通过brokerId来区分master还是slave brokerClusterName DefaultCluster 整个broker集群的名字,创建topic时需要指定。
brokerId 0 0:master 非0:slave
storePathCommitLog $HOME/store/commitlog/ commitLog存储路径
storePathConsumerQueue $HOME/store/consumequeue/ 消费队列存储路径
mapedFileSizeCommitLog 1024 * 1024 * 1024(1G) commitLog每个文件的大小,默认1G
deleteWhen 4 删除文件时间点,默认凌晨 4点
fileReservedTime 72 文件保留时间,默认72小时.
brokerRole ASYNC_MASTER Broker 的角色ASYNC_MASTER 异步复制Master SYNC_MASTER 同步双写Master SLAVE
flushDiskType ASYNC_FLUSH 刷盘方式 ASYNC_FLUSH 异步刷盘 SYNC_FLUSH 同步刷盘
defaultTopicQueueNums 4 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。
autoCreateTopicEnable true 是否自动创建topic。
autoCreateSubscriptionGroup true 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
rejectTransactionMessage false 是否拒绝事务消息接入
fetchNamesrvAddrByAddressServer false 是否从web服务器获取Name Server地址,针对大规模的Broker集群建议使用这种方式
storePathIndex $HOME/store/index 消息索引存储路径
storeCheckpoint $HOME/store/checkpoint checkpoint文件存储路径
abortFile $HOME/store/abort abort文件存储路径
maxTransferBytesOnMessageInMemory 262144 单次Pull消息(内存)传输的最大字节数
maxTransferCountOnMessageInMemory 32 单次Pull消息(内存)传输的最大条数
maxTransferBytesOnMessageInDisk 65536 单次Pull消息(磁盘)传输的最大字节数
maxTransferCountOnMessageInDisk 8 单次Pull消息(磁盘)传输的最大条数
messageIndexEnable true 是否开启消息索引功能
messageIndexSafe false 是否提供安全的消息索引机制,索引保证不丢
haMasterAddress 在Slave上直接设置Master地址,默认从Name Server上自动获取,也可以手工强制配置
cleanFileForciblyEnable true 磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用 FALSE 标记服务不可用,文件不删除
参数名 默认值 描述
namesrvAddr Name Server地址列表,多个NameServer地址用分号隔开
clientIP 本机IP 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定
instanceName DEFAULT 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)
clientCallbackExecutorThreads 4 通信层异步回调线程数
pollNameServerInteval 30000 轮询Name Server间隔时间,单位毫秒
heartbeatBrokerInterval 30000 向Broker发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval 5000 持久化Consumer消费进度间隔时间,单位毫秒

Producer配置

参数名 默认值 描述
producerGroup DEFAULT_PRODUCER Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组
createTopicKey TBW102 在发送消息时,自动创建服务器不存在的topic,需要指定Key。
defaultTopicQueueNums 4 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
sendMsgTimeout 10000 发送消息超时时间,单位毫秒
compressMsgBodyOverHowmuch 4096 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送
maxMessageSize 131072 客户端限制的消息大小,超过报错,同时服务端也会限制
transactionCheckListener 事务消息回查监听器,如果发送事务消息,必须设置
checkThreadPoolMinSize 1 Broker回查Producer事务状态时,线程池大小
checkThreadPoolMaxSize Broker回查Producer事务状态时,线程池大小
checkRequestHoldMax 2000 Broker回查Producer事务状态时,Producer本地缓冲请求队列大小

Push Consumer配置

参数名 默认值 描述
consumerGroup DEFAULT_CONSUMER Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
messageModel CLUSTERING 消息模型,支持以下两种1、集群消费(CLSUTER)2、广播消费(BROADCASTING)
consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer启动后,默认从什么位置开始消费1、CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息2、CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍3、CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略
subscription {} 订阅关系
messageListener 消息监听器
offsetStore 消费进度存储
consumeThreadMin 10 消费线程池数量
consumeThreadMax 20 消费线程池数量
consumeConcurrentlyMaxSpan 2000 单队列并行消费允许的最大跨度
pullThresholdForQueue 1000 拉消息本地队列缓存消息最大数
pullInterval 0 拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒
consumeMessageBatchMaxSize 1 批量消费,一次消费多少条消息
pullBatchSize 32 批量拉消息,一次最多拉多少条

Pull Consumer配置

参数名 默认值 描述
consumerGroup DEFAULT_CONSUMER Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
brokerSuspendMaxTimeMillis 20000 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒
consumerTimeoutMillisWhenSuspend 30000 长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒
consumerPullTimeoutMillis 10000 非长轮询,拉消息超时时间,单位毫秒
messageModel BROADCASTING 消息模型,支持以下两种1、集群消费2、广播消费
messageQueueListener 监听队列变化
offsetStore 消费进度存储
registerTopics [] 注册的topic集合
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略

Message数据结构各个字段都可以通过get、set方式访问,例如访问topic:
msg.getTopic();
msg.setTopic("test");

字段名 默认值 必填 说明
Topic null true 线下环境不需要申请,线上环境需要申请后才能使用
Body null true 二进制形式,序列化由应用决定,Producer与Consumer要协商好序列化形式。
Tags null false 类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念。
Keys null false 代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以再Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品ID等。
Flag 0 false 完全由应用来设置,RocketMQ不做敢于。
DelayTimeLevel 0 false 消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费。
WaitStoreMsgOK TRUE false 表示消息是否在服务器罗盘后才返回应答。

(二)源码测试

修改这2个文件:Producer 和 Consumer

Producer

Consumer

GitHub地址:https://github.com/apache/rocketmq-externals

添加192.168.89.100:9876;192.168.89.101:9876

(三)流程梳理

生产者流程

  1. 生产者首先需要设置namesrv,或者指定其他方式更新namesrv。
  2. 从namesrv获取topic的路由信息,路由信息包括broker以及Message Queue等信息,同时将路由信息保存在本地内存中,方便下次使用。
  3. 从Message Queue列表中选择合适的Queue发送消息,实现负载均衡。

消费者流程

  1. namesrv告诉消费者,他从broker中获取消息。
  2. 获取完之后开始消费。

特点:master挂了之后角色不会做切换(slave不会成为master)(商用版本的这种情况不知) 然后master和slave需要制定。

发送消息 存储消息 接受消息
停掉一个namesrv 不受影响 不受影响 不受影响
停全部的namesrv 影响 不受影响 影响
停单个master broker 不受影响 受影响(很小) 不影响
停全部master broker 影响 影响 影响
停全部salve broker 不影响 不影响 不影响
恢复任意master broker 不受影响 受影响(很小) 受影响(很小)

源码:jm下有文档

PS:说了rocketmq的概念的东西,下次重点说说rocketMq在双11是如何做到的抗压,我听过一次公开课,稍后总结下,分享给各位老铁。

上一篇下一篇

猜你喜欢

热点阅读