实现简单的 协程异步并发池

2020-01-16  本文已影响0人  JZ莫问
# -*- coding:utf-8 -*-
'''asyncio 学习'''
import aiohttp
import asyncio
from threading import Thread
import time,os,random
class myasync(object):
    def __init__(self):
        self.pool = 1000
        self.new_loop = asyncio.new_event_loop()
        self.thread = Thread(target=self.start_loop,args=(self.new_loop,)) # 创建新的线程
        # 它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件。也就是说守护线程不依赖于终端,但是依赖于系统,与系统“同生共死”。
        # setDaemon(True): 当主线程退出时,后台线程随机退出;
        # setDaemon(False)(默认情况): 当主线程退出时,若前台线程还未结束,则等待所有线程结束,相当于在程序末尾加入join().
        self.thread.setDaemon(True) # 守护进程
        self.thread.start() # 开始进程
        self.tasks = []


    def start_loop(self,loop):
        while True:
            try:
                asyncio.set_event_loop(loop)
                loop.run_forever()
            except Exception as EX:
                print(repr(EX),' <==')
            else:
                break

    # 进入异步中
    def add(self,fun):
        while True:
            if self.jc():
                task = asyncio.run_coroutine_threadsafe(fun, self.new_loop)
                self.tasks.append(task)
                return "OK"
            else:
                time.sleep(2)

    # 检测tasks是否还有
    def jc(self,):
        while True:
            if len(self.tasks) < self.pool:
                return True
            for n,task in enumerate(self.tasks):
                if task.done():
                    # print(task) # <Future at 0x27348f2f9b0 state=finished returned int> 运行完成的
                    del self.tasks[n]
                    return True
            else:
                return False


    # 检测是否运行完成
    def join(self,):
        while True:
            for task in self.tasks:
                # print(dir(task))
                # print(task.result()) # 运行结果?_? # 添加时进行运行操作 return 结果 进行阻塞?_?
                # xx = task.result()
                # print(task.exception()) # 返回Task的异常?_? # 添加时进行运行操作
                # print(task.done()) # 运行完成 # 检测task是否运行完成
                # print(task.add_done_callback(self.hd)) # 添加完成的回调????
                # print(task.cancel) # 取消 <bound method Future.cancel of <Future at 0x268d6776a90 state=pending>>
                # print(task.cancelled) # 取消 <bound method Future.cancelled of <Future at 0x204f92b5cc0 state=pending>>
                # print(task.running()) # False 是否运行?
                # 没有运行完成的继续运行运行完成才break
                if not task.done():
                    break
            else:
                print(task.exception(),'Error')
                return "OK"
            time.sleep(1)

    def __del__(self):
        self.new_loop.close()

async def gg(xnum):
    xx = random.randint(1,10)
    print('time:',xx,'key:',xnum)
    await asyncio.sleep(xx)
    print('OK ====>> :',xx,xnum)
    path = os.getcwd() + '\\test\\'
    with open(path+str(xnum)+'.txt','ab+') as f:
        f.write(str(xnum).encode('utf-8'))
        f.write('\n'.encode('utf-8'))

# Pending -> Pending:Runing -> Finished

if __name__ == '__main__':
    t2 = time.time()
    xx = myasync()
    for x in range(10000):
        xx.add(gg(x))
    xx.join()
    print(time.time() - t2)

特别感谢白大佬

上一篇 下一篇

猜你喜欢

热点阅读