Linux系统 Centos7 环境基于Docker部署Rock

2020-07-07  本文已影响0人  朝槿木兮

消息队列

基本概述

MQ,Message Queue,基于TCP协议构建的简单协议,区别于具体的通信协议

核心概念

  1. 消息主题:Message Topic,一级消息类型,生产者向其发送消息
  2. 消息生产者:Message Producer,消息发布者,或者消息服务终端,负责生产消息和发送消息到消息主题
  3. 消息消费者:Message Consumer,消息订阅者,或者消息客户终端,负责从消息主题接收并处理消费消息
  4. 消息实体:Message Object,消息对象,生产者向消息主题发送并最终传送给消息者的数据和属性的符号以及组合
  5. 消息属性:Message Attributes,消息内容,生产者对消息进行抽象和定义的相关属性,包含Message Key 和Message Target
  6. 消息组:Message Group,消息分类组别,通称一类生产者和消费者,通常生产和消费同一类消息,且消息发布和订阅的逻辑基本一致

编程思想

数据结构:消息队列的数据结构采用FIFO方式来定义与实现
设计模式:采用观察者模式

观察者模式:定义对象间一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知自动更新

消息流派:

基本概述

Uio5Ps.png

NameServer名称服务器[MQ命名空间服务器],大致相当于 jndi技术,更新和发现 broker服务。用于保存Broker相关元信息,并给生产者和消费者查找Broker消息。每个Broker在启动都会在名称服务器[NameServer]注册,生产者在发送消息前会根据消息主题到名称服务器查询获取Broker路由消息,消费者也会定时获取主题的路由消息。

ps:属于无状态服务设计,可横向扩展,节点之间无通信,可以部署多台机器来标识伪集群。

Broker:消息存储中心[消息中转角色],负责存储和转发消息。接收来自生产者的消息并进行存储,消费者从这拉取消息。存储与消息相关的元数据,主要包括用户组,消息进度偏移量,队列消息等。其中Broker分为Master和Slave节点:

  1. Master节点:可读可写
  2. Slave节点:只可读不可写
    其部署方式:

ps:一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master和Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义。BrokerId为0表示Master,BrokerId非0表示Slave。然后所有的Broker和Name Server上的节点建立长连接,定时注册Topic信息到所有Name Server。

Producer消息生产者->负责生产消息,生产者向消息服务器发送业务应用程序生成的消息。主要有同步发送和异步发送方式两种,其中:

ps:Producer与Name Server其中一个节点建立连接。定期从Name Server取Topic信息。并与提供该Topic信息的Master建立长连接。Producer也可以集群部署。

Consumer消息消费者 负责消费消息,从消息服务器拉取消息并将其输入用户应用程序中。主要分为拉取型消费者和推送型消费者:
拉取型消费者:Pull Consumer->主动从消息服务器拉取消息,只要批量拉取消息,用户就会启动消费过程
推送型消费者:Push Consumer->封装消息的拉取,消费进度和其它内部维护工作,消息到达之后便执行回调接口留给用户应用程序来实现。属于被动消费类型,Push拉取时需要注册消息费者监听器,当监听器被触发之后开始消费消息。

ps:Consumer 与Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server 取Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

部署Rocketmq

拉取rocketmq镜像:docker pull foxiswho/rocketmq
1.查询镜像images:docker search rocketmq

[root@centos-meteor ~]# docker search rocketmq
NAME                               DESCRIPTION                                     STARS               OFFICIAL            AUTOMATED
styletang/rocketmq-console-ng      rocketmq-console-ng                             20                                      
rocketmqinc/rocketmq               Image repository for Apache RocketMQ            17                                      
foxiswho/rocketmq                  rocketmq                                        14                                      
laoyumi/rocketmq                                                                   10                                      [OK]
xlxwhy/rocketmq                    alibaba's rocketmq                              4                                       
huanwei/rocketmq-broker                                                            2                                       
2019liurui/rocketmq-broker         RocketMQ broker image for RocketMQ-Operator     1                                       
2019liurui/rocketmq-namesrv        RocketMQ name service image for RocketMQ-Ope…   1                                       
apacherocketmq/rocketmq            Docker Image for Apache RocketMQ                1                                       
rocketmqinc/rocketmq-operator      The Kubernetes operator for RocketMQ            0                                       
2019liurui/rocketmq-operator       Kubernetes Operator for RocketMQ !              0                                       
apacherocketmq/rocketmq-operator   RocketMQ Operator is to manage RocketMQ serv…   0                                       
coder4/rocketmq                    rocketmq                                        0                                       [OK]
rocketmqinc/rocketmq-namesrv       Customized RocketMQ Name Server Image for Ro…   0                                       
rocketmqinc/rocketmq-broker        Customized RocketMQ Broker Image for RocketM…   0                                       
slpcat/rocketmq-console-ng                                                         0                                       
huanwei/rocketmq                                                                   0                                       
huanwei/rocketmq-broker-k8s                                                        0                                       
king019/rocketmq                   rocketmq                                        0                                       
pengzu/rocketmq-console-ng         web console for rocketmq ,this code is from …   0                                       
fengzt/rocketmq-broker             apache rocketmq 4.2.0 broker server(官方文档…       0                                       
huanwei/rocketmq-operator                                                          0                                       
slpcat/rocketmq                                                                    0                                       
fengzt/rocketmq-nameserver         apache rocketmq 4.2.0 nameserver                0                                       
icyblazek/rocketmq                 RocketMQ                                        0                                       
[root@centos-meteor ~]# 

2.执行:docker pull foxiswho/rocketmq

3.创建docker存储根目录并且授权:
mkdir docker && chmod -R 777 docker/

[root@centos-meteor /]# cd docker/
[root@centos-meteor docker]# pwd
/docker
[root@centos-meteor docker]# 

4.部署名称服务器rocketmq-namesrv-server[9876]:

rocketmq-namesrv-server

docker run -itd --restart=always --privileged=true -p 9876:9876 --name rocketmq-namesrv-server -v /docker/rocketmq/namesrv/logs:/root/rocketmq/logs -v /docker/rocketmq/namesrv/store:/root/rocketmq/store -e "MAX_POSSIBLE_HEAP=100000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqnamesrv

docker run -itd --restart=always --privileged=true -p 9876:9876 --name  rocketmq-namesrv-server -v /docker/rocketmq/namesrv/logs:/root/rocketmq/logs -v /docker/rocketmq/namesrv/store:/root/rocketmq/store -e "MAX_POSSIBLE_HEAP=100000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqnamesrv

ps:创建/docker/rocketmq/namesrv/logs和/docker/rocketmq/namesrv/store目录

5.部署消息服务器rocketmq-broker-server[10911]:

rocketmq-broker-server

docker run -itd --restart=always --privileged=true -p 10909:10909 -p 10911:10911 -p 10912:10912 --name rocketmq-broker-server --link rocketmq-namesrv-server:namesrv -v /docker/rocketmq/broker/logs:/root/rocketmq/logs -v /docker/rocketmq/broker/store:/root/rocketmq/store -v /docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

docker run -itd --restart=always --privileged=true -p 10909:10909 -p 10911:10911 -p 10912:10912 --name  rocketmq-broker-server --link rocketmq-namesrv-server:namesrv -v /docker/rocketmq/broker/logs:/root/rocketmq/logs -v /docker/rocketmq/broker/store:/root/rocketmq/store -v /docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

ps:
1.创建/docker/rocketmq/broker/logs和/docker/rocketmq/broker/store目录
2.编写broker.conf配置文件:

brokerClusterName = rocketmq-cluster  
brokerName = broker-server 
brokerId = 0  
deleteWhen = 04  
fileReservedTime = 48  
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRote=ASYNC_MASTER
brokerRole = ASYNC_MASTER  
flushDiskType = ASYNC_FLUSH  
# 如果是本地程序调用云主机 mq,这个需要设置成 云主机 IP
brokerIP1=Server-IP
#限制的消息大小
maxMessageSize=65536
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
#并发send线程数,多线程来发送消息可能会出现broker busy
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
defaultTopicQueueNums=8
highSpeedMode=false
commercialBaseCount=1
maxErrorRateOfBloomFilter=20
accessMessageInMemoryMaxRatio=40
#无读写客户端存活时间
clientChannelMaxIdleTimeSeconds=120
flushDelayOffsetInterval=10000
serverSocketRcvBufSize=131072
#单次 Pull 消息(内存)传输的 最大字节数
maxTransferBytesOnMessageInMemory=262144
clientManageThreadPoolNums=32
serverChannelMaxIdleTimeSeconds=120
serverCallbackExecutorThreads=0
enablePropertyFilter=false
transientStorePoolSize=5
enableConsumeQueueExt=false
#rocketmq server config
serverPooledByteBufAllocatorEnable=true
serverSocketRcvBufSize=131072
#rocketmq client config

6.部署控制后台rocketmq-consloe-server[8082]:

rocketmq-consloe-server

docker run -itd -p 8082:8080 --restart=always --privileged=true --name rocketmq-console-server -v /docker/rocketmq/console/data:/tmp -e "JAVA_OPTS=-Drocketmq.namesrv.addr=47.104.22.10:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m -Duser.home=/opt" styletang/rocketmq-console-ng:latest

docker run -itd -p 8082:8080  --restart=always --privileged=true  --name rocketmq-console-server -v /docker/rocketmq/console/data:/tmp -e "JAVA_OPTS=-Drocketmq.namesrv.addr=Server-IP:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m -Duser.home=/opt" styletang/rocketmq-console-ng:latest

ps:
1.创建/docker/rocketmq/console/data目录
2.Server-IP为broker.conf配置brokerIP1值

7.最终部署结果:


UFZUL6.png

整合Rocketmq开发实战

1.配置Rocketmq的Maven依赖:

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

ps:
1.目前rocketmq最新版本是4.7.1:

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

2.这里选择的是rocketmq版本[4.4.0],因为部署是4.4.0版本,推荐部署版本相同的rocketmq

2.创建pivotal-cloud-queue工程:


UFe878.png

ps:创建消息队列统一模块工程,在业务应用程序服务模块依赖该工程,并封装消息生产者[Producer]和消息消费者[Consumer]处理类。

3.封装rocketmq属性配置类:

添加spring-boot-configuration-processor依赖:

  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
 </dependency>
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-configuration-processor</artifactId>
   <optional>true</optional>
 </dependency>

编写RocketmqProperties属性配置类:


UFMlo6.png
package com.pivotal.cloud.queue.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @className: com.pivotal.cloud.queue.properties.RocketmqProperties
 * @title: RocketmqProperties
 * @description: 封装Pivotal项目RocketmqProperties类
 * @content: PivotalCloud项目系统RocketmqProperties自定义属性配置类
 * @author: marklin
 * @datetime: 2020-07-07 01:32
 * @version: 1.0.0
 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "pivotal.cloud.rocketmq")
public class RocketmqProperties {
    /**
     * rocketmq消息队列生产者-Producer
     */
    private final Producer producer = new Producer();

    /**
     * rocketmq消息队列消费者-Consumer
     */
    private final Consumer consumer = new Consumer();

    @Data
    public static class Producer {
    }

    @Data
    public static class Consumer {
    }
}

编写RocketmqConfiguration配置类:

UFM2Os.png
package com.pivotal.cloud.queue.configuration;

import com.pivotal.cloud.queue.properties.RocketmqProperties;
import lombok.AllArgsConstructor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @className: com.pivotal.cloud.queue.configuration.RocketmqConfiguration
 * @title: RocketmqConfiguration
 * @description: 封装Pivotal项目RocketmqConfiguration类
 * @content: //TODO
 * @author: marklin
 * @datetime: 2020-07-07 02:25
 * @version: 1.0.0
 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved.
 */
@AllArgsConstructor
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({RocketmqProperties.class})
public class RocketmqConfiguration {
}

编写RocketmqTemplate生产者模板类:

package com.pivotal.cloud.queue.template;

/**
 * @className: com.pivotal.cloud.queue.template.RocketmqTemplate
 * @title: RocketmqTemplate
 * @description: 封装Pivotal项目RocketmqTemplate类
 * @content: PivotalCloud项目系统RocketmqTemplate生产者模板类
 * @author: marklin
 * @datetime: 2020-07-07 02:48
 * @version: 1.0.0
 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved.
 */
public class RocketmqTemplate {
}

编写RocketmqListener消费者监听器类:


UFMsfS.png
package com.pivotal.cloud.queue.listener;

/**
 * @className: com.pivotal.cloud.queue.listener.RocketmqListener
 * @title: RocketmqListener
 * @description: 封装Pivotal项目RocketmqListener类
 * @content: PivotalCloud项目系统RocketmqListener消费者监听器类
 * @author: marklin
 * @datetime: 2020-07-07 02:52
 * @version: 1.0.0
 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved.
 */
public interface RocketmqListener {
}

编写META-INF/spring.factories工厂类:

UFM5kV.png
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.pivotal.cloud.queue.configuration.RocketmqConfiguration,\
  com.pivotal.cloud.queue.configuration.RabbitmqConfiguration,\
  com.pivotal.cloud.queue.configuration.ActivemqConfiguration,\
  com.pivotal.cloud.queue.configuration.KafkaConfiguration,

版权声明:本文为博主原创文章,遵循相关版权协议,如若转载或者分享请附上原文出处链接和链接来源。

上一篇下一篇

猜你喜欢

热点阅读