RocketMQ入门实战

2023-02-02  本文已影响0人  宏势

消息中间件三大特性:异步、削峰、解耦

一、 安装部署

1.安装

官网下载,解压后直接运行(单Master)

#启动namesrv
nohup sh bin/mqnamesrv &
#查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

#先启动broker
nohup sh bin/mqbroker -n localhost:9876 &
查看日志
tail -f ~/logs/rocketmqlogs/broker.log 

注意Broker的IP设置,手动修改属性 echo "brokerIP1=主机外网IP" > broker.conf
指定配置文件启动:nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c conf/broker.conf
参考:https://github.com/apache/rocketmq#readme

2.部署模型

部署模型图

NameServer

NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现, NameServer是无状态化,节点之间无任何信息同步。

主要包括两个功能:

Broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证. Broker采用Master-Slave架构解决高可用问题,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master可以多个,提升消息写入能力

1.每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer
2.Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态
3.Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的MasterSlave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。

部署方式

一般生产环境采用的是多Master-Slave异步复制模式

二、消息模型

消息模型图

消费者组

为了消息消费能力的水平扩展,引入消费者组。

消费模式

同一个消费者组的消费模式:

1.集群模式下扩缩消费者数量也无法提升或降低消费能力,但当Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力
3.广播模式下扩缩消费者数量也无法提升或降低消费能力

三、基本概念

队列

为了支持高并发和水平扩展,需要对 Topic 进行分区,称为队列。一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上

主题Topic&Tag

Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。可使用 Tag 可以实现对 Topic 中的消息进行过滤。通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息

Keys

消息的唯一标识码,方便定位消息丢失问题。

消费位点

每个队列都会记录自己的最小位点、最大位点,针对消费者组,会有消费位点,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的

四、代码示例

消息发送分为三种方式:同步异步单向传输,前两种是可靠的,无论是否成功都有相应,最后一种只管发送没有返回结果

public class SyncProducer {
  public static void main(String[] args) throws Exception {
    // 初始化一个producer并设置Producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //(1)
    // 设置NameServer地址
    producer.setNamesrvAddr("localhost:9876");  //(2)
    // 启动producer
    producer.start();
    for (int i = 0; i < 100; i++) {
      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );   //(3)
      // 利用producer进行发送,并同步等待发送结果
      SendResult sendResult = producer.send(msg);   //(4)
      System.out.printf("%s%n", sendResult);
    }
    // 一旦producer不再使用,关闭producer
    producer.shutdown();
  }
}

Push消费

public class Consumer {
  public static void main(String[] args) throws InterruptedException, MQClientException {
    // 初始化consumer,并设置consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
   
    // 设置NameServer地址 
    consumer.setNamesrvAddr("localhost:9876");
    //设置消费模式,默认是集群模式
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
    consumer.subscribe("TopicTest", "*");
    //注册回调接口来处理从Broker中收到的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(new String(msgs.get(0).getBody(),StandardCharsets.UTF_8))
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    // 启动Consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
  }
}

Pull 消费

public class LitePullConsumerSubscribe {
    public static volatile boolean running = true;
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.setPullBatchSize(20);
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

控制台

rocketmq-dashboard

官网:https://github.com/apache/rocketmq-dashboard

官网:https://rocketmq.apache.org/
Github: https://github.com/apache/rocketmq

上一篇 下一篇

猜你喜欢

热点阅读