
A Brief Look at the threading Library Source Code: Condition

2019-07-07  莫斯克内斯

Background

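A recent project came with this requirement: several machine learning algorithms each need exclusive use of a GPU, because if they share one GPU at the same time it may run out of memory and crash the program. The servers we ship, however, usually have only one or two GPUs. So I needed to build a scheduling center: before computing, an algorithm requests a GPU from the scheduler, and when it finishes it releases the GPU so that other algorithms can use it.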

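To keep state consistent across processes, the scheduler has to use locking. For inter-process communication I used RabbitMQ, and with Python's standard threading library and the Queue object I built a prototype (a toy version) of the scheduler. While using Queue, I noticed that it achieves thread safety through primitives from the threading library. I had read the threading source before but never got around to writing it up. In the posts that follow I will walk through the source of the threading primitives one by one, to consolidate what I learned. My knowledge is limited, so corrections and suggestions are welcome.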
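
The request/release idea can be sketched inside a single process with queue.Queue. This is only an illustration under stated assumptions, not the scheduler from the project: gpu_pool, gpu_lease, run_algorithm and the GPU ids 0 and 1 are hypothetical names, and the RabbitMQ layer is left out entirely.

```python
import queue
import threading
from contextlib import contextmanager

# Hypothetical pool of GPU ids; the servers described above have 1-2 GPUs.
gpu_pool = queue.Queue()
for gpu_id in (0, 1):
    gpu_pool.put(gpu_id)


@contextmanager
def gpu_lease():
    """Block until a GPU id is free, and hand it back on exit."""
    gpu_id = gpu_pool.get()       # blocks while every GPU is in use
    try:
        yield gpu_id
    finally:
        gpu_pool.put(gpu_id)      # release the GPU for other algorithms


usage = []                        # (algorithm name, gpu id) pairs, for inspection
usage_lock = threading.Lock()


def run_algorithm(name):
    with gpu_lease() as gpu_id:
        with usage_lock:
            usage.append((name, gpu_id))


threads = [threading.Thread(target=run_algorithm, args=('algo-{}'.format(i),))
           for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(name for name, _ in usage))
print(gpu_pool.qsize())           # both GPU ids are back in the pool
```

Queue.get and Queue.put do the blocking and waking here, which is the thread-safe behaviour that Queue builds on top of the threading primitives.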

Example

Here is a Condition usage example first: a simple producer-consumer model.

# coding:utf8
import threading
import logging
FORMAT = '%(asctime)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)


class Consumer:

    def __init__(self, condition):
        self.condition = condition
        self.event = threading.Event()

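    def consume(self, interval):
        count = 0
        with self.condition:
            # self.event is never set, so event.wait(interval) acts as a sleep
            # that returns False; the condition's lock stays held meanwhile.
            while not self.event.wait(interval):
                # Release the lock and block until the producer calls notify().
                self.condition.wait()
                count += 1
                logging.info('I consume a bread, please produce. count: {}'.format(count))
                # Wake the producer, which is blocked in its own wait().
                self.condition.notify()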


class Productor:

    def __init__(self, condition):
        self.condition = condition
        self.event = threading.Event()

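    def produce(self, interval):
        count = 0
        with self.condition:
            # self.event is never set, so event.wait(interval) acts as a sleep
            # that returns False; the condition's lock stays held meanwhile.
            while not self.event.wait(interval):
                count += 1
                logging.info('I produce a bread, please consume. count: {}'.format(count))
                self.event.wait(1)
                # Wake a consumer that is already blocked in wait(), if any.
                self.condition.notify()
                # Release the lock and block until the consumer calls notify().
                self.condition.wait()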


if __name__ == '__main__':
    condition = threading.Condition()
    productor = Productor(condition)
    consumer = Consumer(condition)
    t1 = threading.Thread(target=productor.produce, args=(1,), name='productor')
    t2 = threading.Thread(target=consumer.consume, args=(2,), name='consumer')
    t2.start()
    t1.start()

Output

2019-07-07 15:58:09,981 productor I produce a bread, please consume. count: 1
2019-07-07 15:58:10,987 consumer I consume a bread, please produce. count: 1
2019-07-07 15:58:13,996 productor I produce a bread, please consume. count: 2
2019-07-07 15:58:15,001 consumer I consume a bread, please produce. count: 2
2019-07-07 15:58:18,008 productor I produce a bread, please consume. count: 3
2019-07-07 15:58:19,010 consumer I consume a bread, please produce. count: 3

Source Code Analysis

The code above is simple and needs little explanation. The focus here is the Condition source code.

  1. First, the initializer:
    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()
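  2. Condition objects support the context-manager protocol. The methods below acquire the underlying lock (the "big lock", self._lock), release it,
    and check whether the current thread owns it. As __init__ shows, the last three are fallbacks, used only when the lock itself does not provide them.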
    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if _lock doesn't have _is_owned().
        if self._lock.acquire(0):
            self._lock.release()
            return False
        else:
            return True
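  3. Next, the two public methods, wait and notify. wait allocates a fresh lock for the calling thread, acquires it once, appends it to _waiters,
    releases the underlying lock, and then blocks by acquiring the waiter lock a second time. notify releases up to n of the queued waiter locks, which unblocks those threads: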
    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass
    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass
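
The behaviour of wait and notify shown above can be checked through the public API. The following is a minimal sketch, not code from the project; the names unowned_error, timed_out_result, woken, ready and waiter are made up for illustration. It exercises four things that follow from the source: calling wait without owning the lock raises RuntimeError, a timed wait with no notifier returns False, notify with no waiters does nothing, and a thread already blocked in wait is woken by notify.

```python
import threading

cond = threading.Condition()           # no lock passed, so an RLock is created

# 1. wait() refuses to run unless the caller owns the lock.
try:
    cond.wait(0.01)
    unowned_error = None
except RuntimeError as exc:
    unowned_error = str(exc)

# 2. With nobody notifying, a timed wait gives up and returns False.
with cond:
    timed_out_result = cond.wait(0.05)

# 3. notify() with an empty waiter deque returns without doing anything.
with cond:
    cond.notify()

# 4. A thread that is already blocked in wait() is woken by notify().
woken = []
ready = threading.Event()


def waiter():
    with cond:
        ready.set()                    # the waiter thread now holds the lock...
        woken.append(cond.wait(5))     # ...and gives it up inside wait()


t = threading.Thread(target=waiter)
t.start()
ready.wait()
with cond:                             # succeeds only once waiter is inside wait()
    cond.notify()
t.join()

print(unowned_error)                   # cannot wait on un-acquired lock
print(timed_out_result)                # False
print(woken)                           # [True]
```

Step 4 is deterministic because wait appends its waiter lock to _waiters before releasing the underlying lock, so by the time the main thread can enter the with block the waiter is already queued.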

Startup Order

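  1. Thread start order matters when using Condition. If the start order of t1 (producer) and t2 (consumer) in the example is swapped, so that the producer thread starts first and the consumer thread second, the output is: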
2019-07-07 23:01:48,137 productor I produce a bread, please consume. count: 1
  2. Only one log line from the producer thread appears; the consumer never consumes.
    Look at the produce and consume methods to see why:
    def produce(self, interval):
        count = 0
        with self.condition:
            while not self.event.wait(interval):
                count += 1
                logging.info('I produce a bread, please consume. count: {}'.format(count))
                self.event.wait(1)
                self.condition.notify()
                self.condition.wait()
    def consume(self, interval):
        count = 0
        with self.condition:
            while not self.event.wait(interval):
                self.condition.wait()
                count += 1
                logging.info('I consume a bread, please produce. count: {}'.format(count))
                self.condition.notify()
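  3. When the producer starts first, it takes the lock, logs its first line, and calls notify() while the consumer is still blocked trying to enter its with self.condition block. At that moment _waiters is empty, so notify() returns without waking anyone. The producer then calls wait(), which releases the lock; the consumer finally gets the lock and calls wait() as well. Both threads are now waiting and neither will ever call notify(), so the program hangs.
    The first time I used Condition I had not yet read the source, and this start-order issue tripped me up too. This post is rather long-winded; thanks for bearing with it.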
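
One common way to make such an example independent of start order is to keep the "is there bread?" fact in shared state and wait on a predicate, so that a notify() sent too early does no harm. The sketch below is an assumption-laden rewrite, not the original example: the Bakery class, its bread and log attributes, and the run helper are hypothetical names, the sleeps are dropped, and Condition.wait_for requires Python 3.2 or later.

```python
import threading


class Bakery:
    """Hypothetical helper: a one-slot buffer guarded by a Condition."""

    def __init__(self):
        self.condition = threading.Condition()
        self.bread = 0            # shared state that the predicates inspect
        self.log = []

    def produce(self, rounds):
        for count in range(1, rounds + 1):
            with self.condition:
                # Wait until the slot is empty; returns at once if it already is.
                self.condition.wait_for(lambda: self.bread == 0)
                self.bread = 1
                self.log.append('produce {}'.format(count))
                self.condition.notify()

    def consume(self, rounds):
        for count in range(1, rounds + 1):
            with self.condition:
                # Wait until there is bread. An earlier notify() is not "lost",
                # because the state change it announced is still visible here.
                self.condition.wait_for(lambda: self.bread == 1)
                self.bread = 0
                self.log.append('consume {}'.format(count))
                self.condition.notify()


def run(producer_first):
    bakery = Bakery()
    p = threading.Thread(target=bakery.produce, args=(3,), name='productor')
    c = threading.Thread(target=bakery.consume, args=(3,), name='consumer')
    first, second = (p, c) if producer_first else (c, p)
    first.start()
    second.start()
    p.join()
    c.join()
    return bakery.log


print(run(producer_first=True))
print(run(producer_first=False))
```

Both calls produce the same strictly alternating log, because each side checks the shared state before deciding whether to block rather than relying on being inside wait() when the other side notifies.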

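Copyright of this article belongs to the author, 莫斯克内斯 (blog: https://www.jianshu.com/p/512f045b2259). Reposting and commercial use are welcome, provided the original link is shown prominently on the page and this notice is kept; otherwise the author reserves the right to pursue legal liability. For other matters, please leave a comment.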
