【Flask】简单的后台异步队列

2019-06-24  本文已影响0人  小熊猫笔记

异步任务队列

_task_queue = Queue.Queue()

异步队列输入

def async_call(function, callback, task_id, task_name, *args, **kwargs):
    _task_queue.put({
        'function': function,
        'callback': callback,
        'task_id': task_id,
        'task_name': task_name,
        'args': args,
        'kwargs': kwargs
    })

异步队列获取并执行

def _task_queue_consumer(**kwargs):
    """
    异步任务队列消费者
    """
    db = kwargs.get("db_queue")
    while True:
        try:
            #循环间隔获取,避免no-block或者一直获取无法停止
            task = _task_queue.get(timeout=10)
            func = task.get('function')

            task_id = task.get('task_id')
            task_name = task.get('task_name')

            task_args = task.get('args')
            task_kwargs = task.get('kwargs')
            try:
                result_code, result_value = func(*task_args, **task_kwargs)
            except Exception as ex:
                #数据库记录任务
                record_task(db, task_name, task_id, result=ex, state=3)
            else:
                if result_code == 200:
                    record_task(db, task_name, task_id, result=result_value, state=1)
                else:
                    record_task(db, task_name, task_id, result=result_value, state=2)
            finally:
                _task_queue.task_done()
        except Queue.Empty:
            #状态位,通过视图函数实现线程间通信,关闭时置位
            if not work_state.running_state():
                break
        except Exception:
            log.error("Queue Error!", exc_info=True)

创建线程开始执行方法并返回进程号

def work_queue(*args, **kwargs):
    t = threading.Thread(target=_task_queue_consumer, args=args, kwargs=kwargs)
    t.start()
    _task_queue.join()
    return t

主函数与flask main一起执行

if __name__ == '__main__':
    work_queue(db_queue=db_queue)
    app.run(host="0.0.0.0", port=8989, debug=True, threaded=True)

优势:

1.单任务后台执行,完全不阻塞前台运行
2.任务获取可以伴随自定义的回调函数操作(上面的是数据库操作)

缺点:

1.单线程局限性,没有多并发功能

上一篇 下一篇

猜你喜欢

热点阅读