(javascript)100行代码实现可重试可并发的异步任务队

2020-06-12  本文已影响0人  Lawrence_4aee

背景

昨天我们团队项目中有需要控制并发数,并且将大文件切片上传的需求,期望是并发的去上传文件切片,于是乎构思了一下,利用闭包实现了一个并发并且可重试的非顺序执行的任务队列

构思

首先需要有一个队列栈task_queue,和一个任务池task_pool
当任务完成时,将完成的任务从task_queue中移除,同时从任务池task_pool中拿到新的任务入栈
当任务失败的时候,也要将任务从task_queue中移除,同时加入task_pool的栈首,等待前一任务执行后进行重试

构造函数

constructor(options) {
    this.retry_counts_map = {} // 记录重试次数
    this.retry_times = options.retry_times || 3 // 默认重试3次
    this.concurrent_size = options.concurrent_size
    this.task_pool = (options.tasks || []).map((task, index) => {
      task.id = index
      return task
    })
    this.handler = options.handler
    this.retry_fun = options.retry_fun
    this.task_quene = []
    this.failed_tasks = []
    this.success_count = 0
  }

利用闭包来记录任务,执行失败后进行重试,并在remove task之后再次执行start

start () {
    if (this.task_quene.length === 0 && this.task_pool.length === 0) {
      this.handler({
        succees: this.success_count,
        failed: this.failed_tasks.length,
        failed_tasks: this.failed_tasks
      })
      return
    }
    for (let i = this.task_quene.length; i < this.concurrent_size; i++) {
      let task = this.task_pool.shift()
      if (!task) {
        return
      }
      this.push(task)
    }
  }
  push (task) {
    let mission = ((t) => {
      return t().then(res => {
        console.log('完成任务:' + task.id)
        this.success_count = this.success_count + 1
        this.remove(t)
      }).catch(e => {
        this.retry(task)
      })
    })(task)
    this.task_quene.push(mission)
  }
  remove (task) {
    this.task_quene.splice(this.task_quene.findIndex(t => {
      return t.id === task.id
    }), 1)
    this.start()
  }

记录失败的任务,加入任务池的首位

retry (task) {
    if (!this.retry_counts_map[task.id]) {
      this.retry_counts_map[task.id] = 1
    } else {
      this.retry_counts_map[task.id] = this.retry_counts_map[task.id] + 1
    }
    if (this.retry_counts_map[task.id] > this.retry_times) {
      console.log(`任务${task.id} 重试次数已达到三次`)
      this.failed_tasks.push(task)
    } else {
      if (this.retry_fun) {
        this.retry_fun().then(() => {
          console.log(`异步重试任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
          this.task_pool.unshift(task)
        }).catch(e => {
          this.failed_tasks.push(task)
        })
      } else {
        console.log(`任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
        this.task_pool.unshift(task)
      }
    }
    this.remove(task)
  }
}

完整代码&DEMO

class TaskQueue {
  constructor(options) {
    this.retry_counts_map = {} // 记录重试次数
    this.retry_times = options.retry_times || 3 // 默认重试3次
    this.concurrent_size = options.concurrent_size
    this.task_pool = (options.tasks || []).map((task, index) => {
      task.id = index
      return task
    })
    this.handler = options.handler
    this.retry_fun = options.retry_fun
    this.task_quene = []
    this.failed_tasks = []
    this.success_count = 0
  }

  start () {
    if (this.task_quene.length === 0 && this.task_pool.length === 0) {
      this.handler({
        succees: this.success_count,
        failed: this.failed_tasks.length,
        failed_tasks: this.failed_tasks
      })
      return
    }
    for (let i = this.task_quene.length; i < this.concurrent_size; i++) {
      let task = this.task_pool.shift()
      if (!task) {
        return
      }
      this.push(task)
    }
  }
  push (task) {
    let mission = ((t) => {
      return t().then(res => {
        console.log('完成任务:' + task.id)
        this.success_count = this.success_count + 1
        this.remove(t)
      }).catch(e => {
        this.retry(task)
      })
    })(task)
    this.task_quene.push(mission)
  }
  remove (task) {
    this.task_quene.splice(this.task_quene.findIndex(t => {
      return t.id === task.id
    }), 1)
    this.start()
  }
  retry (task) {
    if (!this.retry_counts_map[task.id]) {
      this.retry_counts_map[task.id] = 1
    } else {
      this.retry_counts_map[task.id] = this.retry_counts_map[task.id] + 1
    }
    if (this.retry_counts_map[task.id] > this.retry_times) {
      console.log(`任务${task.id} 重试次数已达到三次`)
      this.failed_tasks.push(task)
    } else {
      if (this.retry_fun) {
        this.retry_fun().then(() => {
          console.log(`异步重试任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
          this.task_pool.unshift(task)
        }).catch(e => {
          this.failed_tasks.push(task)
        })
      } else {
        console.log(`任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
        this.task_pool.unshift(task)
      }
    }
    this.remove(task)
  }
}


const randomNum = function (minNum, maxNum) {
  switch (arguments.length) {
    case 1:
      return parseInt(Math.random() * minNum + 1, 10);
    case 2:
      return parseInt(Math.random() * (maxNum - minNum + 1) + minNum, 10);
    default:
      return 0;
  }
}

const __main__ = function () {
  let tasks = []
  for (let i = 0; i < 20; i++) {
    tasks.push((() => {
      return new Promise((resolve, reject) => {
        setTimeout(function () {
          let num = randomNum(0, 100)
          if (num > 50) {
            resolve()
          } else {
            reject()
          }
        }, randomNum(300, 800))
      })
    }))
  }
  const task_queue = new TaskQueue({
    retry_times: 3, // 默认重试3次
    concurrent_size: 5,
    tasks: tasks,
    handler (res) {
      console.log(res)
    }
  })
  task_queue.start()
}

__main__()
上一篇下一篇

猜你喜欢

热点阅读