RocketMQ实战(下)
时间就像海绵里的水,只要愿挤,总还是有的。
——鲁迅
问题分析二
1.项目改造点
加入log日志
图示2.多线程安全问题
(1)多线程安全演示案例
代码示例这里票的库存有5万,5个线程,每一个线程调用一万次sale方法,sale方法每次减掉一次库存。
运行效果:
结果库存没有被减到0,这里的主要原因是多线程操作共享变量多线程不安全。
(2)项目中多线程不安全的点
在通过RocketMQ消费时进行库存扣减时会发生多线程不安全问题,原因很简单,也是跟上个例子是一样的。
图示这个地方是一个典型的多线程不安全的点。
(3)演示项目中多线程不安全
使用jmeter模拟高并发多线程,使得RocketMQ也是多线程消费,导致多线程不安全。
1、使用100个线程
图示2、模拟下单接口
图示压测前数据:Mysq中查询相关数据:
订单表为空,商品表13号goods_id的数量是573
图示 图示分布式项目启动
图示Jmeter压测跑完
图示查看数据库(shop_goods)产生了100条订单
图示但是我们查看,商品表13号goods_id的数量是556
压测前是:573,实际扣减下来,没有扣减100个。所以这里发生了多线程不安全。
图示查看日志信息,可以发现RocketMQ消费者使用了多线程进行了库存扣减。所以这里有多线程安全问题。
代码示例 结果(4)解决项目中多线程安全问题
加锁,解决多线程安全问题。
代码示例再压测一次。
图示压测前库存
图示压测后库存
图示 图示所以,这里我们需要在处理库存这里加锁,确保多线程安全。
当然处理的方式不仅仅限制于synchronized
也可以使用ReentrantLock或者使用atomi的原子操作类(基于CAS机制)
同样另外一个工程中,情况会有些不同,因为这个地方是使用优惠券,每张优惠券都是一条单条数据,一般又只有使用或者未使用的情况。
所以这里发生多线程不安全的几率极小,当然为了确保万无一失,也可以加上锁。
代码示例3.幂等性问题
因为RocketMQ没有办法确保消息不重复。
1、重试机制导致的问题,消息成功发送到MQ中,但MQ因网络原因未能成功返回,导致重试机制重试机制重复发送到MQ。
2、消费者成功消费完消息,未返回consume_commit时,系统重启|系统宕机,MQ重新发送消息到同消息组其他消费者机器,导致消息重复。
(1)项目中模拟消息重复
改造项目,在订单系统中,向RocketMQ发送两次消息,模拟消息重复。
代码示例项目重启。把订单表清空。再次确认下库存信息1000
图示 图示再次压测100条数据
图示查看数据。订单表100条数据。
图示 图示所以看到这里,发现这里有消息重复的问题。需要幂等性处理。因为RocketMQ避免不了消息重复
(2)解决方案-去重表
利用数据库表单的特性来实现幂等,常用的一个思路是在表上构建唯一性索引,保证某一类数据一旦执行完毕,后续同样的请求不再重复处理了。
当用户下单时,会经历多个环节,比如生成订单,减库存,减优惠券等等。每一个环节执行时都先检测一下该订单id是否已经执行过这一步骤,对未执行的请求,执行操作并缓存结果,而对已经执行过的id,则直接返回之前的执行结果,不做任何操作。这样可以在最大程度上避免操作的重复执行问题,缓存起来的执行结果也能用于事务的控制等。
数据表:
shop_goods_unique
图示 图示然后生成model,dao等等
图示 图示(3)项目中使用去重表解决幂等性问题
代码示例每次在操作库存时,往专属的去重表中插入一条记录,因为order_id是唯一的,如果有两次,那么第二次肯定插入不了。
所以这里就直接返回,不再修改库存。
测试:(压测前,恢复数据,订单表清空,库存设置为1000)
图示 图示压测后
图示 图示 图示这里也可以在RocketMQ中确认下应该是对应主题上有双倍的数据。
但是没有关系,有幂等性处理的话,消息重复,业务上也是正常的。
当然这里是遇到幂等性的消息是直接消费。
如果是方案要完美的话,有时候要进行分析,可以消费这类的消息,再把这类消息做为生产者写入一个“重复消息”的主题。方便对出现消息重复的地方进行问题跟踪和排查。
另外还有使用优惠券这里也可以使用另外一张去重表。因为这样就可以实现完全的分布式,订单系统和会员系统数据库也可以做一定分离。
表结构 代码示例4.压力测试
(1)异步解耦
例如,作为淘宝/天猫主站最核心的交易系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、阿里妈妈、流计算分析等等,整体业务系统庞大而且复杂,架构设计稍有不合理,将直接影响主站业务的连续性。
通过加入RocketMQ,能够解决高可用松耦合,通过上、下游业务系统的松耦合设计,即便下游子系统(如物流、积分等)出现不可用甚至宕机,都不会影响到核心交易系统的正常运转。
通过RocketMQ的异步化设计,可以灵活高效的适应因业务快速发展而带来的变化,如新增业务系统。
项目中模拟下游系统卡顿
代码示例 代码示例使用接口调用压测数据
本机压测:(5000个线程,5000个请求)
1、使用HTTP调用
http调用2、使用MQ异步解耦
MQ异步解耦明显看到,使用了MQ,压测的数据。明显好于接口调用,这个得意于MQ的异步解耦功能。
图示如果我们模拟下游系统卡顿,那么这种差别会差别很大。
代码上:在下游系统中加入2ms的休眠。
本机压测:(5000个线程,5000个请求)
1、使用HTTP调用
图示2、使用MQ异步解耦
图示经过压测可知,如果使用MQ的异步接口,其实在于上游系统(订单系统)来说,基本上不会收到下游系统的波动的影响。
无论下游系统多卡,上游系统表现已经很稳定。
所以在淘宝,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、阿里妈妈、流计算分析等等,整体业务系统庞大而且复杂,如果使用接口调用,根本就不用玩了,这就是为什么在阿里一定要用RocketMQ,不用就GG了。
5.RocketMQ的其他应用场景
(1)削峰填谷
诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,削峰填谷是解决该问题的有效方式。
RocketMQ超高性能的消息处理能力可以承接流量脉冲而不被击垮,在确保系统可用性同时,因快速有效的请求响应而提升用户的体验。
确保下游业务在安全水位内平滑稳定的运行,避免超高流量的冲击。
通过削峰填谷可控制下游业务系统的集群规模,从而降低投入成本。
(2)大数据分析
数据在"流动"中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用 RocketMQ 与流式计算引擎相结合,可以很方便的实现将业务数据进行实时分析。
构建应用系统和分析系统的桥梁,并将它们之间的关联解耦,同时由于数据产生非常快且数据量大,需要非常高的可扩展性。
可对接Storm/Spark实时流计算引擎,亦可对接 Hadoop/ODPS 等离线数据仓库系统。
像阿里、京东的双十一、618,显示下了多少订单,金额破了多少个亿,基本上都是基于流式计算的结果(数据还没有完全入到DB中就已经实时算出来的)。
(3)分布式缓存同步
天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过 RocketMQ 构建分布式缓存,实时通知商品数据的变化。
通过消息实时推送的方式,让数据实时得以更新。
大量并发访问商品数据库,减少页面响应时间。
大促众多分会场,多缓存的架构设计,满足对商品变更的大量访问需求。
链接: https://pan.baidu.com/s/1s_btCY2D-OiOyGdihvBleQ 提取码: gcmi
我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞、收藏和评论,我们下期见!
上一篇:RocketMQ实战(上)