DjangoPython

Python 将async函数转为基于同一个EventLoop实

2022-10-23  本文已影响0人  slords

背景:

  1. 主体业务使用的是基于async函数的异步处理的框架;
  2. 连接池等资源基于EventLoop进行缓存,复用和调用;
  3. 需要Celery进行后台任务,目前版本Celery对于async并不能良好支持,需要把async转为sync;
  4. 如果每次生成一个新的EventLoop实例会导致连接池等资源无法得到重用。

目标:

其他:

代码:

import asyncio
import functools
import threading
from typing import Any, Optional

# 设置全局的EventLoop
LOOP = asyncio.get_event_loop()


class CallResult:

    result: Any = None
    exception: Optional[BaseException] = None


# async_to_sync 装饰器
def async_to_sync(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        call_result = CallResult()
        event = threading.Event()  # 用于阻塞等待运行结果

        async def wrapper2():
            try:
                call_result.result = await func(*args, **kwargs)
            except BaseException as e:
                call_result.exception = e  # 写入异常
            finally:
                event.set()

        # 使用 全局EventLoop将wrapper2以task的方式执行
        LOOP.call_soon_threadsafe(LOOP.create_task, wrapper2())
        event.wait()  # 等待event激活 返回结果
        if call_result.exception:
            raise call_result.exception
        return call_result.result

    return wrapper


# 正常的异步函数
@async_to_sync
async def go():
    print('current_loop: ', id(asyncio.get_event_loop()))
    print('GLOBAL LOOP: ', id(LOOP))
    print('current_loop is GLOBAL LOOP: ', LOOP is asyncio.get_event_loop())


# 抛出异常的异步函数
@async_to_sync
async def raise_value_error():
    raise ValueError(id(LOOP))

# 下述loop_thread, start_loop, stop_loop可整合为一个类,这里为了方便阅读写成函数调用
loop_thread: threading


# 将全局EventLoop设为运行状态
def start_loop():
    global loop_thread
    loop_thread = threading.Thread(target=LOOP.run_forever)
    loop_thread.start()


# 将全局EventLoop关闭,正常的服务可以不用写,这个是为了示例代码可以正常结束用的
def stop_loop():

    @async_to_sync
    async def stop():
        print('Loop stop')
        LOOP.stop()

    stop()
    print('Loop close')
    LOOP.close()
    loop_thread.join()


if __name__ == '__main__':
    start_loop()
    for i in range(10):
        print(f'------------{i:02}------------')
        go()
    print('[run go] end')
    try:
        raise_value_error()
    except ValueError:
        import traceback
        # 使用标准输出,确保内容输出顺序一致
        print(traceback.format_exc())
        print('[raise_value_error] end')

    print('stop loop')
    stop_loop()
    print('stop loop called', flush=True)

输出如下:

------------00------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------01------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------02------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------03------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------04------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------05------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------06------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------07------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------08------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------09------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
[run go] end
Traceback (most recent call last):
  File "<PythonFile>", line 86, in <module>
    raise_value_error()
  File "<PythonFile>", line 35, in wrapper
    raise call_result.exception
  File "<PythonFile>", line 25, in wrapper2
    call_result.result = await func(*args, **kwargs)
  File "<PythonFile>", line 52, in raise_value_error
    raise ValueError(id(LOOP))
ValueError: 2224110182984

[raise_value_error] end
stop loop
Loop stop
Loop close
stop loop called
上一篇下一篇

猜你喜欢

热点阅读