微服务消息队列-RabbitMQ的简单使用

2022-02-27  本文已影响0人  侧耳倾听y

1. 下载RabbitMQ

可以使用docker镜像:
https://python.iitter.com/other/170428.html

2. 添加依赖

生产者和消费者都需要添加以下依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3. 生产者

@Service
@Slf4j
public class Producer {

    @Autowired
    private StreamBridge streamBridge;

    public void send(Object message) {
        log.info("sent: {}", message);
        streamBridge.send("testMessage-out-0", message);
    }

}

4. 消费者


@Slf4j
@Service
public class Consumer {

    @Autowired
    private ConsumerService consumerService;

    @Bean
    public Consumer<Object> testMessage() {
        return request -> {
            log.info("received: {}", request);
            consumerService.testConsumer(request);
        };
    }
}

在默认情况下,框架会使用消费者方法的 method name 作为当前消费者的标识,如果消费者标识和配置文件中的名称不一致,那么 Spring 应用就不知道该把当前的消费者绑定到哪一个 Stream 信道上去。

5. 配置文件

如果程序中只使用了单一的中间件,比如只接入了 RabbitMQ,那么可以直接在 spring.rabbitmq 节点下配置连接串,不需要特别指定 binders 配置。

spring:
  cloud:
    stream:
      # 如果你项目里只对接一个中间件,那么不用定义binders
      # 当系统要定义多个不同消息中间件的时候,使用binders定义
      binders:
        my-rabbit:
          type: rabbit # 消息中间件类型
          environment: # 连接信息
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

配置了生产者、消费者、binder 和 RabbitMQ 四方的关联关系。

spring:
  cloud:
    stream:
      bindings:
        # 添加 Producer
        testMessage-out-0:
          destination: testMessage-topic
          content-type: application/json
          binder: my-rabbit
        # Consumer
        testMessage-in-0:
          destination: testMessage-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: add-coupon-group
          binder: my-rabbit
      function:
        definition: testMessage

如上就可以生成和消费消息了。

6. 异常处理

消息重试是一种简单高效的异常恢复手段,当 Consumer 端抛出异常的时候,Stream 会自动执行 2 次重试。

spring:
  cloud:
    stream:
      bindings:
        testMessage-in-0:
          destination: testMessage-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: testMessage-group
          binder: my-rabbit
          consumer:
            # 如果最大尝试次数为1,即不重试
            # 默认是做3次尝试
            max-attempts: 5
            # 两次重试之间的初始间隔
            backOffInitialInterval: 2000
            # 重试最大间隔
            backOffMaxInterval: 10000
            # 每次重试后,间隔时间乘以的系数
            backOffMultiplier: 2
            # 如果某个异常你不想重试,写在这里
            retryableExceptions:
              java.lang.IllegalArgumentException: false

除了本地重试以外,还可以把这个失败的消息丢回到原始队列中,做一个 requeue 的操作。

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          # requeue重试
          testMessage-in-0:
            consumer:
              requeue-rejected: true

通过 spring-integration 的注解 @ServiceActivator 做了一个桥接,将指定 Channel 的异常错误转到本地方法里。

@ServiceActivator(inputChannel = "testMessage-topic.testMessage-group.errors")
public void requestCouponFallback(ErrorMessage errorMessage) throws Exception {
    log.info("consumer error: {}", errorMessage);
    // 实现自己的逻辑
}

如果想要保留这条出错的 Message,可以选择将它发送到另一个 Queue 里。这个特殊的 Queue 就叫做死信队列。

开启

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

配置

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          testMessage-in-0:
            consumer:
              auto-bind-dlq: true

死信队列的名称和第一个队列几乎一样,唯一区别就是末尾多了一个.dlq,这个 dlq 就是死信队列的标志。

7.延迟消息

下载rabbitmq_delayed_message_exchange插件

https://blog.csdn.net/u010375456/article/details/106323962/


// 使用延迟消息发送
public void sendInDelay(Object message) {
    log.info("sent: {}", coupon);
    streamBridge.send("testMessageDelay-out-0"
            MessageBuilder.withPayload(message)
                    .setHeader("x-delay", 10 * 1000)
                    .build());
}

@Bean
public Consumer<Object> testMessageDelay() {
    return request -> {
        log.info("received: {}", request);
        service.consumer(request);
    };
}

spring:
  cloud:
    stream:
      bindings:
        # 延迟发券 - producer
        testMessageDelay-out-0:
          destination: testMessage-delayed-topic
          content-type: application/json
          binder: my-rabbit
        # 延迟发券 - Consumer
        testMessageDelay-in-0:
          destination: testMessage-delayed-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: testMessage-group
          binder: my-rabbit
          consumer:
            # 如果最大尝试次数为1,即不重试
            # 默认是做3次尝试
            max-attempts: 1
      function:
        definition: testMessageDelay
      rabbit:
        bindings:
          testMessageDelay-out-0:
            producer:
              delayed-exchange: true
          testMessageDelay-in-0:
            consumer:
              delayed-exchange: true

上一篇 下一篇

猜你喜欢

热点阅读