RocketMQ

2023-08-23  本文已影响0人  追风还是少年

RocketMQ组成

image.png

RocketMQ由NameServer、Broker、Producer、Consumer组成:

Producer Group作用

主要的用途是事务消息,Broker 需要向消息发送者回查事务状态

顺序消息

消息的顺序性分为两部分,生产顺序性和消费顺序性

Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。
如需保证消息生产的顺序性,则必须满足以下条件:

满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。
由于顺序消息都存储在同一个队列里,而同一个队列只会由消费组中的一个消费者消费
如需保证消息消费的顺序性,则必须满足以下条件:

顺序消息的缺陷:

使用建议:

public class SelectMessageQueueByHash implements MessageQueueSelector {
    public SelectMessageQueueByHash() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object shardingKey) {
        int value = shardingKey.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value %= mqs.size();
        return (MessageQueue)mqs.get(value);
    }
}
SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ 
, new SelectMessageQueueByHash ()
, shardingKey);

事务消息

事务消息支持在分布式场景下保障消息生产和本地事务的最终一致性
事务消息交互流程如下图所示:

image.png

使用限制:

事务消息涉及的主题:

回查流程:


image.png

事务消息在RocketMQ中处理流程:


image.png image.png
@Component
@Slf4j
public class OrderTransactionalListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("开始执行本地事务....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
            orderService.createOrder(order,message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            log.info("本地事务已提交。{}",message.getTransactionId());
        }catch (Exception e){
            log.error("执行本地事务失败。{}",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("开始回查本地事务状态。{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogService.get(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        log.info("结束本地事务状态查询:{}",state);
        return state;
    }
}
@Component
@Slf4j
public class TransactionalMsgProducer implements InitializingBean, DisposableBean {
    private String GROUP = "order_transactional";
    private TransactionMQProducer msgProducer;
    //用于执行本地事务和事务状态回查的监听器
    @Autowired
    private OrderTransactionalListener orderTransactionListener;
    //执行任务的线程池
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
    private void start(){
        try {
            this.msgProducer.start();
        } catch (MQClientException e) {
            log.error("msg producer starter occur error;",e);
        }
    }
    private void shutdown() {
        if(null != msgProducer) {
            try {
                msgProducer.shutdown();
            } catch (Exception e) {
                log.error("producer shutdown occur error;",e);
            }
        }
    }
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic,data.getBytes());
        return this.msgProducer.sendMessageInTransaction(message, null);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        msgProducer = new TransactionMQProducer(GROUP);
        msgProducer.setNamesrvAddr("namesrvHost:ip");
        msgProducer.setSendMsgTimeout(Integer.MAX_VALUE);
        msgProducer.setExecutorService(executor);
        msgProducer.setTransactionListener(orderTransactionListener);
        this.start();
    }
    @Override
    public void destroy() throws Exception {
        this.shutdown();
    }
}
@Service
@Slf4j
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private  TransactionLogMapper transactionLogMapper;
    @Autowired
    private TransactionalMsgProducer producer;
    //执行本地事务时调用,将订单数据和事务日志写入本地数据库
    @Transactional
    @Override
    public void createOrder(OrderDTO orderDTO,String transactionId){
        //1.创建订单
        Order order = new Order();
        BeanUtils.copyProperties(orderDTO,order);
        orderMapper.createOrder(order);
        //2.写入事务日志
        TransactionLog log = new TransactionLog();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(order.getId()));
        transactionLogMapper.insert(log);
        log.info("create order success,order={}",orderDTO);
    }
    //前端调用,只用于向RocketMQ发送事务消息
    @Override
    public void createOrder(OrderDTO order) throws MQClientException {
        order.setId(snowflake.nextId());
        order.setOrderNo(snowflake.nextIdStr());
        producer.send(JSON.toJSONString(order),"order");
    }
}

定时/延时消息

当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息。

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。默认支持18个等级的延迟消息,延时等级定义为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,设置延时等级的时候是设置延迟时间对应的序号(从1开始)

延时消息写入Commitlog文件的时,是先写入SCHEDULE_TOPIC_XXX主题的对应延时等级队列中的,SCHEDULE_TOPIC_XXX一共有18个消息队列,分别对应每个延时等级,每个消息队列的queueId=延时等级-1


image.png

延时消息处理过程:

Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) );
// 设置延时消息的级别
msg.setDelayTimeLevel(2);

重试消息

消息收发过程中,若Consumer消费某条消息失败或消费超时,则会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。您可以通过消费死信队列中的死信消息来恢复业务异常。
重试消息Topic为%RETRY%{ConsumeGroup}

消息重试主要功能行为包括:

配置采用覆盖的方式生效,即最后启动的Consumer实例会覆盖之前启动的实例的配置。因此,请确保同一Group ID下的所有Consumer实例设置的最大重试次数和重试间隔相同,否则各实例间的配置将会互相覆盖

image.png

一条消息无论重试多少次,这些重试消息的Message ID都不会改变。
消息重试只针对集群消费模式生效。
广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

集群消费模式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

consumer.setMaxReconsumeTimes(20);

死信队列

死信队列用于处理无法被正常消费的消息,即死信消息
当一条消息初次消费失败,RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中
在RocketMQ中,这种正常情况下无法被消费的消息称为死信消息,存储死信消息的特殊队列称为死信队列,死信消息的Topic为%DLQ%{ConsumerGroup}。

死信消息具有以下特性:

死信队列具有以下特性:

一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,您可以在RocketMQ控制台重新发送该消息,让消费者重新消费一次。

负载均衡

producer发送消息的负载均衡

对于非顺序消息(普通消息、定时/延时消息、事务消息)场景,默认且只能使用RoundRobin模式的负载均衡策略。
(1)轮询模式(默认),RoundRobin
RoundRobin模式下,生产者发送消息时,以消息为粒度,按照轮询方式将消息依次发送到指定主题中的所有可写目标队列中,保证消息尽可能均衡地分布到所有队列。


image.png

consumer订阅消息的负载均衡

image.png

负载均衡策略算法:
(1)平均负载(默认策略),AllocateMessageQueueAveragely
分配方式类似分页,对topic下的所有MessageQueue进行排序,对同一个消费组的所有ConsumerId进行排序
MessageQueue作为需要分页的记录,Consumer作为页码,计算每页多少个MessageQueue,每页有哪些MessageQueue
(2)环形平均负载,AllocateMessageQueueAveragelyByCircle
每个Consumer分配到MessageQueue的个数与平均负载相同,只是每个Consumer不是分配到连续的MessageQueue
(3)用户自定义配置,AllocateMessageQueueByConfig
(4)机房负载策略,AllocateMessageQueueByMachineRoom
(5)机房负载策略改进版本,AllocateMachineRoomNearBy
(6)一致性哈希策略,AllocateMessageQueueConsistentHash


image.png

一致性哈希有个哈希环的概念,哈希环由0到2^31-1,哈希上的点都是虚拟的,将所有的Consumer使用Consumer的Id进行哈希计算,得到是哈希环上的点,然后把点存储到TreeMap里,将所有的MessageQueue一次进行相同的哈希计算,按顺时针方向找到距离计算出的哈希值最近的Consumer点,MessageQueue最终就归属这个Consumer

负载均衡策略由每个消费者自己执行计算的
负载均衡触发时机:

消息存储

消息存储的整体架构 消息存储目录结构 每条消息格式

RocketMQ存储的文件主要有Commitlog文件、ConsumeQueue文件、IndexFile文件,ConsumeQueue文件和IndexFile文件是基于Commitlog文件异步生成的:

image.png

消息查询

消费进度

    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

消费者上报到broker的消费进度,在broker中也是先保存在在本地内存
ConsumerOffsetManager:

    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

broker中保存的消费进度通过在broker BrokerController 启动定时任务每隔5秒持
久化到文件/store/config/consumerOffset.json,该文件存储的json格式为


image.png
{
    "Topic@ConsumeGroup": {
        QueueId: Offset
    }
}
public class BrokerController {
    public boolean initialize() throws CloneNotSupportedException {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    }
}

消息刷盘

消息发送到broker后,broker不是直接把消息写到commitlog文件,而是写到操作系统的PageCache,之后再从PageCache刷盘到commitlog文件
消息刷盘方式有三种:

高性能

MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, position, fileSize)

使用Mmap的限制:
a.Mmap映射的内存空间释放的问题;由于映射的内存空间本身就不属于JVM的堆内存区(Java Heap),因此其不受JVM GC的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。然而unmap()方法是FileChannelImpl类里实现的私有方法,无法直接显示调用。RocketMQ中的做法是,通过Java反射的方式调用“sun.misc”包下的Cleaner类的clean()方法来释放映射占用的内存空间;
b.MappedByteBuffer内存映射大小限制;因为其占用的是虚拟内存(非JVM的堆内存),大小不受JVM的-Xmx参数限制,但其大小也受到OS虚拟内存大小的限制。一般来说,一次只能映射1.5~2G 的文件至用户态的虚拟内存空间,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了;
c.使用MappedByteBuffe的其他问题;会存在内存占用率较高和文件关闭不确定性的问题;


image.png image.png image.png

PageCache机制也不是完全无缺点的,当遇到OS进行脏页回写,内存回收,内存swap等情况时,就会引起较大的消息读写延迟。
对于这些情况,RocketMQ采用了多种优化技术,比如内存预分配,文件预热,mlock系统调用等,来保证在最大可能地发挥PageCache机制优点的同时,尽可能地减少其缺点带来的消息读写延迟。

image.png

高可用

上一篇 下一篇

猜你喜欢

热点阅读