Node.js 设计模式笔记 —— 消息中间件及其应用模式(发布
主要有两类技术可以用来整合分布式应用:一类是通过共享存储作为一个中心化的协调者,跟踪和保存所有需要共享的信息;另一类则是通过消息中间件,向系统中的所有节点散布数据、事件和命令等。
消息存在于软件系统的各个层级。我们通过互联网交换消息完成通信;通过管道发送消息给其他进程;设备驱动通过消息与硬件进行交互等等。任何用于在组件和系统之间交换信息的离散或结构化数据都可以视为消息。
消息系统基础
对于消息系统,有以下四个基本要素需要考虑:
- 通讯的方向。可以是单向的,也可以是“请求 - 响应”模式
- 通讯的目的。同时决定了消息本身的内容
- 消息的时效性。可以同步或者异步地发送与接收
- 消息的投递方式。可以直接投递也可以通过某个中间件
单向 vs “请求 - 应答”模式
单向模式:消息从源头推送到目的地。常见的应用比如邮件系统、将工作任务分派给一系列工作节点的系统。
单向通信
“请求 - 响应”模式:一方发出的消息总能够与对方发出的消息匹配。比如 web 服务的调用、向数据库请求数据等。
Request/Reply包含多个响应节点的“请求 - 响应”模式:
Multi-node request/reply消息类型
消息内容主要取决于通信的目的。通常有以下三种:
- 命令消息
- 事件消息
- 文档消息
命令消息用来令接收者触发某个动作或者任务。借助它可以实现远程过程调用(RPC)系统,分布式计算等。RESTful HTTP 请求就是简单的命令消息的例子。
事件消息用来通知另一个组件发生了某些情况。事件在分布式系统中是一种很重要的整合机制,用来确保系统的各个组件保持同样的步调。
文档消息基本上就是在组件之间传输数据。比如数据库请求的结果。
异步队列和流
同步通信类似于打电话。电话的双方必须同时在线,连接到同一个通道,实时地交流信息。当我们需要打给另一个人时,通常就得搞一部新的手机或者挂掉当前正在进行的通话,拨打新的号码。
异步通信类似于发短信。我们发送短信的时刻,并不需要接收方已经接入了网络。我们可以一条接一条地发送多条短信给不同的人,以任意顺序接收对方的回复(如果有的话)。
另一个异步通信的重要特性就是,消息可以被临时存储在某个地方,再在之后的某个时间送达。当接收方非常忙碌无法处理新的消息,或者我们需要确保投递的成功率时,这个特性就非常有用了。
消息队列就是这样一种在生产者和消费者之间存储消息的中间组件。若消费者因为某种原因崩溃、断开连接等,消息会在队列中累积,待消费者重新上线时立即进行分发。
另外一种类似的数据结构是 log。log 是一种只能追加的结构,它是持久的,其消息可以在到达时被读取,也可以通过访问其历史记录来获取。在消息系统中,也常被叫做 stream。
不同于队列,在 stream 中,消息被消费后不会被移除,意味着 stream 在消息的获取方面有着更高的自由度。队列通常一次只暴露一条消息给消费者,而一个 stream 能够被多个消费者共享(甚至是同一份消息)。
消息队列:
message queue
流:
stream
点对点 vs 消息中间件
peer-to-peer vs broker“发布 - 订阅” 模式
就是一种分布式的观察者模式。
Pub/Sub一个最小化的实时聊天应用
package.json:
{
"type": "module",
"dependencies": {
"amqplib": "^0.10.3",
"ioredis": "^5.2.4",
"JSONStream": "^1.3.5",
"level": "^8.0.0",
"leveldown": "^6.1.1",
"levelup": "^5.1.1",
"monotonic-timestamp": "^0.0.9",
"serve-handler": "^6.1.5",
"superagent": "^8.0.6",
"ws": "^8.11.0",
"yargs": "^17.6.2",
"zeromq": "^6.0.0-beta.16"
}
}
index.js:
import ws, { WebSocketServer } from 'ws'
import { createServer } from 'http'
import staticHandler from 'serve-handler'
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
broadcast(`${msg}`)
})
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState == ws.OPEN) {
client.send(msg)
}
}
}
server.listen(process.argv[2] || 8000)
- 首先创建一个 HTTP 服务,将所有请求转发给一个特别的 handler(
staticHandler
),该 handler 负责 serve 所有的静态文件 - 创建一个 WebSocket 服务实例,绑定到 HTTP 服务。同时监听来自 WebSocket 客户端的连接请求,以及客户端发送的消息
- 当某个客户端发送的新消息到达时,通过
broadcast()
函数将消息广播给所有的客户端
www/index.html:
<!DOCTYPE html>
<html>
<body>
Messages:
<div id="messages"></div>
<form id="msgForm">
<input type="text" placeholder="Send a message" id="msgBox"/>
<input type="submit" value="Send"/>
</form>
<script>
const ws = new WebSocket(
`ws://${window.document.location.host}`
)
ws.onmessage = function (message) {
const msgDiv = document.createElement('div')
msgDiv.innerHTML = message.data
document.getElementById('messages').appendChild(msgDiv)
}
const form = document.getElementById('msgForm')
form.addEventListener('submit', (event) => {
event.preventDefault()
const message = document.getElementById('msgBox').value
ws.send(message)
document.getElementById('msgBox').value = ''
})
</script>
</body>
</html>
通过 node index.js 8002
命令运行应用,打开两个浏览器页面访问 Web 服务,测试聊天效果:
但我们的应用是无法进行横向扩展的。比如再启动一个新的服务实例 node index.js 8003
,此时连接到 8002 的客户端无法与连接到 8003 的客户端通信。可以自行测试。
使用 Redis 作为消息中间件
架构图如下所示。每个服务实例都会把从客户端收到的消息发布到消息中间件,同时也会通过中间件订阅从其他服务实例发布的消息。
message broker- 通过客户端网页发送的消息传递给对应的 chat server
- chat server 把收到的消息发布到 Redis
- Redis 将收到的消息分发给所有的订阅方(chat server)
- chat server 将收到的消息再分发给所有连接的客户端
index-redis.js:
import ws, { WebSocketServer } from 'ws'
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import Redis from 'ioredis'
const redisSub = new Redis()
const redisPub = new Redis()
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
redisPub.publish('chat_message', `${msg}`)
})
})
redisSub.subscribe('chat_message')
redisSub.on('message', (channel, msg) => {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
})
server.listen(process.argv[2] || 8000)
运行 node index-redis.js 8002
、node index-redis.js 8003
两条命令启动两个服务实例,此时连接到不同服务器的客户端相互之间也能够进行通信。
点对点 Pub/Sub 模式
通过 ZeroMQ 创建两种类型的 socket:PUB
和 SUB
。PUB socket 绑定到本地机器的某个端口,负责监听来自其他机器上 SUB socket 的订阅请求。当一条消息通过 PUB socket 发送时,该消息会被广播到所有连接的 SUB socket。
index-zeromq.js:
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import yargs from 'yargs'
import zmq from 'zeromq'
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
let pubSocket
async function initializeSockets() {
pubSocket = new zmq.Publisher()
await pubSocket.bind(`tcp://127.0.0.1:${yargs(process.argv).argv.pub}`)
const subSocket = new zmq.Subscriber()
const subPorts = [].concat(yargs(process.argv).argv.sub)
for (const port of subPorts) {
console.log(`Subscribing to ${port}`)
subSocket.connect(`tcp://127.0.0.1:${port}`)
}
subSocket.subscribe('chat')
for await (const [msg] of subSocket) {
console.log(`Message from another server: ${msg}`)
broadcast(msg.toString().split(' ')[1])
}
}
initializeSockets()
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
broadcast(`${msg}`)
pubSocket.send(`chat ${msg}`)
})
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(yargs(process.argv).argv.http || 8000)
- 通过
yargs
模块解析命令行参数 - 通过
initializeSocket()
函数创建 Publisher,并绑定到由--pub
命令行参数提供的端口上 - 创建 Subscriber socket 并将其连接到其他应用实例的 Publisher socket。被连接的 Publisher 端口由
--sub
命令行参数提供。之后创建以chat
为过滤器的订阅,即只接收以chat
开头的消息 - 通过
for
循环监听到达 Subscriber 的消息,去除消息中的chat
前缀,通过broadcast()
函数将处理后的消息广播给所有连接的客户端 - 当有消息到达当前实例的 WebSocket 服务时,广播此消息到所有客户端,同时通过 Publisher 发布该消息
运行服务测试效果:
node index-zeromq.js --http 8002 --pub 5000 --sub 5001 --sub 5002
node index-zeromq.js --http 8003 --pub 5001 --sub 5000 --sub 5002
node index-zeromq.js --http 8004 --pub 5002 --sub 5000 --sub 5001
peer to peer
通过队列实现可靠的消息投递
消息队列是消息系统中的一种重要抽象。借助消息队列,通信中的发送方和接收方不必同时处于活跃的连接状态。队列系统会负责存储未投递的消息,直到目标处于能够接收的状态。
消息系统的投递机制可以简单概况为以下 3 类:
- 最多一次:fire-and-forget。消息不会被持久化,投递状态也不会被确认。意味着在接收者崩溃或者断开连接时,消息有可能丢失
- 最少一次:消息会确保至少被收到一次。但是重复收取同一条消息的情况有可能出现,比如接收者在收到消息后突然崩溃,没有来得及告知发送者消息已经收到。
- 只有一次:这是最可靠的投递机制,保证消息只会被接收一次。但由于需要更复杂的确认机制,会牺牲一部分消息投递的效率。
当消息投递机制可以实现“最少一次”或者“只有一次”时,我们就有了 durable subscriber。
durable subscriberAMQP
AMQP 是一个被很多消息系统支持的开放标准协议。除了定义一个通用的传输协议以外,他还提供了用于描述 routing、filtering、queuing、reliability 和 security 的模型。
AMQP- Queue:用于存储消息的数据结构。假如多个消费者绑定了同一个队列,消息在它们之间是负载均衡的。队列可以是以下任意一种类型:
- Durable:当中间件重启时队列会自动重建。但这并不意味着其内容也会被保留。实际上只有标记为持久化消息的内容才会被保存到磁盘,并在重启时恢复
- Exclusive:队列只绑定给唯一一个特定的订阅者,当连接关闭时,队列即被销毁
- Auto-delete:当最后一个订阅者断开连接时,队列被删除
- Exchange:消息发布的地方。Exchange 会将消息路由至一个或者多个 queue。路由规则取决于具体的实现:
- Direct exchange:通过完整匹配一个 routing key 来对消息进行路由(如
chat.msg
) - Topic exchange:对 routing key 进行模糊匹配(如
chat.#
匹配所有以chat
开头的 key) - Fanout exchange:将消息广播至所有连接的 queue,忽略提供的任何 routing key
- Direct exchange:通过完整匹配一个 routing key 来对消息进行路由(如
- Binding:Exchange 和 queue 之间的链接,定义了用于过滤消息的 routing key 或模式
上述所有组件由中间件进行维护,同时对外暴露用于创建和维护的 API。当连接到某个中间件时,客户端会创建一个 channel 对象负责维护通信的状态。
AMQP 和 RabbitMQ 实现 durable subscriber
chat 应用和消息历史记录服务的架构图:
AMQP and history serviceAMQP 和数据库实现 history service
此模块由两部分组成:一个 HTTP 服务负责将聊天历史记录暴露给客户端;一个 AMQP 消费者负责获取聊天消息并将它们保存在本地数据库中。
historySvc.js:
import { createServer } from 'http'
import levelup from 'levelup'
import leveldown from 'leveldown'
import timestamp from 'monotonic-timestamp'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'
async function main() {
const db = levelup(leveldown('./msgHistory'))
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertExchange('chat', 'fanout')
const { queue } = channel.assertQueue('chat_history')
await channel.bindQueue(queue, 'chat')
channel.consume(queue, async msg => {
const content = msg.content.toString()
console.log(`Saving message: ${content}`)
await db.put(timestamp(), content)
channel.ack(msg)
})
createServer((req, res) => {
res.writeHead(200)
db.createValueStream()
.pipe(JSONStream.stringify())
.pipe(res)
}).listen(8090)
}
main().catch(err => console.error(err))
- 创建一个到 AMQP 中间件的连接
- 设置一个名为
chat
的 fanout 模式的 exchange。assertExchange()
函数会确保相应的 exchange 存在,否则就创建 - 创建一个名为
chat_history
的 queue,绑定给上一步中创建的 exchange - 开始监听来自 queue 的消息,将收到的每一条消息保存至 LevelDB 数据库,以时间戳作为键。消息保存成功后由
channel.ack(msg)
进行确认。若确认动作未被中间件收到,则该条消息会保留在队列中再次被处理
index-amqp.js
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import amqp from 'amqplib'
import JSONStream from 'JSONStream'
import superagent from 'superagent'
const httpPort = process.argv[2] || 8000
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertExchange('chat', 'fanout')
const { queue } = await channel.assertQueue(
`chat_srv_${httpPort}`,
{ exclusive: true })
await channel.bindQueue(queue, 'chat')
channel.consume(queue, msg => {
msg = msg.content.toString()
console.log(`From queue: ${msg}`)
broadcast(msg)
}, { noAck: true })
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
channel.publish('chat', '', Buffer.from(msg))
})
superagent
.get('http://localhost:8090')
.on('error', err => console.log(err))
.pipe(JSONStream.parse('*'))
.on('data', msg => {
client.send(Buffer(msg).toString())
})
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(httpPort)
}
main().catch(err => console.log(err))
- 我们的聊天服务没必要是 durable subscriber,fire-and-forget 机制就足够了,因而有
{ exclusive: true }
选项 - 确认机制也是不需要的。
{ noAck: true }
- 发布消息也很简单,只需要指定目标 exchange(
chat
)和一个 routing key 即可,这里我们使用的是 fanout exchange,不需要路由,routing key 为空 - 发布到 exchange 的消息被转发到所有绑定的 queue,再到达所有订阅了 queue 的服务实例,每个实例再将消息发送到所有连接的客户端
- 通过
superagent
请求 history 微服务,将获取到的所有历史消息发送给刚连接的客户端
运行服务测试效果:
node index-amqp.js 8002
node index-amqp.js 8003
node historySvc.js
通过 streams 实现可靠的消息投递
在系统集成的范畴里,stream(或 log)是一种有序的、只能追加的持久化的数据结构。Stream 概念里的 message 更应该叫做 record,总是被添加到 stream 末尾,且不会在被消费之后自动删除(不同于 queue)。这种特性令 stream 更像是一种数据仓库而不是消息中间件。
Stream 的另一个重要特性在于,record 是被消费者从 stream 中“拉取”的,因而消费者可以按照自己的节奏处理 record。
Stream 可以用来实现可靠的消息投递,一旦消费者崩溃,它可以在恢复后从中断的地方继续拉取消息。
Streams vs 消息队列
Stream 明显的应用场景在于处理顺序的流数据,也支持批量处理或者根据之前的消息确定相关性,并可以跨多个节点分发数据。
Stream 和消息队列都可以实现 Pub/Sub 模式,但消息队列更适合复杂的系统集成任务,它可以提供更复杂的路由机制,允许我们为不同的消息提供不同的优先级,而 Stream 中 record 的顺序是一定的。
通过 Redis Streams 实现 chat 应用
index-stream.js:
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import Redis from 'ioredis'
const redisClient = new Redis()
const redisClientXRead = new Redis()
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', async client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
redisClient.xadd('chat_stream', '*', 'message', msg)
})
const logs = await redisClient.xrange(
'chat_stream', '-', '+')
for (const [, [, message]] of logs) {
client.send(message)
}
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
let lastRecordId = '$'
async function processStreamMessages() {
while (true) {
const [[, records]] = await redisClientXRead.xread(
'BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
for (const [recordId, [, message]] of records) {
console.log(`Message from stream: ${message}`)
broadcast(message)
lastRecordId = recordId
}
}
}
processStreamMessages().catch(err => console.error(err))
server.listen(process.argv[2] || 8080)
-
xadd
负责在收到来自客户端的消息时,向 stream 添加一条新的 record。它接收 3 个参数:- Stream 的名字,这里是
chat_stream
- record 的 ID。这里传入的是星号(
*
),令 Redis 为我们生成一个 ID。ID 必须是单调递增的,以保持 record 的顺序,而 Redis 可以替我们处理这些 - key-value 的列表。这里只提供 value
msg
(从客户端收到的消息)的 'message' key
- Stream 的名字,这里是
- 使用
xrange
检索 stream 的过往记录,以获取聊天历史。我们在每次有客户端连接时就进行一次检索。其中-
表示最小的 ID 值,+
表示最大的 ID 值,因而整个xrange
会获取当前 stream 中所有的消息 - 最后一部分的逻辑是等待新的记录被添加到 stream 中,从而每个应用实例都能读取到更新的消息。这里使用一个无线循环和
xread
命令:- 其中
BLOCK
表示在新消息到达前阻塞 -
0
用来指定超时时间,超过这个时间则直接返回null
。0
代表不超时 -
STREAMS
是一个关键字,告诉 Redis 我们接下来会指定想要读取的 stream 的细节 -
chat_stream
是 stream 的名字 - 最后我们提供 record ID(
lastRecordId
)作为读取新消息的节点。初始情况下是$
,表示当前 stream 中最大的 ID。当我们读取第一条消息后,更新lastRecordId
为最近读取到的消息的 ID
- 其中
此外,解包消息的代码 for (const [, [, message]] of logs) {...}
实际上等同于 for (const [recordId, [propertyId, message]] of logs) {...}
,由 xrange
命令查询到的消息的格式如下:
[
["1588590110918-0", ["message", "This is a message"]],
["1588590130852-0", ["message", "This is another message"]]
]