RabbitMQ程序猿大本营架构

RabbitMQ在分布式系统中的应用

2016-05-05  本文已影响4463人  商领云
cover

由于之前做的项目中需要在多个节点之间可靠地通信,所以废弃了之前使用的Redis pub/sub(因为集群有单点问题,且有诸多限制),改用了RabbitMQ。
使用期间得到不少收获,也踩了不少坑,所以在此分享下心得。(简单了解下RabbitMQ? 点这里

怎么保证可靠性的?

RabbitMQ提供了几种特性,牺牲了一点性能代价,提供了可靠性的保证。

    durable=true   
    channel.queueDeclare("task_queue", durable, false, false, null); // 队列  
    channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes()); // 消息

注:当声明的队列已经存在时,尝试重新定义它的durable是不生效的。

    autoAck = false;
    requeue = true;
    channel.basicConsume(queue, autoAck, callback);
    channel.basicAck();//应答
    channel.basicReject(deliveryTag, requeue); // 拒绝
    channel.basicRecover(requeue); // 恢复
    channel.confirmSelect(); // 进入confirm模式
    // do publish messages... 每条消息都会被编号,从1开始
    channel.getNextPublishSeqNo() // 查看下一条要发送的消息的序号
    channel.waitForConfirms(); // 等待所有消息发送并确认 
    channel.txSelect();
    try {
        // do something...
        channel.txCommit();
    } catch (e){
        channel.txRollback();
    }

注:由于exclusive类型的队列会在client和server连接断开时被删掉,所以对它设置持久化属性和备份都是没有意义的。

一些需要注意的地方

配置:集群配置

结合Docker使用

集群版本的实现:详见我自己写的一个例子rabbitmq-server-cluster

消息队列中间件的比较

最后附一张网上截取的测试结果:

performance

更多性能参数见:http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

如果有兴趣简单了解下RabbitMQ的简单介绍,可以继续往下看~

<a name="other" /> 简介

几个重要的概念

Client

RabbitMQ官方实现了很多热门语言的客户端,就不一一列举啦,以java为例,直接开始正题:

可以加上断开重试机制:

    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(10000);

创建连接和通道:

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
1

生产者:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

消费者:

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
  }
};
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
workqueue

代码同上,只不过会有多个消费者,消息会轮序发给各个消费者。
如果设置了autoAck=false,那么可以实现公平分发(即对于某个特定的消费者,每次最多只发送指定条数的消息,直到其中一条消息应答后,再发送下一条)。需要在消费者中加上:

int prefetchCount = 1;
channel.basicQos(prefetchCount);

其他同上。

broadcast

生产者:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

消费者同上。

routing

生产者:

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

消费者同上。

topics
*可以表示一个单词
#可以表示一个或多个单词

生产者:

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

消费者同上。

rpc

其实就是一对一模式的一种用法:
首先,客户端发送一条消息到服务端声明的队列,消息属性中包含reply_to和correlation_id

- reply_to 是客户端创建的消息的队列,用来接收远程调用结果
- correlation_id 是消息的标识,服务端回应的消息属性中会带上以便知道是哪条消息的结果。  

然后,服务端接收到消息,处理,并返回一条结果到reply_to队列中,

最终,客户端接收到返回消息,继续向下处理。

Server

支持各大主流操作系统,这里以Unix为例介绍下常用配置和命令:

安装

由于RabbitMQ是依赖于Erlang的,所以得首先安装最近版本的Erlang。

单点的安装比较简单,下载解压即可。下载地址

注:若启动失败了,可以在启动日志中查看到具体的错误信息。

<a name="cluster" /> 集群

集群节点共享所有的状态和数据,如:用户、路由、绑定等信息(队列有点特殊,虽然从所有节点都可达,但是只存在于第一次声明它的那个节点上,解决方案:消息队列的高可用);每个节点都可以接收连接,处理数据。

集群节点有两种,disc:默认,信息存在本地数据库;ram:加入集群时,添加--ram参数,信息存在内存,可提高性能。

更多详细的配置见:配置

注:如果加入集群失败,可先查看

注:docker版集群的见:rabbitmq-server-cluster

高级

AMQP协议简介

RabbitMQ原生支持AMQP 0-9-1并扩展实现了了一些常用的功能:AMQP 0-9-1

包含三层:

注:其他协议的支持见:RabbitMQ支持的协议

常用插件

管理界面(神器)

启动后,执行rabbitmq-plugins enable rabbitmq_management->
访问http://localhost:15672->查看节点状态,队列信息等等,甚至可以动态配置消息队列的主备策略,如下图:

management

<a name="federation"/> Federation

启用Federation插件,使得不同集群的节点之间可以传递消息,从而模拟出类似集群的效果。这样可以有几点好处:

几个概念:

注:

federated_cluster
federated_broadcast

rabbitmq-plugins enable rabbitmq_federation
如果启用了管理界面,可以添加:
rabbitmq-plugins enable rabbitmq_federation_management
这样就可以在界面配置Upstream和Policy了。

注:如果在一个集群中使用federation,需要该集群每个节点都启用Federation插件

注:更多插件请见:插件

原文作者来自 MaxLeap 团队_Service&Infra 成员:吕舜

上一篇 下一篇

猜你喜欢

热点阅读