RabbitMQ学习rabbitRabbitMQ

RabbitMQ笔记十二:RabbitListenerConfigurer

2017-10-15  本文已影响142人  二月_春风

RabbitListenerConfigurer详解

RabbitListenerConfigurer源码分析
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Consumer-side Spring configuration that registers RabbitMQ listener
 * endpoints programmatically through a {@link RabbitListenerConfigurer}
 * instead of annotating listener methods.
 *
 * <p>Two endpoints are registered: one consuming {@code zhihao.miao.order}
 * with an inline listener, and one consuming {@code order} and {@code pay}
 * through a {@link MessageListenerAdapter} wrapping a {@code MessageHandler}.
 */
@Configuration
public class ConsumerConfig {

    /**
     * Creates the broker connection factory.
     *
     * @return a {@link CachingConnectionFactory} configured from a fixed AMQP URI
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        // NOTE(review): user, password and host are hard-coded in this URI;
        // consider externalizing them before using this outside a demo.
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    /**
     * Creates the listener container factory bound to the given connection factory.
     *
     * @param connectionFactory the connection factory the containers will use
     * @return a {@link SimpleRabbitListenerContainerFactory} using {@code connectionFactory}
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    /**
     * Registers the two listener endpoints with the supplied registrar.
     *
     * @return a configurer that registers endpoint {@code "10"} and endpoint {@code "11"}
     */
    @Bean
    public RabbitListenerConfigurer rabbitListenerConfigurer(){
        return registrar -> {

            // Endpoint "10": handles messages from the zhihao.miao.order queue inline.
            SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
            endpoint.setId("10");
            endpoint.setQueueNames("zhihao.miao.order");
            endpoint.setMessageListener(message -> {
                System.out.println("endpoint1处理消息的逻辑");
                // Decode explicitly instead of relying on the platform default
                // charset (assumes producers publish UTF-8 text - confirm).
                System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            });

            // Endpoint "11": handles the order and pay queues through an adapter
            // that wraps a MessageHandler instance.
            SimpleRabbitListenerEndpoint endpoint2 = new SimpleRabbitListenerEndpoint();
            endpoint2.setId("11");
            endpoint2.setQueueNames("order","pay");
            // NOTE(review): this line prints once, while the endpoints are being
            // registered, not each time endpoint2 receives a message.
            System.out.println("endpoint2处理消息的逻辑");
            endpoint2.setMessageListener(new MessageListenerAdapter(new MessageHandler()));

            // Register both endpoints.
            registrar.registerEndpoint(endpoint);
            registrar.registerEndpoint(endpoint2);
        };
    }
}

消费端消息处理器

/**
 * Consumer-side message handler. {@code ConsumerConfig} wraps an instance of
 * this class in a {@code MessageListenerAdapter} for the {@code order} and
 * {@code pay} queues.
 */
public class MessageHandler {

    /**
     * Prints a marker line followed by the message payload decoded as text.
     *
     * @param message raw message body; decoded as UTF-8 so the result does not
     *                depend on the JVM's platform default charset
     */
    public void handleMessage(byte[] message){
        System.out.println("消费消息");
        // Fully-qualified name keeps this snippet free of extra imports.
        System.out.println(new String(message, java.nio.charset.StandardCharsets.UTF_8));
    }
}

消费端应用启动类

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

/**
 * Consumer application entry point: boots an annotation-based Spring context
 * with Rabbit listener support enabled, keeps it alive for 60 seconds, then
 * shuts it down.
 */
@EnableRabbit
@ComponentScan
public class Application {

    /**
     * Starts the context, waits 60 seconds, and closes the context.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the wait is interrupted or startup fails
     */
    public static void main(String[] args) throws Exception{
        // try-with-resources closes the context even when sleep() is interrupted
        // or another exception escapes, which the original straight-line
        // close() call did not guarantee.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class)) {
            System.out.println("rabbit service startup");
            TimeUnit.SECONDS.sleep(60);
        }
    }
}

使用总结

上一篇 下一篇

猜你喜欢

热点阅读