Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务

2023-01-11  本文已影响0人  rollingstarky
Distributing tasks to a set of consumers

将高成本的任务委派给多个工作节点,这种类型的应用并不适合由 Pub/Sub 模式实现。因为我们并不想同一个任务被多个消费者收到,相反我们更需要一种类似负载均衡的消息分发模式。在消息系统术语中,也被称为 competing consumersfanout distributionventilator
与 HTTP 负载均衡器不同的是,任务分发系统中的消费者是一种更活跃的角色。绝大多数时候都是消费者连接到任务队列,请求新的任务。这一点在可扩展系统中非常关键,允许我们在不修改生产者部分的情况下,直接平滑地增加工作节点的数量。
此外,在一个通用的消息系统中,我们没有必要强调生产者和消费者之间的请求/响应通信。多数情况下,更优先的选择是使用单向的异步通信,从而获得更优异的并行能力和扩展性。消息基本上总是沿着一个方向流动,这样的管道允许我们构建复杂的信息处理架构,又不必承受同步通信带来的开销。

A messaging pipeline

ZeroMQ Fanout/Fanin 模式

分布式 hashsum 破解器

需要以下组件实现一个标准的并行管线:

The architecture of a typical pipeline with ZeroMQ

即一个节点负责生成所有可能的字符串组合,并将它们分发给不同的工作节点;工作节点则负责计算接收到的字符串,比较 hash 值;最后一个节点负责收集暴力破解的结果。

实现 producer

为了表示所有可能的字符组合,这里使用 N 维索引树。每个节点包含一个当前位置下可能出现的字母,比如只有 ab 两个字母的话,长度为 3 的字符串组合共有图示的以下几种:

Indexed n-ary tree for alphabet (a, b)

indexed-string-variation 包可以帮助我们由索引计算出对应的字符串,这项工作可以在工作节点完成,因此 producer 这里只需要将分好组的索引值分发给工作节点。
generateTasks.js:

export function* generateTasks(searchHash, alphabet,
    maxWordLength, batchSize) {
    let nVariations = 0
    for (let n = 1; n <= maxWordLength; n++) {
        nVariations += Math.pow(alphabet.length, n)
    }

    console.log('Finding the hashsum source string over ' +
        `${nVariations} possible variations`)

    let batchStart = 1
    while (batchStart <= nVariations) {
        const batchEnd = Math.min(
            batchStart + batchSize - 1, nVariations)
        yield {
            searchHash,
            alphabet: alphabet,
            batchStart,
            batchEnd
        }

        batchStart = batchEnd + 1
    }
}

producer.js:

import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
    const ventilator = new zmq.Push()
    await ventilator.bind('tcp://*:5016')
    await delay(1000)

    const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        await ventilator.send(JSON.stringify(task))
    }
}

main().catch(err => console.log(err))
实现 worker

process Task.js:

import isv from 'indexed-string-variation'
import { createHash } from 'crypto'

export function processTask(task) {
    const variationGen = isv.generator(task.alphabet)
    console.log('processing from ' +
        `${variationGen(task.batchStart)} (${task.batchStart})` +
        `to ${variationGen(task.batchEnd)} (${task.batchEnd}`)

    for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
        const word = variationGen(idx)
        const shasum = createHash('sha1')
        shasum.update(word)
        const digest = shasum.digest('hex')

        if (digest === task.searchHash) {
            return word
        }
    }
}

processTask() 遍历给定区间内的所有索引值,对每一个索引生成对应的字符串,再计算其 SHA1 值,与传入的 task 对象中的 searchHash 比较。

worker.js:

import zmq from 'zeromq'
import { processTask } from './processTask.js'

async function main() {
    const fromVentilator = new zmq.Pull()
    const toSink = new zmq.Push()

    fromVentilator.connect('tcp://localhost:5016')
    toSink.connect('tcp://localhost:5017')

    for await (const rawMessage of fromVentilator) {
        const found = processTask(JSON.parse(rawMessage.toString()))
        if (found) {
            console.log(`Found! => ${found}`)
            await toSink.send(`Found: $found`)
        }
    }
}

main().catch(err => console.error(err))

worker.js 创建了两个 socket。PULL socket 负责连接到任务发布方(Ventilator),接收任务;PUSH socket 负责连接到结果收集方(sink),传递任务执行的结果。

实现 results collector

collector.js:

import zmq from 'zeromq'

async function main() {
    const sink = new zmq.Pull()
    await sink.bind('tcp://*:5017')

    for await (const rawMessage of sink) {
        console.log('Message from worker: ', rawMessage.toString())
    }
}

main().catch(err => console.error(err))

运行以下命令测试结果:

node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

AMQP 实现 pipeline 和 competing consumers

Task distribution architecture using a message queue broker

像前面那样在点对点的模式下,实现 pipeline 是非常直观的。假设我们需要借助 AMQP 这类系统实现任务分配模式,就必须确保每条消息都只会被一个消费者接收到。
可以直接将任务发布到目标 queue,不经过 exchange。避免了 exchange 有可能绑定了多个 queue 的情况。之后,多个消费者同时监听这一个 queue,消息即会以 fanout 的方式均匀地分发给所有的消费者。

hashsum 破解器的 AMQP 实现

producer-amqp.js:

import amqp from 'amqplib'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createConfirmChannel()
    await channel.assertQueue('tasks_queue')

    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
    }

    await channel.waitForConfirms()
    channel.close()
    connection.close()
}

main().catch(err => console.error(err))

worker-amqp.js:

import amqp from 'amqplib'
import { processTask } from './processTask.js'

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    const { queue } = await channel.assertQueue('tasks_queue')
    channel.consume(queue, async (rawMessage) => {
        const found = processTask(
            JSON.parse(rawMessage.content.toString()))
        if (found) {
            console.log(`Found! => ${found}`)
            await channel.sendToQueue('results_queue',
                Buffer.from(`Found: ${found}`))
        }
        await channel.ack(rawMessage)
    })
}

main().catch(err => console.error(err))

collector-amqp.js:

import amqp from 'amqplib'

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    const { queue } = await channel.assertQueue('results_queue')
    channel.consume(queue, msg => {
        console.log(`Message from worker: ${msg.content.toString()}`)
    })
}

main().catch(err => console.error(err))

运行如下命令测试效果:

node worker-amqp.js
node worker-amqp.js
node collector-amqp.js
node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

通过 Redis Streams 实现任务分发

Redis Stream 可以借助一种叫做 consumer groups 的特性实现任务分发模式。Consumer group 是一个有状态的实体,由一组名称标识的消费者组成,组中的消费者会以 round-robin 的方式接收记录。
每条记录都必须被显式地确认,否则该记录会一直处于 pending 状态。每个消费者都只能访问它自己的 pending 记录,假如消费者突然崩溃,在其回到线上后会先尝试获取其 pending 的记录。

A Redis Stream consumer group

Consumer group 也会记录其读取的上一条消息的 ID,因而在连续的读取操作中,consumer group 知道下一条要读取的记录时是哪个。

producer-redis.js:

import Redis from 'ioredis'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()

const [, , maxLength, searchHash] = process.argv

async function main() {
    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        await redisClient.xadd('tasks_stream', '*',
            'task', JSON.stringify(task))
    }

    redisClient.disconnect()
}

main().catch(err => console.error(err))

worker-redis.js:

import Redis from 'ioredis'
import { processTask } from './processTask.js'

const redisClient = new Redis()
const [, , consumerName] = process.argv

async function main() {
    await redisClient.xgroup('CREATE', 'tasks_stream',
        'workers_group', '$', 'MKSTREAM')
        .catch(() => console.log('Consumer group already exists'))

    const [[, records]] = await redisClient.xreadgroup(
        'GROUP', 'workers_group', consumerName, 'STREAMS',
        'tasks_stream', '0')
    for (const [recordId, [, rawTask]] of records) {
        await processAndAck(recordId, rawTask)
    }

    while (true) {
        const [[, records]] = await redisClient.xreadgroup(
            'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
            'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
        for (const [recordId, [, rawTask]] of records) {
            await processAndAck(recordId, rawTask)
        }
    }
}

async function processAndAck(recordId, rawTask) {
    const found = processTask(JSON.parse(rawTask))
    if (found) {
        console.log(`Found! => ${found}`)
        await redisClient.xadd('results_stream', '*', 'result',
            `Found: ${found}`)
    }

    await redisClient.xack('tasks_stream', 'workers_group', recordId)
}

main().catch(err => console.error(err))

collector-redis.js:

import Redis from 'ioredis'

const redisClient = new Redis()

async function main() {
    let lastRecordId = '$'
    while (true) {
        const data = await redisClient.xread(
            'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
        for (const [, logs] of data) {
            for (const [recordId, [, message]] of logs) {
                console.log(`Message from worker: ${message}`)
                lastRecordId = recordId
            }
        }
    }
}

main().catch(err => console.error(err))

运行程序测试效果:

node worker-redis.js workerA
node worker-redis.js workerB
node collector-redis.js
node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

参考资料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition

上一篇下一篇

猜你喜欢

热点阅读