2020-04-21springboot2.x rabbitmq

2020-04-23  本文已影响0人  江江江123

之前使用kafka只有2种模式
1.生产者消费者
2.发布订阅

而ribbitmq却有三种模式 fanout,topic,direct
即除了一对一,一对多,之外加了多对多的关系
但是这里还是只展示生产者消费者,发布订阅模式;

目标:相同子系统开多个节点,只会消费一次
不同子系统,每个系统都消费一次
思路:相同的queue bing在topic上只会消费一次
不同的queue bing在topic上,每个queue都会消费,由于之前按使用kafka总想着有没有group来设置不同系统之间的消费,陷入了误区,其实rabbitmq的queue 是group + topic 的集合,在下面的演示中可以发现。

安装

docker-compose.yml

 rabbitmq:
    image: rabbitmq:management-alpine
    container_name: sc-rabbitmq
    volumes:
      - ./data/rabbitmq:/var/lib/rabbitmq/mnesia
    networks:
      - sc-net
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
   
networks:
  sc-net:
    external: false

http://localhost:15672 登陆访问

springboot 2.x 整合

1.引入pom

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.application配置

spring:
  application:
    name: spirng-boot-ribbit
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    template:
      exchange: SPRING-BOOT-EXCHANGE

3.测试
2个spring boot moudle 1个既可以发消息又可以收消息, 另一个只接受消息,但是同时启动2个节点
3.1生产者

@Component
public class GoodsProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    public void send(String type, String id) {
        try {
            //exchange : 此处 exchange 可随意命名,但是要保证消费者bing一致
            //key : goods 可以表示任意业务,type 表示业务行为
            //message : id即发送的消息,要求尽可能少,可以序列化后传输
            rabbitTemplate.convertAndSend("SPRING-BOOT-EXCHANGE", "goods." + type, id);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.2生产者模块下的消费者

@Component
public class GoodsReceiver {
    //queue命名携带模块信息保证不同模块重复消费
    //exchange保持一致,key满足包含,相等, #,等规则
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "PRODUCER.GOODS.ADD",durable = "true"),
    exchange = @Exchange(value = "SPRING-BOOT-EXCHANGE",type = ExchangeTypes.TOPIC),
    key = {"goods.add"}))
    public void handleAddGoods(String goodsId){

        System.out.println("1:======================"+goodsId+"============================");
    }
}

3.3 消费者模块下的消费者

@Component
public class GoodsReceiver {
    //同3.2
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "RECEIVER.GOODS.ADD",durable = "true"),
    exchange = @Exchange(value = "SPRING-BOOT-EXCHANGE",type = ExchangeTypes.TOPIC),
    key = {"goods.add"}))
    public void handleAddGoods(String goodsId){

        System.out.println("2:======================"+goodsId+"============================");
    }
}

3.4
测试用例

public class RabbitMQTest extends TestApplicationTests {

    @Autowired
    GoodsProducer goodsProducer;
    @Test
    void send(){
        for (int i = 0; i < 10; i++) {
            goodsProducer.send("add","1"); 
        }
    }
}

3.5结果
通过控制台打印可以发现,生产者模块下 打印0-9
2个消费者模块下轮询打印,各消费5个

4.最后
可以尝试和kafka一样配置:
在queue前默认添加模块名;
exchange 默认是项目名 ;
在消息发送,接受时统一对消息序列化与反序列化;
尽可能忽略消息的异常,统一通过日志中心区修复问题。

上一篇 下一篇

猜你喜欢

热点阅读