6:RocketMq实战(生产者与消费者 各种实战)(扩展)(文
2021-04-20 本文已影响0人
_River_
目录
1:RocketMQ生产消息 异步发送
2:RocketMQ OneWay发送消息及多种场景对比
3:RocketMQ 延迟消息实战 定时消息 与在电商系统中应用
4: RocketMQ4.X里面的标签Tag实战和消息过滤原理
5: PushConsumer(常用)、PullConsumer消费模式分析
1:RocketMQ生产消息 异步发送
可用于需求速度非常高的时候
特别注意 最后defaultMQProducer的shutdown之前 需要先睡眠几秒钟
避免异步结果没有回来就关闭了生产者
//生产者同步发送消息
//结果同步返回
SendResult sendResult = defaultMQProducer.send(message);
log.info("RocketMQ order 消息发送结果:{}", JSON.toJSONString(sendResult));
//异步异步发送消息(需求要求速度非常高的时候 )
//结果异步返回
defaultMQProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("RocketMQ order 消息异步发送结果:{}", JSON.toJSONString(sendResult));
//发送成功可以触发其他的逻辑
}
@Override
public void onException(Throwable throwable) {
log.info("异步发送返回异常:"+throwable,throwable.getMessage());
}
});
//关闭生产者之前先睡眠几秒钟
TimeUnit.SECONDS.sleep(5);
defaultMQProducer.shutdown();
2:RocketMQ OneWay发送消息及多种场景对比
简介:讲解使用RocketMQ发送oneway消息和使用场景,多种发送模式对比
SYNC :同步(不丢失)
应用场景:重要通知邮件、报名短信通知、营销短信系统等
ASYNC :异步(不丢失)
应用场景:对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功 后通知积分系统发放优惠券
ONEWAY :无需要等待响应 (很快 有可能丢失)
使用场景:适用于某些耗时非常短,,只负责发送消息,但对可靠性要求并不高的场景,也不做要求。
主要是日志收集,如日志服务LogServer,该服务作为消费端 通过ONEWAY发送消息过来,里面的队列可以存放消息。
//生产者同步发送消息
SendResult sendResult = defaultMQProducer.send(message);
log.info("RocketMQ order 消息发送结果:{}", JSON.toJSONString(sendResult));
//生产者同步发送OneWay消息
//没有结果的返回
defaultMQProducer.sendOneWay(message);
3:RocketMQ 延迟消息实战 定时消息 与在电商系统中应用
延迟消息:
Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,
而是推 迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息,
目前 支持固定精度的消息
代码:rocketmq-store > MessageStoreConfig.java 属性 messageDelayLevel
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//xxx是级别,1表示配置里面的第一个级别,2表示第 二个级别
message.setDelayTimeLevel(xxx)
定时消息:目前rocketmq开源版本还不支持,商业版本则有,两者使用场景类似
使用场景:
1:消息生产和消费有时间窗口要求:
比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。
这条消息将会在30分钟以后投递绐消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。
如没有支付则取消 如已支付则忽略
2:通过消息触发一些定时任务:
比如在某一固定时间点向用户发送提醒消息(用户会员于到期那天的9点 发送提醒消息)
4: RocketMQ4.X里面的标签Tag实战和消息过滤原理
一个Message只有一个Tag, tag是二级分类
订单:数码类订单、食品类订单
过滤分为Broker端和Consumer端过滤
Consumer端过滤:完全可以根据业务需求进行调整,但是增加了很多无用的消息传输
Broker端过滤:减少了无用的消息的进行网络传输,增加了broker的负担
Consumer端过滤 (tag(常用) 或者 sql(了解))
消息发送端
Message message = new Message(topic,tag, JSON.toJSONString(data).getBytes());
消费端
一般是监听*,
或者指定tag(tag性能高,逻辑简单)
1:defaultMQPushConsumer.subscribe(topic, "*");
2:defaultMQPushConsumer.subscribe(topic, "my_tag");
3:defaultMQPushConsumer.subscribe(topic, "my_tag_A || my_tag_B");
消息发送端:
Message message = new Message(topic,tag, JSON.toJSONString(data).getBytes());
message.putUserProperty("amount",amount)
消费端:
也可以使用SQL92(支持复杂逻辑 只支持PushConsumer中使用MessageSelector.bySql() (了解即可)
获取该Topic下 消息发送端设置了 putUserProperty "amount >5"的所有消息
defaultMQPushConsumer.subscribe(topic,MessageSelector.bySql("amount >5") )
SQL92的常见错误:
The broker does not support consumer to filter message by SQL92
解决:broker.conf里面配置如下
enablePropertyFilter=true
注意:如果想使用多个Tag,可以使用sql表达式,但是不建议,单一职责,多个队列
Broker端过滤(了解)
第一次对比(hashcode):
在Broker端进行MessageTag过滤,遍历message queue存储的message tag和 Consumer订阅的tag
如果两者的hashcode不一样则跳过。符合的则传输绐Consumer。
第二次对比(tag)
为了防止 通过第一次对比的hashcode 是由于hash碰撞产生的
Consumer收到过滤消息后也会进行匹配操作 ,但是对比真实的message tag而不是hashcode。
优点和保障
1:过滤中不访问commit log(后续提及),可以高效过滤 (优点)
2:consume queue存储使用hashcode定长,节约空间 (优点 保障)
3:如果存在hash冲突,Consumer端可以进行再次确认 (保障)
5: PushConsumer(常用)、PullConsumer消费模式分析
消费端的三种重要概念:
1:Pull
2:Push
3:长轮训
消费端一共有几种处理消息的方式
1:PushConsumer 本质是长轮训
2:PullConsumer 本质是Pull
Pull(很少用)
消费端从Broker拉取消息,主动权在消费者端,可控性好;
但间隔时间不好设置:
时间间隔太短,则有可能什么都拉取不到请求,浪费资源;
时间间隔太长,则消息不能及时处理。
Push(常用)
实时性高;
增加Broker的压力,消费端能力不同,如果Push推送过快,消费端有可能会出现消息堆积。
长轮询(结合pull 与 Push 的优点):
消费端请求Broker的时候,Broker会保持当前连接一段时间默认是 15s,
如果这段时间内有消息到达,则立刻返回绐Consumer,
如果没消息的时间超过15s,则返回为空,Consumer再进行重新请求;
主动权在Consumer中(因为需要Consumer进行重新请求),Broker即使有大量的消息也不会主动提送 Consumer。
缺点:Broker需要保持Consumer的请求,会占用资源,需要消费端连接数可控 否则会一堆连接
PushConsumer本质是长轮训
DefaultMQPushConsumerImpl
系统收到消息后自动处理消息和offset(后续提及)
在broker端可以通过longPollingEnable=true来开启长轮询(默认开启)
broker端代码(RocketMQ源码):broker.longpolling
消息消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
优雅关闭:主要是释放资源和保存Offset,调用shutdown。
虽然方法名是PushConsumer,但是代码里面大量使用了pull,
因为使用长轮训方式:既有Broker的Push的实时性 又有Consumer的Pull的主动性。
Offset的存放(后续会提及):
集群消费模式存放在Broker端 广播消费模式存放在Consumer端
PullConsumer (了解)(需要自己维护Offset)
DefaultMQPullConsumer
参考官方例子 org.apache.rocketmq.example.simple.PullConsumer
1:进行MessageQueue遍历
2:每一次获取MessageQueue 的消息的时候 获取下一条消息的Offset
3:消费端维护Offset,需用本地存储Offset,存储内存、磁盘、数据库等
4:处理不同状态的消息 4种状态:
FOUND(成功)、NO_NEW_MSG(没有新消息)、 NO_MATCHED_MSG(没有匹配)、OFFSET_ILLRGL(偏移量错误)
优雅关闭:主要是释放资源和保存Offset,需用Consumer端保存好Offset,特别是异常处理的时候
灵活性高可控性强,但是编码复杂度很高
Offset的存放(后续会提及):
为什么PullConsumer必须把Offset存放在本地:
因为这样可以更方便 请求获取对Offert后 进行各种处理
项目连接
请配合项目代码食用效果更佳:
项目地址:
https://github.com/hesuijin/hesuijin-study-project
Git下载地址:
https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git
rocketmq-module项目模块下
注意:因为需要修改相应配置 相关测试的代码
生产者代码主要在 单元测试
消费者代码主要在 项目代码 com.example.rocketmq.demo.consumer.Junit包下