系统设计:分布式调度器(KISS面试系列)
这是系统面试准备系列的第一篇博客。我的目标是设计出KISS(keep it simple stupid)系统,可以在实际系统设计面试中花费45-60分钟进行讨论。
介绍
任务调度是一个常见的系统设计面试问题,下面的一些领域,可能会需要设计一个任务调度系统:
- 设计一个对账处理的系统(每月/周/日 进行对账处理)
- 设计一个代码部署的系统(定期进行代码流水线处理)
这篇文章的目的是设计一个简单且可扩容的任务调度系统
问题陈述
设计一个在指定时间间隔运行的任务调度系统
功能性需求
- 用户能够调度或者查看任务
- 用户能够列出所有已提交的任务,并显式当前任务的状态
- 任务能够运行一次或者重复运行。任务需要在定义的调度时间之后,再给定的X阈值时间内运行(假设X = 15分钟)
- 单个任务的执行时间不能超过X分钟(假设X = 5分钟)
- 任务也会有优先级,高优先级的任务需要比低优先级的任务先执行
- 任务的最终输出需要被存储至文件系统中
非功能性需求
- 高可用 - 对于用户而言,系统可以一直添加/查看任务
- 可扩展性 - 系统可以扩展支持数百万的任务调度
- 可靠性 - 系统必须最少一次执行一个任务,并且相同的任务不能在同一时间
被不同的进程调度 - 持久性 - 在任何失败场景下,系统都不应该丢失任务的信息
- 延迟 - 系统应在作业被接受后立即回馈用户。 用户不必等到工作完成。
流量和存储估算
假设任务调度系统的QPS设计目标:1000QPS;假设单个调度任务最多可以运行 5 分钟,因此该系统是高度受到CPU的制约的
CPU限制(CPU Bound)
一个现代的CPU可以有16核,单核拥有2个线程,单个调度任务最多可以运行 5 分钟。 那么单机器CPU执行任务的公式:
16核 * 2线程 / 5min / 60s = 0.1 任务/秒 (8000 任务/天)
内存限制(Memory Bound)
假设每一个任务会占用5M的内存,对应的内存分配了16G, 那么单机器内存执行任务的公式:
16G * 1024 / 5M / 5min / 60s = 10 任务/秒
如果需要达到1000QPS, 那么需要的机器数量 1000 / 10 = 100
上述结果给了我们一个提示,即用于处理任务调度的单机设计既不高可用也不可扩展。 所以我们需要分布式系统来设计解决方案
系统接口
有如下三个接口需要暴露给用户
- 任务提交: submitJob(api_key, user_id, job_schedule_time, job_type, priority, result_location)
job_type值可以是ONCE 或者 RECURRING,API可以返回HTTP Code 202代表接收到了任务 - 单个任务查看: viewJob(api_key, user_id, job_id)
响应的任务状态包括NOT_STARTED STARTED COMPLETED - 任务列表查看: listJobs(api_key, user_id, pagination_token)
高层级设计
用户的请求流程:
(1 & 2) 用户通过load balancer(或API 网关)提交/获取任务
(3) 请求会被持久化到DB中,并且返回ack告知用户已处理
(4 & 5)Job Scheduler Service会持续的从DB中拉取快到期执行的任务,并将其塞入队列中
(6 & 7)Job Executor Service会执行实际的业务逻辑调度任务,更新最终结果进文件系统并将任务调度状态置为COMPLETED
DB 设计
由于我们对事务支持或任何其他 ACID 属性没有严格要求,只需牢记峰值的QPS(2 * 1000 = 2000),所以我们可以同时使用SQL或者NoSQL数据库,考虑到 NoSql 在规模、维护和成本方面的明显优势,我会选择使用 DynamoDb 的 NoSql 解决方案
-
用户查询模式
给定UsedId,新增任务
给定UserId,获取所有jobIds -
DB Schema
Table: JOB
+------------------------------+--------+
| Attribute | Type |
+------------------------------+--------+
| user_id (partition key) | uuid |
| job_id (sort key) | uuid |
| actual_job_execution_time | date |
| job_status | string |
| job_type | string |
| job_interval | int |
| result_location | string |
| current_retries | int |
| max_retries | int |
| scheduled_job_execution_time | date |
| execution_status | string |
job_status:用户查看的任务状态,包含NOT_STARTED, STARTED, COMPLETED三种状态
execution_status:当前服务维护的实际执行状态,包含NOT_STARTED, CLAIMED, PROCESSING, SUCCESS, RETRIABLE_FAILURE, FATAL_FAILURE
除了用户之外,我们的作业调度服务将轮询数据库以获取到期的任务,可以通过不同的方式来实现这一目标:
- 基于X分钟大小的桶窗口分区
我们可以创建名为 scheduleJob 的索引来检索最后 X 分钟到期的作业,将其拉出之后使用延时队列特性塞入MQ中
Index: ScheduledJob
+----------------------------------------------+------+
| Attribute | Type |
+----------------------------------------------+------+
| scheduled_job_execution_time (partition key) | time |
| job_id (sort key) | uuid |
+----------------------------------------------+------+
Query (SQL equivalent):
SELECT * FROM ScheduledJob WHERE scheduled_job_execution_time == now() - X
- 基于X分钟大小的桶窗口+ share id 的分区
很有可能,在一个特定的时间窗口内,很多任务会被接收到(假设有10万个)。在这种场景下,上述查询语句的性能会非常的慢,我们可以根据随机的Share Id(假设在1到N之间)进一步对DB进行分区
Index: ScheduledJob
+----------------------------------------------+------+
| Attribute | Type |
+----------------------------------------------+------+
| scheduled_job_execution_time (partition key) | uuid |
| shard_id (partition key) | int |
| job_id (sort key) | uuid |
+----------------------------------------------+------+
Query (SQL equivalent):
SELECT * FROM ScheduledJob WHERE scheduled_job_execution_time == now() - X and shard_id == Y
深入底层设计
- 任务调度器(Job Scheduler)如何工作
任务调度流程:
- 每 X 分钟,Master节点创建一个权威的 UNIX 时间戳,并为每个 worker 分配一个 shard_id 和 schedule_job_execution_time。
- Worker 节点将执行以下查询,并将任务推送到 Kafka 队列中执行。
worker 1:
SELECT * FROM ScheduledJob WHERE scheduled_job_execution_time == now() - X and shard_id = 1
worker 2:
SELECT * FROM ScheduledJob WHERE scheduled_job_execution_time == now() - X and shard_id = 2
容错设计
- Master 监控Worker的健康状况并知晓哪个Worker死亡以及如何将查询重新分配给新Worker。
- 如果Master节点死亡,我们可以分配其他worker节点作为master
- 此外,如果worker成功查询db,我们还可以引入本地数据库来跟踪状态并将待执行任务放入队列中
- 任务执行器(Job Executor)如何工作
Job Executor存在多个Consumer从队列中拉取待消费的任务,Consumer 机器也存在Master进程与Worker进程。Master进程与Worker进程都基于Pull模型上运行。Master进程 将从队列中轮询调度任务,Worker进程将通过执行以下代码不断从 master 轮询调度任务
while True:
w = get_next_work()
do_work(w)
任务执行流程与容错设计
- 当从队列中取出调度任务时,消费者的 master 更新db中JOB的属性 execution_status=CLAIMED。
- 当Worker进程接手工作时,它会更新 execution_status=PROCESSING 并不断向本地 DB 发送健康检查。
- 调度任务完成后,Worker进程会将结果推送到 AWS s3 中,更新db中JOB的 execution_status=COMPLETED 或 FATAL_FAILED,并使用状态更新本地 db
- Worker进程和Master进程都将更新本地数据库中的健康检查。
健康检查服务
健康检查服务定期运行(比如每 x 秒),并扫描上次从Worker进程接收到的健康检查小于定义阈值的数据库。 在这种情况下,它认为调度任务未能处理并将其推送回队列。
结论
系统设计是一个广泛的话题,一小时的面试很难涵盖到系统设计的方方面面。在上述的设计中,我们已经达到了面试官可以进一步深究的大部分区域。
引用