(原创)一种基于RabbitMQ的分布式任务系统的设计实现
虽然过去一年中的工作以爬虫为主,但个人认为最大的收获在于参考已有的系统、从零开始把整个爬虫任务调度系统实现了一遍,因而有了一些架构上的理解,它具有非常强的可扩展性,或许这就是RPC(Remote procedure call)
的优点吧,因此在这里做一些归纳总结。
受限于本人薄弱的阐述能力,如果对RabbitMQ有所了解再来阅读此文是比较合适的,如果能有所斧正就更好了。(参考RabbitMq python tutorial--语言可选)
1.架构概要
这个系统是基于RabbitMQ
来实现的,因而它是跨平台、跨语言的、分布式的、可平行扩展的,较适合处理时效性不高的任务,在系统内部,任务处理当然是异步的,但对外的表现形式,则是可同步可异步的。
也因此,它的架构概要图与RabbitMQ
的生产者消费者模型在本质上是一样的,只不过生产者和消费者的结构更丰富具体。
producer_consumer.png
从概要设计图可以看出,整个系统对外只暴露了
Frontend
,其余部分对Requester
都是不可见的,他也不需要知道这些,他只要能够通过Frontend
发送请求(任务)、得到响应、查询结果就ok了。而对于RabbitMQ来说,图中的
mq_Client
和mq_Server
则相当于生产者和消费者了。
系统分为5个部分:
- RabbitMQ负责传递消息(Message),一个消息对应一个任务,通过不同的Queue分发不同种类的任务,支持集群
- DB负责存储任务详情或者结果,不论是mysql、MongoDB都支持集群
- Redis负责缓存临时信息,也是支持集群(也可以使用memcached)
- Frontend负责对外提供RESTFul API,响应任务请求创建任务,并通过worker_manager_proxy将任务放进MQ。可以将整个任务序列化放进MQ,也可以只把任务标识(如task_id)放进MQ,同时把信息放进Redis,或者创建后存入DB,只要记得处理任务时存在哪里就在哪里取。不过为了节省MQ的空间,不建议将整个任务信息放入MQ。
- worker_manager负责从MQ里取任务,并为每个任务创建一个worker线程进行工作,期间会通过Redis、DB进行必要的读写或者增删改查操作。
部署时就可以相应的分开部署。
2.特性分析
这里以上图为例来解释一下
1) 跨平台、跨语言、分布式的
-
rabbitMQ
可以部署在linux
、windows
等操作系统上,因此跨平台 -
rabbitMQ
教程代码就给出了多达十来种语言的版本,当然是跨语言的,对应此系统,就是你可以使用Python
和Java
版本的Frontend
来接收请求,而同时使用PHP
、JavaScript
写的Worker来完成工作,只要它们的内在逻辑是一致的。 - 我现在对于消息队列(MQ)最大的理解就是:通过网络实现了原来一个系统内部多个进程间的通信,如此一来各个进程(比如
Frontend
和Worker
)的运行就突破了空间限制,这当然是分布式的。
2)可扩展性
扩展的目的是为了应对高并发,为了同时处理更多的任务。
假设系统出现性能瓶颈,rabbitMQ、redis、db都可以通过集群来扩展,而Frontend、worker_manager则直接加机器起服务就行。
众所周知,分布式系统通过加机器并发4台1核1G的低配机器,性能可视同于1台4核4G的机器。
3.流程简介
这里以最简单的只处理一种任务(可能不只一个Frontend
一个worker_manager
)的处理过程来介绍一下系统的工作流程:
- 先定义任务的字段
# python or node
task = {
'task_id': uuid(), # 唯一标识
'status': 'new' or `done` ..., # 任务状态
'type': 'crawler', # 任务类型,假设为爬虫类型
'callback_url': '', # 回调地址,根据需要提供
'data': {}, # 更详细的任务信息
'correlation_id':uuid() # 用于保持request和response的一致性
}
-
启动
RabbitMQ
、Redis
、DB
服务,以及Frontend
和work_manager
进程,后两者在启动时通过mq_client
或者mq_server
declare了两个queue:crawler_task_queue
、notify_queue
,并立刻开始监听。 -
Requester
通过Frontend
提供的API发送任务请求,请求附带必要的参数 -
Frontend
根据请求创建task
并存入DB
(也可以是Redis
),然后将task_tid
、correlation_id
作为MSG
的内容发送至crawler_task_queue
中,并将correlation_id
记录在outstandingRequests
列表中,此时:- 一般异步处理时,此处
Frontend
向Requester
返回response
即可 - 想要同步化处理,则等到
task
完成后,再返回response
,不过这样Requester
就出于阻塞状态,一般不会这么做。
- 一般异步处理时,此处
-
worker_manager
监听到crawler_task_queue
有新的MSG
,取出来,根据task_id
从DB
中读取完整的task
信息,由于type=='crawler'
,创建相应的Crawler_Worker
进行工作。 -
Crawler_Worker
完成工作后,将结果数据存入DB
,更新task.status='done'
。worker_manager
将MSG
发送至notify_queue
-
Frontend
监听notify_queue
,读取到MSG
信息后,检查correlation_id
能否在outstandingRequests
列表中匹配到:- 如果未匹配,则不予理会,
- 如果匹配,则进行下一步处理,
ack(MSG)
,并从outstandingRequests
列表中删除该correlation_id
。
-
如果
task['callback_url']
的值不为空,则Frontend
向该callback_url
发送任务结果(简单的task
或者根据type='crawler'
查询到的详细的crawlerResultData
)。 -
对于没有提供
callback_url
的Requester
而言,则需要在一定时间后,调用Frontend
的查询接口进行查询。
如上,一个任务的流程就算走完了。
如果理解了上述流程,就能明白通过type
字段以及对应的work_manger
可以扩展到多种任务的分布式异步处理。
4.进阶
1)prefetch的使用
一个worker_manager
只处理一个task
当然不划算,但如果来者不拒为每一个task
都创建Crawler_Worker
,在进行批跑时,很容把机器挂掉。这时就需要用到RabbitMQ
的prefetch
和ack机制
了,怎么实现就不说了。需要提醒的是,worker_manager
在通过ack机制
限制当前机器上任务并发数在prefetch
之下时,需要先缓存MSG
待task
完成后再进行ack
操作。
2)关于correlation_id
的用途
加入任务并发性要求较高,启动了frontA
和frontB
两个Frontend
和多台Crawler_Worker
来处理任务。那么correlation_id
就可以保证frontA
受理的任务请求所需要的异步callback
操作以及同步化response
依然可以通过frontA
完成。
3)异步任务的交互
系统处理任务是异步的,上述任务流程中没有涉及到交互问题,但众所周知,爬虫工作过程中有一部分是需要人的交互的,这也是一种简单的反爬措施,比如模拟登录中需要提交短信验证码,那怎么办呢?
- 提示一下,从
status
入手,此外,Frontend
的一致性在上一点说过了,当然还要考虑到爬虫工作的连续性,就以补充短信验证码来说,必然要延用之前的cookie
和session
,这里就不细说了,再给个截图作为提示。
不要被queue的名字所迷惑
5.代码示例
这里给出简单的rabbitMQ_rpc_node_sample,只有rabbitmq
的封装、rabbitmq_client
和rabbitmq_server
的实现以及调用示例invoking_instance
。
示例效果是进行简单计算,对GET
请求中的参数a
进行求根操作,对POST
请求中的参数a
进行求3次方。
提示一下,示例代码中有一个小小的不太影响使用的坑,不算bug,有心者可以发现,我也是最近才发现的。
6.致谢
感谢魏总在这一年间给予的指导,让我收获颇丰。