Java 程序员Java

如何基于分布式KV研发一款消息中间件

2021-08-26  本文已影响0人  程序花生

背景

目前笔者所在团队正常研发一款流程编排引擎,其中有多个功能特性需要MQ的延迟消息/消费者重试等特性。经过多方面的考量,我们最终决定采用计算存储分离的架构,在分布式KV存储的基础上,研发一款定制化的MQ。目前,其具备了MQ的主要特性。本文所描述的是基于分布式KV的基础上研发MQ的核心思路。

术语

整体架构

说明:

KV 存储

本文以KV Storage是Redis Cluster为例进行讲解,Broker Cluster中的所有Broker实例共享这个Redis Cluster,其他任意支持按key排序scan的KV存储均可。

从持久化的角度来说,使用内存模式Redis并不合适,这里意图在于说明基于分布式KV实现的MQ的核心原理。事实上,在公司内部,我们使用的是基于RocksDB基础上研发的分布式KV,在网络通信上兼容redis协议。

为了简化,本文中不讨论Redis Cluster扩/缩容,Slot迁移的情况。但足以掌握基于分布式KV研发一款消息中间件的核心原理。

网络通信

Producer和Consumer与Broker的通信,在笔者的项目中,使用的是Grpc。在开源的通信框架中,Grpc可以说是最流行的方案,Apacha RocketMQ 5.x版本也采用了Grpc。在本文中,并不会对Grpc进行介绍。

详细设计

Broker集群元数据

每个Broker启动时,可以将自身信息注册到Redis中,以便producer/consumer进行服务发现。例如通过hash结构维护:

Key

[cluster]$cluster_name
复制代码

Value

filed         value

broker1       ip:port

broker2       ip:port
复制代码

Topic元数据

Topic元数据主要是维护Topic下有多少Partition,这些Partition在Redis Cluster中是如何分布的。用户在创建Topic时,指定分区数量。

Redis Cluster有16384个槽,每个Redis分片负责其中部分槽。当创建一个Topic时,例如指定10个分区,可以按照一定策略把这个10个分区映射到不同的槽上,相当于间接的把分区分配到了不同的redis分片上。

当创建好一个Topic之后,将Topic下的分区分配给不同的Broker。例如10个分区,10个Broker,则每个Broker负责一个分区。如果只有5个分区,那么需要分配给其中5个broker。

例如通过hash结构维护维护这个映射关系

key

[topic_metadata]$topic_name

value

filed         value

partition1    broker1

partition2    broker2

消息

消息使用protobuf进行定义:

message Message{

  google.protobuf.Struct metadata = 1;  //消息的元数据

  string partition = 2;  //消息所属的分区

  int64 offset = 3;      //消息的offset

  string msgId = 4;      //消息的唯一id

  string topic = 5;      //消息的topic

  string key = 6;        //消息key,用于路由

  bytes body = 7;        //消息体

  google.protobuf.Timestamp born_time = 8;  //消息生成时间 

  google.protobuf.Timestamp expireTime = 9; //消息截止时间,用于延迟消息

}

生产者在发送消息时,最简单的情况下,只需要指定消息的topic、body。当有其他特殊需求时,可以指定以下字段:

消息发送

从Producer的角度来说:

从broker角度来说:

接收到一条消息时,offset信息维护。每次发送,在确定消息需要发送到的分区后,broker需要将对应partition的offset+1。在笔者的项目中,使用了hash结构存储每个分区的最大的offset:

key:

[topic_offset]{$topic}

value

field             value

partition 1       offset1

partition 2       offset2

为了提升offset维护的效率,不需要每次都调用HINCRBY,而是在broker启动时,将自己维护的分区offset信息加载到内存中,之后发送消息时,内存中增加,定期保存到KV中。

此外,需要有一个修正offset的逻辑,避免broker异常宕机的情况下,offset没有成功保存到redis中。在broker启动时,可以从当前维护的最大offset开始往后扫描,如果发现了新消息,则说明offset需要修正(参考如下消息存储部分)。

消息存储

当消息被写入到redis中,key满足以下格式:

[topic]{$topic_$partition}$offset

其中:

消息拉取

拉取通过redis scan操作进行,将scan到的消息交由消费者处理。

在拉取消息时,依赖于一个consumer offset,其维护了某个consumer group消费某个topic的进度信息。拉取时,从这个位置开始。这里可以考虑使用hash数据结构:

key:

[consumer_offset]$group_$topic

value

field             value

partition 1       offset1

partition 2       offset2

当有消费者连接上某个broker时,broker查询到自己负责的分区的parititon offset,从这个位置开始拉取消息。

延迟消息

对于所有延迟消息,会首先发送到一个特殊的delay topic中,相当于暂存这个消息。消息到期后,投递到目标topic中。

[delay]$broker_id}$expireTime

消费重试

当消费者消费一条消息失败时,默认也是会走延迟消息逻辑,到期后,投递给目标消费者,重新消费。重试消息,也是基于延迟消息的基础上开发的。

在逻辑上不同的是:

[topic]retry_$consumer_group

这个Topic下应该也有包含分区,策略与之前所述Topic元数据维护相同。

死信队列

当消息重试到最大重试次数后,依然失败,可以放入死信队列。如:

[topic]dead_$consumer_group

消息TTL

为了避免已经被消费的消息,占用大量的存储空间,消息会被清理。我们的策略是:

总结

本文的目的是介绍如何基于分布式KV研发一款MQ的核心思路,很多容灾/高可用/性能优化等方面的主题并没有讨论。仅仅是提供一种核心思路,如果希望在生产环境使用,需要进行大量的改进与优化。

作者:字节大力智能
链接:https://juejin.cn/post/7000561368633966623
来源:掘金

上一篇 下一篇

猜你喜欢

热点阅读