Spring Boot实践记录

SpringBoot集成Rabbit使用TopicRabbit指

2017-04-26  本文已影响59人  Chinesszz

Rabbitmq中绑定

exchange:flow

routing-key:user

bind-queue:flow_user

白话文就是,把user绑定到flow_user序列

发送方使用routing-key推送:

//把routing-key发送给名为flow的exchenge,然后exchenge负责向绑定的这个Queue推送
 amqpTemplate.convertAndSend("flow","user", context);

Rabbit配置

添加exchange添加exchange 添加Queue添加Queue

SpringBoot集成Rabbitmq

@Configurable
public class TopicRabbitConfig {
    public final static String FLOW = "flow";
    
    public final static String USER = "user";
    public final static String USER_QUEUE = "flow_user";

   
    @Bean
    public Queue queueMessages3() {
        return new Queue(USER_QUEUE);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(FLOW);
    }
    @Bean
    Binding bindingExchangeMessages3(Queue queueMessages3, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages3).to(exchange).with(FLOW);
    }
}
/**
 * @Package: pterosaur.account.service.impl
 * @Description: 模拟发送消息,测试使用
 * @author: liuxin
 * @date: 17/4/19 下午3:17
 */
@Component
public class AccountSentImpl {
    @Autowired
    private AmqpTemplate amqpTemplate;

    private ExecutorService threadPool = Executors.newFixedThreadPool(8);

    public void send() {
       for (int i=0;i<10;i++){
           String context = "hello :" + DateUtil.formatDatetime(System.currentTimeMillis())+",当前线程:"+Thread.currentThread().getName();
           System.out.println("Sender : " + context);
           threadPool.execute(new Runnable() {
               @Override
               public void run() {
                   amqpTemplate.convertAndSend(TopicRabbitConfig.FLOW,TopicRabbitConfig.USER, context);
               }
           });
       }
    }

}
/**
 * @Package: pterosaur.account.service.impl
 * @Description: mq信息处理实现类
 * @author: liuxin
 * @date: 17/4/19 下午2:55
 */
@Component
public class AccountReceiverImpl implements AccountReceiver {
    private static final Logger logger = LoggerFactory.getLogger(AccountReceiverImpl.class);

    @Autowired
    ExecutorService threadPool;


    /**
     * 用户流水
     *
     * @param message
     */
    @RabbitListener(queues = TopicRabbitConfig.USER_QUEUE)
    @RabbitHandler
    public void processUser(String message) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("用户侧流水:{}",message);
            }
        });
    }


}

Sender : hello :2017-04-25 17:44:15,当前线程:main
Sender : hello :2017-04-25 17:44:20,当前线程:main
2017-04-25 17:44:25.754  INFO 67685 --- [pool-1-thread-1] p.a.service.impl.AccountReceiverImpl     : 用户侧流水:hello :2017-04-25 17:44:20,当前线程:main
Sender : hello :2017-04-25 17:44:25,当前线程:main
Sender : hello :2017-04-25 17:44:30,当前线程:main
2017-04-25 17:44:32.048  INFO 67685 --- [pool-1-thread-2] p.a.service.impl.AccountReceiverImpl     : 用户侧流水:hello :2017-04-25 17:44:30,当前线程:main
Sender : hello :2017-04-25 17:44:32,当前线程:main
Sender : hello :2017-04-25 17:44:33,当前线程:main
2017-04-25 17:44:35.556  INFO 67685 --- [pool-1-thread-3] p.a.service.impl.AccountReceiverImpl     : 用户侧流水:hello :2017-04-25 17:44:33,当前线程:main
Sender : hello :2017-04-25 17:44:35,当前线程:main
Sender : hello :2017-04-25 17:44:37,当前线程:main
2017-04-25 17:44:38.797  INFO 67685 --- [pool-1-thread-1] p.a.service.impl.AccountReceiverImpl     : 用户侧流水:hello :2017-04-25 17:44:37,当前线程:main

上一篇 下一篇

猜你喜欢

热点阅读