6:RocketMq实战(生产者与消费者 各种实战)(扩展)(文

2021-04-20  本文已影响0人  _River_
目录
1:RocketMQ生产消息 异步发送
2:RocketMQ OneWay发送消息及多种场景对比
3:RocketMQ 延迟消息实战 定时消息 与在电商系统中应用

4: RocketMQ4.X里面的标签Tag实战和消息过滤原理
5: PushConsumer(常用)、PullConsumer消费模式分析
1:RocketMQ生产消息 异步发送
可用于需求速度非常高的时候
特别注意  最后defaultMQProducer的shutdown之前 需要先睡眠几秒钟
            避免异步结果没有回来就关闭了生产者

//生产者同步发送消息
//结果同步返回
SendResult sendResult = defaultMQProducer.send(message);
log.info("RocketMQ order 消息发送结果:{}", JSON.toJSONString(sendResult));


//异步异步发送消息(需求要求速度非常高的时候 )
//结果异步返回
 defaultMQProducer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("RocketMQ order 消息异步发送结果:{}", JSON.toJSONString(sendResult));
        //发送成功可以触发其他的逻辑
    }

    @Override
    public void onException(Throwable throwable) {
        log.info("异步发送返回异常:"+throwable,throwable.getMessage());
    }
});

//关闭生产者之前先睡眠几秒钟
TimeUnit.SECONDS.sleep(5);
defaultMQProducer.shutdown();

2:RocketMQ OneWay发送消息及多种场景对比
简介:讲解使用RocketMQ发送oneway消息和使用场景,多种发送模式对比

SYNC :同步(不丢失)
应用场景:重要通知邮件、报名短信通知、营销短信系统等

ASYNC :异步(不丢失)
应用场景:对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功 后通知积分系统发放优惠券

ONEWAY :无需要等待响应 (很快 有可能丢失)
使用场景:适用于某些耗时非常短,,只负责发送消息,但对可靠性要求并不高的场景,也不做要求。
主要是日志收集,如日志服务LogServer,该服务作为消费端 通过ONEWAY发送消息过来,里面的队列可以存放消息。
//生产者同步发送消息
SendResult sendResult = defaultMQProducer.send(message);
log.info("RocketMQ order 消息发送结果:{}", JSON.toJSONString(sendResult));

//生产者同步发送OneWay消息
//没有结果的返回
defaultMQProducer.sendOneWay(message);

3:RocketMQ 延迟消息实战 定时消息 与在电商系统中应用
延迟消息:    
    Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,
    而是推 迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息,
    目前 支持固定精度的消息

    代码:rocketmq-store > MessageStoreConfig.java 属性 messageDelayLevel
     "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    //xxx是级别,1表示配置里面的第一个级别,2表示第 二个级别
    message.setDelayTimeLevel(xxx) 

定时消息:目前rocketmq开源版本还不支持,商业版本则有,两者使用场景类似

使用场景:
1:消息生产和消费有时间窗口要求:
    比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。
    这条消息将会在30分钟以后投递绐消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。
    如没有支付则取消  如已支付则忽略

2:通过消息触发一些定时任务:
    比如在某一固定时间点向用户发送提醒消息(用户会员于到期那天的9点  发送提醒消息)
4: RocketMQ4.X里面的标签Tag实战和消息过滤原理
 一个Message只有一个Tag, tag是二级分类
    订单:数码类订单、食品类订单
    过滤分为Broker端和Consumer端过滤
        Consumer端过滤:完全可以根据业务需求进行调整,但是增加了很多无用的消息传输
        Broker端过滤:减少了无用的消息的进行网络传输,增加了broker的负担

Consumer端过滤 (tag(常用) 或者 sql(了解))
    消息发送端
   Message message = new Message(topic,tag, JSON.toJSONString(data).getBytes());      

    消费端
    一般是监听*,
    或者指定tag(tag性能高,逻辑简单)    
    1:defaultMQPushConsumer.subscribe(topic, "*");
    2:defaultMQPushConsumer.subscribe(topic, "my_tag");
    3:defaultMQPushConsumer.subscribe(topic, "my_tag_A || my_tag_B");

    消息发送端:
    Message message = new Message(topic,tag, JSON.toJSONString(data).getBytes());
    message.putUserProperty("amount",amount)
   
   消费端:
   也可以使用SQL92(支持复杂逻辑  只支持PushConsumer中使用MessageSelector.bySql()  (了解即可)
   
    获取该Topic下  消息发送端设置了 putUserProperty "amount >5"的所有消息
    defaultMQPushConsumer.subscribe(topic,MessageSelector.bySql("amount >5") )

    SQL92的常见错误:
        The broker does not support consumer to filter message by SQL92
         解决:broker.conf里面配置如下
         enablePropertyFilter=true

    注意:如果想使用多个Tag,可以使用sql表达式,但是不建议,单一职责,多个队列     

Broker端过滤(了解)
第一次对比(hashcode):
在Broker端进行MessageTag过滤,遍历message queue存储的message tag和 Consumer订阅的tag
如果两者的hashcode不一样则跳过。符合的则传输绐Consumer。

第二次对比(tag)
为了防止 通过第一次对比的hashcode 是由于hash碰撞产生的
Consumer收到过滤消息后也会进行匹配操作 ,但是对比真实的message tag而不是hashcode。

优点和保障
     1:过滤中不访问commit log(后续提及),可以高效过滤  (优点)
     2:consume queue存储使用hashcode定长,节约空间 (优点 保障)      
     3:如果存在hash冲突,Consumer端可以进行再次确认  (保障)
5: PushConsumer(常用)、PullConsumer消费模式分析
消费端的三种重要概念:
    1:Pull
    2:Push  
    3:长轮训

消费端一共有几种处理消息的方式 
    1:PushConsumer 本质是长轮训
    2:PullConsumer  本质是Pull

Pull(很少用)
    消费端从Broker拉取消息,主动权在消费者端,可控性好;
    但间隔时间不好设置:
        时间间隔太短,则有可能什么都拉取不到请求,浪费资源;
        时间间隔太长,则消息不能及时处理。                   

Push(常用)
    实时性高;
    增加Broker的压力,消费端能力不同,如果Push推送过快,消费端有可能会出现消息堆积。

长轮询(结合pull 与 Push 的优点):
    消费端请求Broker的时候,Broker会保持当前连接一段时间默认是 15s,
    如果这段时间内有消息到达,则立刻返回绐Consumer,
    如果没消息的时间超过15s,则返回为空,Consumer再进行重新请求;
    
    主动权在Consumer中(因为需要Consumer进行重新请求),Broker即使有大量的消息也不会主动提送 Consumer。
    
    缺点:Broker需要保持Consumer的请求,会占用资源,需要消费端连接数可控 否则会一堆连接
PushConsumer本质是长轮训
    DefaultMQPushConsumerImpl
    系统收到消息后自动处理消息和offset(后续提及)
    
    在broker端可以通过longPollingEnable=true来开启长轮询(默认开启)
    
    broker端代码(RocketMQ源码):broker.longpolling
    消息消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback    
           
    优雅关闭:主要是释放资源和保存Offset,调用shutdown。        
           
    虽然方法名是PushConsumer,但是代码里面大量使用了pull,
    因为使用长轮训方式:既有Broker的Push的实时性 又有Consumer的Pull的主动性。
 
 Offset的存放(后续会提及):
 集群消费模式存放在Broker端     广播消费模式存放在Consumer端
PullConsumer (了解)(需要自己维护Offset)
    DefaultMQPullConsumer

    参考官方例子 org.apache.rocketmq.example.simple.PullConsumer

    1:进行MessageQueue遍历
    2:每一次获取MessageQueue 的消息的时候  获取下一条消息的Offset
    3:消费端维护Offset,需用本地存储Offset,存储内存、磁盘、数据库等
    4:处理不同状态的消息 4种状态:
      FOUND(成功)、NO_NEW_MSG(没有新消息)、 NO_MATCHED_MSG(没有匹配)、OFFSET_ILLRGL(偏移量错误)
优雅关闭:主要是释放资源和保存Offset,需用Consumer端保存好Offset,特别是异常处理的时候
灵活性高可控性强,但是编码复杂度很高

Offset的存放(后续会提及):
为什么PullConsumer必须把Offset存放在本地:
因为这样可以更方便 请求获取对Offert后  进行各种处理

项目连接

请配合项目代码食用效果更佳:
项目地址:
https://github.com/hesuijin/hesuijin-study-project
Git下载地址:
https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git

rocketmq-module项目模块下 
注意:因为需要修改相应配置 相关测试的代码
    生产者代码主要在  单元测试 
    消费者代码主要在  项目代码 com.example.rocketmq.demo.consumer.Junit包下
上一篇下一篇

猜你喜欢

热点阅读