Sprin Cloud Stream rabbit实战

2022-10-12  本文已影响0人  Mr培
架构图
Binder
springboot整合stream之生产者
  1. 加依赖
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
  1. 写注解,在启动类上加
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class StreamProducerMain {
    public static void main(String[] args){
        SpringApplication.run(StreamProducerMain.class,args);
    }
}
  1. 写配置
server:
  port: 8081
spring:
  application:
    name: cloud-stream-producer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: xypspExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

  1. 测试方法
/**
 * @author rp
 */
@AllArgsConstructor
@RestController
@RequestMapping("/message")
public class MessageProviderController {


    private final MessageProviderServer messageProviderServer;


    /**
     * 发送消息
     * */
    @GetMapping("/sendMessage")
    public Result sendMessage(){
        String uuid = messageProviderServer.send();
        return Result.success(uuid);
    }
}

package com.xypsp.springcloud.server;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @author rp
 * 定义消息的推送管道
 */
@EnableBinding(Source.class)
public class MessageProviderServer {

    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    public String send() {
        String serial = UUID.randomUUID().toString();
        //发送延迟消息
//        output.send(MessageBuilder.withPayload(serial).setHeader("x-delay",5000).build());
        output.send(MessageBuilder.withPayload(serial).build());
        return serial;
    }

}

springboot整合stream之消费者A
  1. 加依赖
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
  1. 写配置
server:
  port: 8082
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: xypspExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: xypspGroup # 消息的持久化 Consumer断开连接,队列仍然存在,相同group的消费同一个queue的时候是轮询的方式,每个实例一条轮着消费,避免重复消费。

  1. 消费方法
package com.xypsp.springcloud.server;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @author rp
 * 定义消息的接收管道
 */
@Slf4j
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        log.info("接收到消息: {}",message.getPayload());
    }
}

springboot整合stream之消费者B
  1. 加依赖
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
  1. 写配置
server:
  port: 8083
spring:
  application:
    name: cloud-stream-consumer-b
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: xypspExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: xypspGroup # 消息的持久化 Consumer断开连接,队列仍然存在,相同group的消费同一个queue的时候是轮询的方式,每个实例一条轮着消费,避免重复消费。

  1. 消费方法
package com.xypsp.springcloud.server;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @author rp
 * 定义消息的接收管道
 */
@Slf4j
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        log.info("接收到消息: {}",message.getPayload());
    }
}

上一篇下一篇

猜你喜欢

热点阅读