Python 期物之 concurrent.futures.Future
2018-03-03 本文已影响24人
宝宝家的隔壁老王
Python 期物(future)用于异步编程:期物表示一个已排定、将异步执行的操作,其结果要在将来才能获得。从 Python 3.4 起,标准库中有两个名为 Future 的类:concurrent.futures.Future 和 asyncio.Future,本文分析前者。
-
Future 源码
class Future(object):
    """Represents the result of an asynchronous computation.

    State changes (PENDING -> RUNNING -> FINISHED, or PENDING -> CANCELLED ->
    CANCELLED_AND_NOTIFIED) are made while holding ``self._condition``;
    threads blocked in result()/exception() are woken with notify_all().
    Done-callbacks are invoked only after that lock has been released.
    """

    def __init__(self):
        """Initializes the future instance. Should not be called by clients."""
        # Guards the state below and lets result()/exception() block until
        # the future is resolved.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects (registered elsewhere) that are told about completion
        # through add_result / add_exception / add_cancelled.
        self._waiters = []
        # Callables to run, in registration order, once the future is done.
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Calls every registered done-callback with this future.

        An ``Exception`` raised by one callback is logged and does not stop
        the remaining callbacks from running.
        """
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        """Formats the future with its state and, if finished, outcome type."""
        with self._condition:
            if self._state == FINISHED:
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                self.__class__.__name__,
                id(self),
                _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancels the future if it has not started running.

        Returns:
            False if the future is RUNNING or FINISHED (it can no longer be
            cancelled); True if it is now cancelled, whether by this call or
            an earlier one.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            # Wake every thread blocked in result()/exception().
            self._condition.notify_all()

        # Run callbacks after releasing the lock so that a callback which
        # waits on another thread that needs this lock cannot deadlock.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Returns True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Returns True if the future is currently running."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Returns True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Raises the stored exception, or returns the stored result.

        Callers hold ``self._condition`` and have checked state == FINISHED.
        """
        # Compare against None rather than truthiness: an exception object
        # that defines __bool__/__len__ must still be raised.
        if self._exception is not None:
            raise self._exception
        return self._result

    def add_done_callback(self, fn):
        """Attaches a callable to run when the future is cancelled or finishes.

        Args:
            fn: A callable taking the future as its only argument. If the
                future is already done, fn is called immediately in the
                calling thread; otherwise it is called, in registration order,
                by whichever thread cancels or completes the future. An
                ``Exception`` raised by fn is logged and ignored in both cases.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: invoke immediately, outside the lock, with the same
        # log-and-continue policy that _invoke_callbacks applies.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Returns the result of the call that the future represents.

        Args:
            timeout: Seconds to wait for the result. None waits without limit.

        Returns:
            The value passed to set_result().

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future did not finish within ``timeout``.
            Exception: The exception passed to set_exception(), if any.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            # Block until cancel()/set_result()/set_exception() calls
            # notify_all(), or until the timeout elapses.
            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Returns the exception raised by the call that the future represents.

        Args:
            timeout: Seconds to wait for completion. None waits without limit.

        Returns:
            The exception passed to set_exception(), or None if the call
            completed via set_result().

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future did not finish within ``timeout``.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Marks the future as RUNNING or CANCELLED_AND_NOTIFIED.

        If the future was cancelled, every registered waiter is notified via
        add_cancelled() and False is returned. If the future is still PENDING,
        its state becomes RUNNING and True is returned. Executors call this
        before running the associated work; when it returns False the work
        must not be run.

        Returns:
            False if the future was cancelled, True otherwise.

        Raises:
            RuntimeError: If the future is in any other state, i.e. this
                method was already called, or set_result()/set_exception()
                was called first.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Stores ``result``, marks the future FINISHED, informs waiters, wakes
        blocked threads and then runs the done-callbacks.
        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        # Callbacks run after the lock is released (see cancel()).
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Stores ``exception``, marks the future FINISHED, informs waiters,
        wakes blocked threads and then runs the done-callbacks.
        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        # Callbacks run after the lock is released (see cancel()).
        self._invoke_callbacks()
-
单从 Future 类无法得知期物何时开始运行;下面引入 ThreadPoolExecutor,看看期物是如何被调度执行的。
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a pool of daemon worker threads."""

    def __init__(self, max_workers=None):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls. Defaults to five times the CPU count
                (five times 1 if the count cannot be determined).

        Raises:
            ValueError: If max_workers is not greater than 0.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        # Queue of _WorkItem instances; None is pushed as a wake-up sentinel.
        self._work_queue = queue.Queue()
        # Worker threads started so far (at most max_workers).
        self._threads = set()
        # Once True, submit() refuses new work.
        self._shutdown = False
        # Serializes submit() against shutdown().
        self._shutdown_lock = threading.Lock()

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            # Bundle the future with the call; a worker thread runs w.run().
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            # Start another worker if the pool is below max_workers.
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Starts one more worker thread if fewer than max_workers exist."""
        # Invoked when the executor is about to be garbage-collected: push a
        # None sentinel so a blocked worker wakes up, sees the dead weak
        # reference and exits.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # Register the thread's queue in the module-level mapping
            # (defined elsewhere; presumably consulted at interpreter exit).
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # Wake one worker; an exiting worker re-posts None so the
            # sentinel reaches the remaining workers.
            self._work_queue.put(None)
        # Join after releasing the lock: concurrent submit() callers then get
        # their RuntimeError immediately instead of waiting for every worker.
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
-
Executor
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Schedules fn(*args, **kwargs) and returns a Future for it.

        Subclasses must override this; the base version raises
        NotImplementedError.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Every call is submitted up front, so calls may run concurrently and
        out of order, but results are yielded in the order of the inputs.

        Args:
            fn: A callable taking as many arguments as there are iterables.
            timeout: Maximum number of seconds to wait for all results,
                measured from the call to map(). None means no limit.
            chunksize: Not used by this base implementation; accepted so that
                subclasses can share the signature.

        Returns:
            An iterator over the results of fn, in input order. Futures that
            have not been consumed when the iterator stops (exhaustion aside:
            an error, a timeout or an early close) are cancelled.

        Raises (while iterating):
            TimeoutError: If the results are not all available before the
                deadline.
            Exception: Whatever fn raised for a given set of arguments.
        """
        if timeout is not None:
            # Monotonic clock, so wall-clock adjustments cannot move the
            # deadline.
            end_time = timeout + time.monotonic()

        # submit() binds fn and its arguments to a new future each time.
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        def result_iterator():
            try:
                # Reverse so pop() returns futures in input order while our
                # list stops referencing each future (and its result) as soon
                # as it has been consumed.
                fs.reverse()
                while fs:
                    future = fs.pop()
                    try:
                        if timeout is None:
                            value = future.result()
                        else:
                            value = future.result(end_time - time.monotonic())
                    except BaseException:
                        # The popped future is no longer in fs, so cancel it
                        # here (e.g. after a timeout) before propagating.
                        future.cancel()
                        raise
                    del future
                    yield value
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Cleans up the resources associated with the executor.

        The base implementation does nothing.
        """
        pass

    def __enter__(self):
        # The returned object is what ``with ... as name`` binds.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Wait for pending work on exit; returning False never suppresses
        # an exception raised inside the with-block.
        self.shutdown(wait=True)
        return False
-
_WorkItem
# 简单的工作类
class _WorkItem(object):
# 初始化,参数为 期物+函数+参数
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
# 标记期物为notify,非 True 直接返回。调用期物关联的fn方法。
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
else:
self.future.set_result(result)
-
_worker()
# Main loop executed by every ThreadPoolExecutor worker thread.
def _worker(executor_reference, work_queue):
    """Runs work items taken from ``work_queue`` until told to stop.

    Args:
        executor_reference: Callable returning the owning executor, or None
            once it is gone (the pool passes a weakref.ref).
        work_queue: Queue shared by all workers of one executor. It carries
            objects with a ``run()`` method, plus ``None`` as a wake-up signal.

    On ``None`` the worker exits if the module-level ``_shutdown`` flag is
    set (defined elsewhere; presumably marks interpreter shutdown), if the
    executor has been collected, or if ``executor._shutdown`` is True. Before
    exiting it re-posts ``None`` so the next worker wakes up and performs the
    same check. Any exception escaping the loop is logged at CRITICAL level.
    """
    try:
        while True:
            item = work_queue.get(block=True)
            if item is None:
                # Wake-up signal: decide whether this worker should exit.
                owner = executor_reference()
                if _shutdown or owner is None or owner._shutdown:
                    # Forward the signal to the other workers.
                    work_queue.put(None)
                    return
                # Do not keep the executor alive while blocked in get().
                del owner
            else:
                item.run()
                # Drop the reference immediately. See issue16284.
                del item
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
备注
期物的使用标准流程
with futures.ThreadPoolExecutor(max_workers=10) as executor:
    # func: the callable to run; para_data_list: one argument per call.
    # map() returns a generator that yields results in input order.
    res = executor.map(func, para_data_list)
- 分析
- .map() 方法内部会对每一组参数调用一次 .submit() 方法;
- .submit() 方法将函数及其参数与一个新建的期物绑定成 _WorkItem 实例,放入 _work_queue 队列;若线程数尚未达到 max_workers,还会新开一个工作线程;
- 每个工作线程都运行 _worker() 函数。同一线程池的所有工作线程共享同一个 _work_queue 队列,它们循环从队列中取出 _WorkItem 并执行其 run() 方法;
- 返回值 res 是生成器,按输入顺序迭代即可获取函数的返回值;
- future.result() 会阻塞,直到期物运行结束、被取消或等待超时。