SpringBoot程序员

SpringBoot 与 RabbitMQ

2017-09-14  本文已影响102人  纯正it狗

简介

RabbitMQ是一款开源的消息队列中间件, 使用Advanced Message Queuing Protocol (AMQP) 为协议来处理队列消息。由于当下分布式系统越来越普及,消息队列中间件也越来越被得到使用。

和RabbitMQ同类型的消息中间件也有

这篇文章暂时不分析几种MQ的比较,主要是写一些SpringBoot与RabbitMQ的使用

Ubuntu 上简单安装 RabbitMQ

首先找一台Linux服务器.

这里我用的是Ubuntu 16.04(ubuntu-xenial)

添加下载信息

> echo "deb http://www.rabbitmq.com/debian/ testing main" >> /etc/apt/sources.list
> curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
> apt-get update

安装RabbitMQ

> apt-get install rabbitmq-server

安装完后咋们看一下RabbitMQ 有没有跑起来

> service rabbitmq-server status

如果跑起来的话会显示RabbitMQ的基本信息,没有的话运行

> service rabbitmq-server start

接下来我们需要一个后台的控制页面来管理我们的RabbitMQ服务, RabbitMQ其实自带了一个管理后台,所以直接激活一下就行了

> rabbitmq-plugins enable rabbitmq_management

接下来我们需要创建一个可以登录的用户,刚开始可以创建一个admin管理员,并给这个用户最高权限

> rabbitmqctl add_user [username] [password]
> rabbitmqctl set_user_tags [username] administrator
> rabbitmqctl set_permissions -p "/" [username] ".*" ".*" ".*"

当用户创建完毕后访问 http://[你的服务器IP]:15672 , 然后会出现登录画面,用刚才创建的用户登录即可

RabbitMQ 的简单概念与使用

首先看一下面的模型,简单了来讲就是Publisher将消息发送到Exchange上,然后Exchange通过Routes的健值来发布到Queue里,然后Consumer去Queue里消费消息.

RabbitMQ消息队列模型图

这边有三种常用的Exchange的方式

下面就对上面三种方式我们来进行一下代码的实现

Direct Exchange

Direct Exchange 是RabbitMQ 默认的交换形式,从图上可以看出, 消息发送方直接将带有Routing Key: green 这个参数发送到Exchange中,然后Exchange再将消息分配到对应的Queue中

Direct Exchange Type

Spring 代码实现如下

Producer 部分

@Component
public class DirectProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "This direct exchange message";
        this.rabbitTemplate.convertAndSend("testqueue" , context);
    }
}

代码中将“testqueue” 做为routing key, exchange会将消息发送到名为“testqueue”的Queue里面

Consumer部分

@Component
@RabbitListener(queues = "testqueue")
public class DirectConsumer {

    @RabbitHandler
    public void consume(String context) {
        System.out.println("Direct Exchange Consumer  : " + context);
    }
}

Consumer对“testqueue” 进行监听然后消费消息。

Topic Exchange

Topic Exchange的模式跟Direct 相似,也需要传routing key,但是转发消息是通过通配符来做的。比如:

Topic Exchange type

首先我们先造3个Consumer分别监听不同的queue,但是这里面监听的queue有绑定不同的通配符规则, 其中Annotation里的key包含了通配符规则

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueA",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "*.topic.A")
)
public class TopicConsumerA {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer A  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueB",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "*.topic.*" )
)
public class TopicConsumerB {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer B  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueC",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "main.topic.*")
)
public class TopicConsumerC {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer C  : " + message);
    }
}

然后我们通过发送不同routing key来控制那些Consumer能收到消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.A", context);

main.topic.A 这条routing key 应该是满足上面三个通配符规则的,所有三个Consumer 都能收到发送的消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "test.topic.A", context);

test.topic.A 只能满足A和B的通配符规则,即只有ConsumerA和ConsumerB能收到消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.KK", context);

main.topic.KK 只有ConsumerB和C能收到

Fanout Exchange

Fanout Exchange模式是,不管routing key 只要这个queue绑定到fanout exchange 这些queues 都会收到消息,有点像广播的形式.

Fanout Exchange type

接下来我看一下Spring代码的实现部分

@Component
public class FanoutProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "This fanout exchange message";
        rabbitTemplate.convertAndSend("testFanoutExchange","", context);
    }
}

Producer将消息发送到"testFanoutExchange"的exchange中,第二个参数因为是Fanout模式所以我们不需要传routing key。

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "fanoutqueueA",durable = "true"),
                exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
)
public class FanoutConsumerA {

    @RabbitHandler
    public void consume(String message) {
        System.out.println("Fanout Exchange Consumer A  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "fanoutqueueB",durable = "true"),
                exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
)
public class FanoutConsumerB {

    @RabbitHandler
    public void consume(String message) {
        System.out.println("Fanout Exchange Consumer B  : " + message);
    }
}

这边consumer中只要将监听的queue绑定到接收消息的"testFanoutExchange" 即可.

这样在fanout这种模式下只要是绑定到指定的exchange上所有的queues都能收到消息

演示代码

以上演示代码可以到 https://github.com/dreamcatchernick/spring-boot-samples 的spring-boot-rabbitmq 目录下载并运行

上一篇下一篇

猜你喜欢

热点阅读