(javascript)100行代码实现可重试可并发的异步任务队
2020-06-12 本文已影响0人
Lawrence_4aee
背景
昨天我们团队项目中有需要控制并发数,并且将大文件切片上传的需求,期望是并发的去上传文件切片,于是乎构思了一下,利用闭包实现了一个并发并且可重试的非顺序执行的任务队列
构思
首先需要有一个队列栈task_queue,和一个任务池task_pool
当任务完成时,将完成的任务从task_queue中移除,同时从任务池task_pool中拿到新的任务入栈
当任务失败的时候,也要将任务从task_queue中移除,同时加入task_pool的栈首,等待前一任务执行后进行重试
构造函数
constructor(options) {
this.retry_counts_map = {} // 记录重试次数
this.retry_times = options.retry_times || 3 // 默认重试3次
this.concurrent_size = options.concurrent_size
this.task_pool = (options.tasks || []).map((task, index) => {
task.id = index
return task
})
this.handler = options.handler
this.retry_fun = options.retry_fun
this.task_quene = []
this.failed_tasks = []
this.success_count = 0
}
利用闭包来记录任务,执行失败后进行重试,并在remove task之后再次执行start
start () {
if (this.task_quene.length === 0 && this.task_pool.length === 0) {
this.handler({
succees: this.success_count,
failed: this.failed_tasks.length,
failed_tasks: this.failed_tasks
})
return
}
for (let i = this.task_quene.length; i < this.concurrent_size; i++) {
let task = this.task_pool.shift()
if (!task) {
return
}
this.push(task)
}
}
push (task) {
let mission = ((t) => {
return t().then(res => {
console.log('完成任务:' + task.id)
this.success_count = this.success_count + 1
this.remove(t)
}).catch(e => {
this.retry(task)
})
})(task)
this.task_quene.push(mission)
}
remove (task) {
this.task_quene.splice(this.task_quene.findIndex(t => {
return t.id === task.id
}), 1)
this.start()
}
记录失败的任务,加入任务池的首位
retry (task) {
if (!this.retry_counts_map[task.id]) {
this.retry_counts_map[task.id] = 1
} else {
this.retry_counts_map[task.id] = this.retry_counts_map[task.id] + 1
}
if (this.retry_counts_map[task.id] > this.retry_times) {
console.log(`任务${task.id} 重试次数已达到三次`)
this.failed_tasks.push(task)
} else {
if (this.retry_fun) {
this.retry_fun().then(() => {
console.log(`异步重试任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
this.task_pool.unshift(task)
}).catch(e => {
this.failed_tasks.push(task)
})
} else {
console.log(`任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
this.task_pool.unshift(task)
}
}
this.remove(task)
}
}
完整代码&DEMO
class TaskQueue {
constructor(options) {
this.retry_counts_map = {} // 记录重试次数
this.retry_times = options.retry_times || 3 // 默认重试3次
this.concurrent_size = options.concurrent_size
this.task_pool = (options.tasks || []).map((task, index) => {
task.id = index
return task
})
this.handler = options.handler
this.retry_fun = options.retry_fun
this.task_quene = []
this.failed_tasks = []
this.success_count = 0
}
start () {
if (this.task_quene.length === 0 && this.task_pool.length === 0) {
this.handler({
succees: this.success_count,
failed: this.failed_tasks.length,
failed_tasks: this.failed_tasks
})
return
}
for (let i = this.task_quene.length; i < this.concurrent_size; i++) {
let task = this.task_pool.shift()
if (!task) {
return
}
this.push(task)
}
}
push (task) {
let mission = ((t) => {
return t().then(res => {
console.log('完成任务:' + task.id)
this.success_count = this.success_count + 1
this.remove(t)
}).catch(e => {
this.retry(task)
})
})(task)
this.task_quene.push(mission)
}
remove (task) {
this.task_quene.splice(this.task_quene.findIndex(t => {
return t.id === task.id
}), 1)
this.start()
}
retry (task) {
if (!this.retry_counts_map[task.id]) {
this.retry_counts_map[task.id] = 1
} else {
this.retry_counts_map[task.id] = this.retry_counts_map[task.id] + 1
}
if (this.retry_counts_map[task.id] > this.retry_times) {
console.log(`任务${task.id} 重试次数已达到三次`)
this.failed_tasks.push(task)
} else {
if (this.retry_fun) {
this.retry_fun().then(() => {
console.log(`异步重试任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
this.task_pool.unshift(task)
}).catch(e => {
this.failed_tasks.push(task)
})
} else {
console.log(`任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
this.task_pool.unshift(task)
}
}
this.remove(task)
}
}
const randomNum = function (minNum, maxNum) {
switch (arguments.length) {
case 1:
return parseInt(Math.random() * minNum + 1, 10);
case 2:
return parseInt(Math.random() * (maxNum - minNum + 1) + minNum, 10);
default:
return 0;
}
}
const __main__ = function () {
let tasks = []
for (let i = 0; i < 20; i++) {
tasks.push((() => {
return new Promise((resolve, reject) => {
setTimeout(function () {
let num = randomNum(0, 100)
if (num > 50) {
resolve()
} else {
reject()
}
}, randomNum(300, 800))
})
}))
}
const task_queue = new TaskQueue({
retry_times: 3, // 默认重试3次
concurrent_size: 5,
tasks: tasks,
handler (res) {
console.log(res)
}
})
task_queue.start()
}
__main__()