RabbitMQ(五)Topic Exchange Demo

2018-05-24  本文已影响339人  隔壁丨王大爷

配置

在 application.yml 中添加 RabbitMQ 连接配置:

# RabbitMQ connection settings (spring.rabbitmq.* properties).
# NOTE(review): the values below point at a broker on the local machine with
# the guest/guest account — presumably a local development install; adjust
# host, port and credentials for any other environment.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

在 pom.xml 中添加父工程与依赖:

    <!-- Fragment of pom.xml: inherits from the Spring Boot 2.0.2.RELEASE starter parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
      <dependencies>
        <!-- Core Spring Boot starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- AMQP starter; presumably the source of the org.springframework.amqp classes used by the demo code. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Test-scoped starter; presumably supplies the test annotations used by the demo's test class. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    

TopicConfig配置类

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the topic-exchange demo.
 *
 * <p>Declares three queues ({@code topic.message}, {@code topic.#} and
 * {@code topic.xxx}), one topic exchange, and two bindings. Only the first
 * two queues are bound to the exchange here; the {@code topic.xxx} queue is
 * declared without any binding in this class.
 *
 * <p>Note that the second queue is literally <em>named</em> {@code topic.#};
 * the same string is reused as its binding pattern, where {@code #} acts as
 * the topic-exchange wildcard for zero or more dot-separated words.
 */
@Configuration
public class TopicConfig {
    /** Name of the topic exchange that messages are published to. */
    public static final String TOPICEXCHANGE = "topicExchange";

    /** Queue name and routing key for the exact-match queue. */
    private static final String MESSAGE = "topic.message";

    /** Queue name and binding pattern for the wildcard queue. */
    private static final String MESSAGES = "topic.#";

    /** Name of the queue that is declared but not bound in this class. */
    private static final String MESSAGE_X = "topic.xxx";

    /** Declares the queue named {@code topic.message}. */
    @Bean
    public Queue queueMessage() {
        return new Queue(MESSAGE);
    }

    /** Declares the queue named {@code topic.#}. */
    @Bean
    Queue queueMessages() {
        return new Queue(MESSAGES);
    }

    /** Declares the queue named {@code topic.xxx}. */
    @Bean
    Queue queueMessagesx() {
        return new Queue(MESSAGE_X);
    }

    /** Declares the topic exchange named {@link #TOPICEXCHANGE}. */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPICEXCHANGE);
    }

    /** Binds the {@code topic.message} queue to the exchange with routing key {@code topic.message}. */
    @Bean
    public Binding bindingMessage() {
        final Queue source = queueMessage();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(source).to(exchange).with(MESSAGE);
    }

    /** Binds the {@code topic.#} queue to the exchange with the pattern {@code topic.#}. */
    @Bean
    public Binding bindingMessages() {
        final Queue source = queueMessages();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(source).to(exchange).with(MESSAGES);
    }
}

生产者

TopicSender

import com.gebiwangdaye.rabbitmq.config.TopicConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Producer for the topic-exchange demo.
 *
 * <p>Each public method prints a fixed text to standard output and then
 * publishes that same text to the exchange named by
 * {@link TopicConfig#TOPICEXCHANGE} under a fixed routing key.
 */
@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /** Publishes a fixed message with routing key {@code topic.message}. */
    public void send() {
        publish("topic.message", "发送消息 ================== topic.message");
    }

    /** Publishes a fixed message with routing key {@code topic.xxx}. */
    public void sendX() {
        publish("topic.xxx", "发送消息 ================== topic.xxx");
    }

    /**
     * Echoes the payload to standard output, then sends it to the demo exchange.
     *
     * @param routingKey routing key passed to the exchange
     * @param payload    text that is both printed and sent as the message body
     */
    private void publish(String routingKey, String payload) {
        System.out.println(payload);
        this.amqpTemplate.convertAndSend(TopicConfig.TOPICEXCHANGE, routingKey, payload);
    }
}

消费者

TopicReceiver

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the queue named {@code topic.message}.
 *
 * <p>Every received text payload is written to standard error behind a
 * fixed prefix.
 */
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver {

    /** Prefix written in front of each received payload. */
    private static final String PREFIX = "收到消息 : topic message =====================";

    /**
     * Handles one message from the queue.
     *
     * @param context message body as text
     */
    @RabbitHandler
    public void process(String context) {
        final String line = PREFIX + context;
        System.err.println(line);
    }
}

TopicReceiver1

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the queue named {@code topic.#}.
 *
 * <p>Every received text payload is written to standard error behind a
 * fixed prefix.
 */
@Component
@RabbitListener(queues = "topic.#")
public class TopicReceiver1 {

    /** Prefix written in front of each received payload. */
    private static final String PREFIX = "收到消息 : topic ###########   =====================";

    /**
     * Handles one message from the queue.
     *
     * @param context message body as text
     */
    @RabbitHandler
    public void process(String context) {
        final String line = PREFIX + context;
        System.err.println(line);
    }
}

测试类

/**
 * Spring Boot test that drives the topic-exchange demo producer.
 *
 * <p>The test only publishes messages; it makes no assertions about what
 * the consumers receive.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    /** Number of send()/sendX() pairs issued by the test. */
    private static final int ROUNDS = 50;

    @Autowired
    private TopicSender topicSender;

    /** Calls {@code send()} followed by {@code sendX()} on the sender, 50 times in a row. */
    @Test
    public void testTopicExchange() {
        int remaining = ROUNDS;
        while (remaining-- > 0) {
            topicSender.send();
            topicSender.sendX();
        }
    }

}
上一篇下一篇

猜你喜欢

热点阅读