RabbitMQ(四)Fanout Exchange Demo

2018-05-24  本文已影响10人  隔壁丨王大爷

配置类

在 application.yml 中添加以下 RabbitMQ 连接配置:

# RabbitMQ connection settings (Spring Boot `spring.rabbitmq.*` properties).
spring:
  rabbitmq:
    # Address and port of the broker; this points at a broker on the local machine.
    host: 127.0.0.1
    port: 5672
    # Login used to connect to the broker.
    # NOTE(review): guest/guest looks like a local-development account — confirm and
    # replace with real credentials outside of a local setup.
    username: guest
    password: guest

pom.xml

    <!-- Inherit plugin and dependency management from the Spring Boot parent POM, so the
         starters below do not need explicit versions. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
      <dependencies>
        <!-- Core Spring Boot starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- AMQP starter: provides the org.springframework.amqp classes used by the
             configuration, sender and receiver classes in this demo. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Test support; limited to the test classpath by the test scope. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    

FanoutConfig配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the messaging topology for the fanout demo: one fanout exchange, three queues
 * ({@code fanout.A}, {@code fanout.B}, {@code fanout.C}) and one binding per queue.
 *
 * <p>Every queue is bound to the same exchange and no routing key is specified on any
 * binding.
 */
@Configuration
public class FanoutConfig {
    /**
     * Name of the fanout exchange. Declared {@code final} so the exchange name cannot be
     * reassigned at runtime by other code.
     */
    public static final String FANOUTEXCHANGE = "fanoutExchange";

    /** @return the queue named {@code fanout.A} */
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    /** @return the queue named {@code fanout.B} */
    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    /** @return the queue named {@code fanout.C} */
    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    /** @return the fanout exchange named by {@link #FANOUTEXCHANGE} */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUTEXCHANGE);
    }

    /** @return the binding of queue {@code fanout.A} to the fanout exchange */
    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(AMessage()).to(fanoutExchange());
    }

    /** @return the binding of queue {@code fanout.B} to the fanout exchange */
    @Bean
    public Binding bindingB() {
        return BindingBuilder.bind(BMessage()).to(fanoutExchange());
    }

    /** @return the binding of queue {@code fanout.C} to the fanout exchange */
    @Bean
    public Binding bindingC() {
        return BindingBuilder.bind(CMessage()).to(fanoutExchange());
    }
}

生产者

FanoutSender

import com.gebiwangdaye.rabbitmq.config.FanoutConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Producer for the fanout demo: publishes a fixed text message to the exchange named by
 * {@link FanoutConfig#FANOUTEXCHANGE}.
 */
@Component
public class FanoutSender {

    /** Messaging template used to publish; populated through field injection. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Prints the message to standard output and then sends it to the fanout exchange
     * with an empty routing key.
     */
    public void send() {
        final String payload = "发送消息 ================== fanout";
        System.out.println(payload);
        amqpTemplate.convertAndSend(FanoutConfig.FANOUTEXCHANGE, "", payload);
    }
}

消费者

FanoutReceiver

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * Consumer listening on queue {@code fanout.A}; prints each received text message.
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiver {

    /**
     * Handles a message whose payload is a {@code String}.
     *
     * @param hello the received message text
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "接收到消息A : " + hello;
        System.out.println(line);
    }
}

FanoutReceiverB

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on queue {@code fanout.B}; prints each received text message.
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    /**
     * Handles a message whose payload is a {@code String}.
     *
     * @param hello the received message text
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "接收到消息B : " + hello;
        System.out.println(line);
    }
}

测试类

/**
 * Spring Boot test that drives the fanout demo by publishing messages through
 * {@link FanoutSender}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    /** Sender under test; injected from the application context. */
    @Autowired
    private FanoutSender fanoutSender;

    /**
     * Calls {@link FanoutSender#send()} 100 times. The test makes no assertions; the
     * result is observed through the console output of the sender and receivers.
     */
    @Test
    public void testFanoutExchnage() {
        for (int i = 0; i < 100; i++) {
            fanoutSender.send();
        }
    }

}
上一篇 | 下一篇

猜你喜欢

热点阅读