Node.js 设计模式笔记 —— 消息中间件及其应用模式(发布

2023-01-02  本文已影响0人  rollingstarky

主要有两类技术可以用来整合分布式应用:一类是通过共享存储作为一个中心化的协调者,跟踪和保存所有需要共享的信息;另一类则是通过消息中间件,向系统中的所有节点散布数据、事件和命令等。
消息存在于软件系统的各个层级。我们通过互联网交换消息完成通信;通过管道发送消息给其他进程;设备驱动通过消息与硬件进行交互等等。任何用于在组件和系统之间交换信息的离散或结构化数据都可以视为消息。

消息系统基础

对于消息系统,有以下四个基本要素需要考虑:

单向 vs “请求 - 应答”模式

单向模式:消息从源头推送到目的地。常见的应用比如邮件系统、将工作任务分派给一系列工作节点的系统。


单向通信

“请求 - 响应”模式:一方发出的消息总能够与对方发出的消息匹配。比如 web 服务的调用、向数据库请求数据等。

Request/Reply

包含多个响应节点的“请求 - 响应”模式:

Multi-node request/reply

消息类型

消息内容主要取决于通信的目的。通常有以下三种:

命令消息用来令接收者触发某个动作或者任务。借助它可以实现远程过程调用(RPC)系统,分布式计算等。RESTful HTTP 请求就是简单的命令消息的例子。
事件消息用来通知另一个组件发生了某些情况。事件在分布式系统中是一种很重要的整合机制,用来确保系统的各个组件保持同样的步调。
文档消息基本上就是在组件之间传输数据。比如数据库请求的结果。

异步队列和流

同步通信类似于打电话。电话的双方必须同时在线,连接到同一个通道,实时地交流信息。当我们需要打给另一个人时,通常就得搞一部新的手机或者挂掉当前正在进行的通话,拨打新的号码。
异步通信类似于发短信。我们发送短信的时刻,并不需要接收方已经接入了网络。我们可以一条接一条地发送多条短信给不同的人,以任意顺序接收对方的回复(如果有的话)。

另一个异步通信的重要特性就是,消息可以被临时存储在某个地方,再在之后的某个时间送达。当接收方非常忙碌无法处理新的消息,或者我们需要确保投递的成功率时,这个特性就非常有用了。
消息队列就是这样一种在生产者和消费者之间存储消息的中间组件。若消费者因为某种原因崩溃、断开连接等,消息会在队列中累积,待消费者重新上线时立即进行分发。

另外一种类似的数据结构是 log。log 是一种只能追加的结构,它是持久的,其消息可以在到达时被读取,也可以通过访问其历史记录来获取。在消息系统中,也常被叫做 stream
不同于队列,在 stream 中,消息被消费后不会被移除,意味着 stream 在消息的获取方面有着更高的自由度。队列通常一次只暴露一条消息给消费者,而一个 stream 能够被多个消费者共享(甚至是同一份消息)。

消息队列:


message queue

流:


stream

点对点 vs 消息中间件

peer-to-peer vs broker

“发布 - 订阅” 模式

就是一种分布式的观察者模式。

Pub/Sub

一个最小化的实时聊天应用

package.json:

{
    "type": "module",
    "dependencies": {
        "amqplib": "^0.10.3",
        "ioredis": "^5.2.4",
        "JSONStream": "^1.3.5",
        "level": "^8.0.0",
        "leveldown": "^6.1.1",
        "levelup": "^5.1.1",
        "monotonic-timestamp": "^0.0.9",
        "serve-handler": "^6.1.5",
        "superagent": "^8.0.6",
        "ws": "^8.11.0",
        "yargs": "^17.6.2",
        "zeromq": "^6.0.0-beta.16"
    }
}

index.js:

import ws, { WebSocketServer } from 'ws'
import { createServer } from 'http'
import staticHandler from 'serve-handler'

const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
})

const wss = new WebSocketServer({ server })
wss.on('connection', client => {
    console.log('Client connected')
    client.on('message', msg => {
        console.log(`Message: ${msg}`)
        broadcast(`${msg}`)
    })
})

function broadcast(msg) {
    for (const client of wss.clients) {
        if (client.readyState == ws.OPEN) {
            client.send(msg)
        }
    }
}

server.listen(process.argv[2] || 8000)

www/index.html:

<!DOCTYPE html>
<html>
  <body>
    Messages:
    <div id="messages"></div>
    <form id="msgForm">
      <input type="text" placeholder="Send a message" id="msgBox"/>
      <input type="submit" value="Send"/>

    </form>
    <script>
      const ws = new WebSocket(
          `ws://${window.document.location.host}`
      )
      ws.onmessage = function (message) {
          const msgDiv = document.createElement('div')
          msgDiv.innerHTML = message.data
          document.getElementById('messages').appendChild(msgDiv)
      }
      const form = document.getElementById('msgForm')
      form.addEventListener('submit', (event) => {
          event.preventDefault()
          const message = document.getElementById('msgBox').value
          ws.send(message)
          document.getElementById('msgBox').value = ''
      })
    </script>
  </body>
</html>

通过 node index.js 8002 命令运行应用,打开两个浏览器页面访问 Web 服务,测试聊天效果:

simple chat

但我们的应用是无法进行横向扩展的。比如再启动一个新的服务实例 node index.js 8003,此时连接到 8002 的客户端无法与连接到 8003 的客户端通信。可以自行测试。

使用 Redis 作为消息中间件

架构图如下所示。每个服务实例都会把从客户端收到的消息发布到消息中间件,同时也会通过中间件订阅从其他服务实例发布的消息。

message broker

index-redis.js:

import ws, { WebSocketServer } from 'ws'
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import Redis from 'ioredis'

const redisSub = new Redis()
const redisPub = new Redis()

const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
})

const wss = new WebSocketServer({ server })
wss.on('connection', client => {
    console.log('Client connected')
    client.on('message', msg => {
        console.log(`Message: ${msg}`)
        redisPub.publish('chat_message', `${msg}`)
    })
})

redisSub.subscribe('chat_message')

redisSub.on('message', (channel, msg) => {
    for (const client of wss.clients) {
        if (client.readyState === ws.OPEN) {
            client.send(msg)
        }
    }
})

server.listen(process.argv[2] || 8000)

运行 node index-redis.js 8002node index-redis.js 8003 两条命令启动两个服务实例,此时连接到不同服务器的客户端相互之间也能够进行通信。

message broker

点对点 Pub/Sub 模式

通过 ZeroMQ 创建两种类型的 socket:PUBSUB。PUB socket 绑定到本地机器的某个端口,负责监听来自其他机器上 SUB socket 的订阅请求。当一条消息通过 PUB socket 发送时,该消息会被广播到所有连接的 SUB socket。

peer to peer

index-zeromq.js:

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import yargs from 'yargs'
import zmq from 'zeromq'

const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
})

let pubSocket
async function initializeSockets() {
    pubSocket = new zmq.Publisher()
    await pubSocket.bind(`tcp://127.0.0.1:${yargs(process.argv).argv.pub}`)

    const subSocket = new zmq.Subscriber()
    const subPorts = [].concat(yargs(process.argv).argv.sub)
    for (const port of subPorts) {
        console.log(`Subscribing to ${port}`)
        subSocket.connect(`tcp://127.0.0.1:${port}`)
    }

    subSocket.subscribe('chat')

    for await (const [msg] of subSocket) {
        console.log(`Message from another server: ${msg}`)
        broadcast(msg.toString().split(' ')[1])
    }
}

initializeSockets()

const wss = new WebSocketServer({ server })
wss.on('connection', client => {
    console.log('Client connected')
    client.on('message', msg => {
        console.log(`Message: ${msg}`)
        broadcast(`${msg}`)
        pubSocket.send(`chat ${msg}`)
    })
})

function broadcast(msg) {
    for (const client of wss.clients) {
        if (client.readyState === ws.OPEN) {
            client.send(msg)
        }
    }
}

server.listen(yargs(process.argv).argv.http || 8000)

运行服务测试效果:

node index-zeromq.js --http 8002 --pub 5000 --sub 5001 --sub 5002
node index-zeromq.js --http 8003 --pub 5001 --sub 5000 --sub 5002
node index-zeromq.js --http 8004 --pub 5002 --sub 5000 --sub 5001
peer to peer

通过队列实现可靠的消息投递

消息队列是消息系统中的一种重要抽象。借助消息队列,通信中的发送方和接收方不必同时处于活跃的连接状态。队列系统会负责存储未投递的消息,直到目标处于能够接收的状态。

消息系统的投递机制可以简单概况为以下 3 类:

当消息投递机制可以实现“最少一次”或者“只有一次”时,我们就有了 durable subscriber

durable subscriber
AMQP

AMQP 是一个被很多消息系统支持的开放标准协议。除了定义一个通用的传输协议以外,他还提供了用于描述 routing、filtering、queuing、reliability 和 security 的模型。

AMQP

上述所有组件由中间件进行维护,同时对外暴露用于创建和维护的 API。当连接到某个中间件时,客户端会创建一个 channel 对象负责维护通信的状态。

AMQP 和 RabbitMQ 实现 durable subscriber

chat 应用和消息历史记录服务的架构图:

AMQP and history service
AMQP 和数据库实现 history service

此模块由两部分组成:一个 HTTP 服务负责将聊天历史记录暴露给客户端;一个 AMQP 消费者负责获取聊天消息并将它们保存在本地数据库中。

historySvc.js:

import { createServer } from 'http'
import levelup from 'levelup'
import leveldown from 'leveldown'
import timestamp from 'monotonic-timestamp'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'


async function main() {
    const db = levelup(leveldown('./msgHistory'))

    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    await channel.assertExchange('chat', 'fanout')
    const { queue } = channel.assertQueue('chat_history')
    await channel.bindQueue(queue, 'chat')
    channel.consume(queue, async msg => {
        const content = msg.content.toString()
        console.log(`Saving message: ${content}`)
        await db.put(timestamp(), content)
        channel.ack(msg)
    })

    createServer((req, res) => {
        res.writeHead(200)
        db.createValueStream()
            .pipe(JSONStream.stringify())
            .pipe(res)
    }).listen(8090)
}

main().catch(err => console.error(err))

index-amqp.js

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import amqp from 'amqplib'
import JSONStream from 'JSONStream'
import superagent from 'superagent'

const httpPort = process.argv[2] || 8000

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    await channel.assertExchange('chat', 'fanout')
    const { queue } = await channel.assertQueue(
        `chat_srv_${httpPort}`,
        { exclusive: true })
    await channel.bindQueue(queue, 'chat')
    channel.consume(queue, msg => {
        msg = msg.content.toString()
        console.log(`From queue: ${msg}`)
        broadcast(msg)
    }, { noAck: true })

    const server = createServer((req, res) => {
        return staticHandler(req, res, { public: 'www' })
    })

    const wss = new WebSocketServer({ server })
    wss.on('connection', client => {
        console.log('Client connected')

        client.on('message', msg => {
            console.log(`Message: ${msg}`)
            channel.publish('chat', '', Buffer.from(msg))
        })

        superagent
            .get('http://localhost:8090')
            .on('error', err => console.log(err))
            .pipe(JSONStream.parse('*'))
            .on('data', msg => {
                client.send(Buffer(msg).toString())
            })
    })

    function broadcast(msg) {
        for (const client of wss.clients) {
            if (client.readyState === ws.OPEN) {
                client.send(msg)
            }
        }
    }
    server.listen(httpPort)
}

main().catch(err => console.log(err))

运行服务测试效果:

node index-amqp.js 8002
node index-amqp.js 8003
node historySvc.js

通过 streams 实现可靠的消息投递

在系统集成的范畴里,stream(或 log)是一种有序的、只能追加的持久化的数据结构。Stream 概念里的 message 更应该叫做 record,总是被添加到 stream 末尾,且不会在被消费之后自动删除(不同于 queue)。这种特性令 stream 更像是一种数据仓库而不是消息中间件。
Stream 的另一个重要特性在于,record 是被消费者从 stream 中“拉取”的,因而消费者可以按照自己的节奏处理 record。
Stream 可以用来实现可靠的消息投递,一旦消费者崩溃,它可以在恢复后从中断的地方继续拉取消息。

Reliable message delivery with streams
Streams vs 消息队列

Stream 明显的应用场景在于处理顺序的流数据,也支持批量处理或者根据之前的消息确定相关性,并可以跨多个节点分发数据。
Stream 和消息队列都可以实现 Pub/Sub 模式,但消息队列更适合复杂的系统集成任务,它可以提供更复杂的路由机制,允许我们为不同的消息提供不同的优先级,而 Stream 中 record 的顺序是一定的。

通过 Redis Streams 实现 chat 应用

index-stream.js:

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws, { WebSocketServer } from 'ws'
import Redis from 'ioredis'


const redisClient = new Redis()
const redisClientXRead = new Redis()

const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
})

const wss = new WebSocketServer({ server })
wss.on('connection', async client => {
    console.log('Client connected')
    client.on('message', msg => {
        console.log(`Message: ${msg}`)
        redisClient.xadd('chat_stream', '*', 'message', msg)
    })

    const logs = await redisClient.xrange(
        'chat_stream', '-', '+')
    for (const [, [, message]] of logs) {
        client.send(message)
    }
})

function broadcast(msg) {
    for (const client of wss.clients) {
        if (client.readyState === ws.OPEN) {
            client.send(msg)
        }
    }
}


let lastRecordId = '$'

async function processStreamMessages() {
    while (true) {
        const [[, records]] = await redisClientXRead.xread(
            'BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
        for (const [recordId, [, message]] of records) {
            console.log(`Message from stream: ${message}`)
            broadcast(message)
            lastRecordId = recordId
        }
    }
}

processStreamMessages().catch(err => console.error(err))

server.listen(process.argv[2] || 8080)

此外,解包消息的代码 for (const [, [, message]] of logs) {...} 实际上等同于 for (const [recordId, [propertyId, message]] of logs) {...},由 xrange 命令查询到的消息的格式如下:

[
  ["1588590110918-0", ["message", "This is a message"]],
  ["1588590130852-0", ["message", "This is another message"]]
]

参考资料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition

上一篇下一篇

猜你喜欢

热点阅读