python中threading库详解及实例
python中threading库
另一篇文章:pthreading库详解
threading模块
threading模块除了Thread类之外,好包括其他很多的同步机制,下面来看一下threading模块汇总所包含的对象。
对象描述
Thread执行线程的对象
Lock锁对象
RLock递归锁,是一个线程可以再次拥有已持有的锁对象
Condition条件变量对象,使一个线程等待另一个线程满足特定的条件触发
Event事件对象,普通版的Condition
Semaphore信号量,为线程间共享的资源提供一个“计数器”,计数开始值为设置的值,默认为1
BoundedSemaphore与Semaphore相同,有边界,不能超过设置的值
Timer定时运行的线程对象,定时器
Barrier界限,当达到某一界限后才可以继续执行
看到threading有这么多对象,是不是有些懵了,下面一个个的来看一下
Thread类
Thread类是threading模块的主要主要执行对象。Thread对象有三个数据属性,name(线程名)、ident(线程的标识)、daemon(布尔值,是否是守护线程)。这三个数据属性可以直接通过对象进行调用并进行设置。
守护线程一般是一个等待客户端请求服务的服务器,进程退出时,该线程在正常情况下不会退出
Thread类还有一些对象方法
对象方法描述
__init__()实例化一个线程对象
start()开始执行线程
run()定义线程功能方法(一般在子类中进行重写)
join(timeout=None)直至启动的线程终止或timeout秒,否则一直挂起,多用于主线程进行阻塞等待子线程运行完毕。
isAlivel/is_alive()线程是否存活
注意
__init__()完整函数如下__init__(group=None,target=None,name=None,args=(),kwargs={},verbose=None,daemon=None),Thread对象实例化需要一个可调用的target(可以是一个函数,也可是一个可调用的类实例),参数args或者kwargs。
说了这么多,那怎么创建线程呢?一般有两种方法:
创建Thread实例,传给其一个函数或可调用的类实例
派生Thread的子类,并创建子类的实例
一般来说,创建Thread实例并传递一个函数和派生Thread子类比较常用,后者更符合面向对象且比较容易扩展
创建Thread实例,传给它一个函数
importrandomimportthreadingfromtimeimportctime,sleepdefloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime()) sleep(nsec) print('end loop ',nloop,' done at:',ctime())defmain():print('starting at:',ctime()) threads = []foriinrange(3): t = threading.Thread(target=loop,args=(i,random.randint(1,5))) threads.append(t)foriinrange(3): threads[i].start()foriinrange(3): threads[i].join() print('all done at:',ctime())if__name__ =='__main__': main()
传递给Thread实例一个函数其实和thread模块中差不多,这里随机生成3个Thread实例,分别运行随机事件,然后通过循环让线程启动threads[i].start(),然后通过join()让主线程等待结束。
打印结果如下
startingat:ThuSep7 17:53:442017startloop0sec: 1at:ThuSep7 17:53:442017startloop1sec: 5at:ThuSep7 17:53:442017startloop2sec: 3at:ThuSep7 17:53:442017endloop0doneat:ThuSep7 17:53:452017endloop2doneat:ThuSep7 17:53:472017endloop1doneat:ThuSep7 17:53:492017alldoneat:ThuSep7 17:53:492017
如果将join代码注释掉的话,主线程将不会等待子线程运行,打印结果如下:
startingat:ThuSep7 17:56:112017startloop0sec: 2at:ThuSep7 17:56:112017startloop1sec: 4at:ThuSep7 17:56:112017startloop2sec: 5at:ThuSep7 17:56:112017alldoneat:ThuSep7 17:56:112017endloop0doneat:ThuSep7 17:56:132017endloop1doneat:ThuSep7 17:56:152017endloop2doneat:ThuSep7 17:56:162017
可以看到all done语句已经先执行完毕。然后各个子线程仍然在运行直到结束。
创建Thread实例,传给它一个可调用类的实例
这里需要解释一下,一般实现了__call__方法的类,其实例可以像函数一样进行调用(其实函数就是可调用的对象),称之为可调用类的实例。这样的话,只需要在类中实现__call__方法即可
importrandomimportthreadingfromtimeimportctime, sleepclassThreadFunc(object):def__init__(self,func,args,name=''):self.name = name self.func = func self.args = argsdef__call__(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime()) sleep(nsec) print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime()) threads = [] loops = range(3)foriinloops: t = threading.Thread(target=ThreadFunc(loop,(i,random.randint(1,5)),loop.__name__)) threads.append(t)foriinloops: threads[i].start()foriinloops: threads[i].join() print('all done at:',ctime())if__name__ =='__main__': main()
这个例子和上面的例子相同,只不过在实例化Thread的时候将ThreadFunc传递给target,当t调用start的时候,其会调用__call__方法。
看一下运行的结果:
importrandomimportthreadingfromtimeimportctime, sleepclassThreadFunc(object):def__init__(self,func,args,name=''):self.name = name self.func = func self.args = argsdef__call__(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime()) sleep(nsec) print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime()) threads = [] loops = range(3)foriinloops: t = threading.Thread(target=ThreadFunc(loop,(i,random.randint(1,5)),loop.__name__)) threads.append(t)foriinloops: threads[i].start()foriinloops: threads[i].join() print('all done at:',ctime())if__name__ =='__main__': main()
派生Thread的子类,创建子类的实例。
派生Thread的子类,一般需要重写run方法。
importrandomfromthreadingimportThreadfromatexitimportregisterfromtimeimportctime, sleepclassThreadFunc(Thread):def__init__(self,func,args):Thread.__init__(self) self.func = func self.args = argsdefrun(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime()) sleep(nsec) print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime()) threads = [] loops = range(3)foriinloops: t = ThreadFunc(loop,(i,random.randint(1,5))) threads.append(t)foriinloops: threads[i].start()@registerdef_atexit():print('all done at:',ctime())if__name__ =='__main__': main()
需要注意的是,子类的构造函数必须先调用基类的构造函数,在基类的构造函数中对于相应的参数进行了设置。这里并没有用join来控制主线程等待子线程完成,而是使用atexit.register()来注册一个退出函数。来看一下结果:
startat:ThuSep7 18:33:222017startloop0sec: 4at:ThuSep7 18:33:222017startloop1sec: 3at:ThuSep7 18:33:222017startloop2sec: 4at:ThuSep7 18:33:222017endloop1at:ThuSep7 18:33:252017endloop0at:ThuSep7 18:33:262017endloop2at:ThuSep7 18:33:262017alldoneat:ThuSep7 18:33:262017
以上就是多线程的三种实现方式。
同步原语
一般在多线程代码中,一般有一些特定的函数或代码块不希望被多个线程同时执行,这就需要使用同步了。同步原语中,锁是最简单、最低级的机制,而信号量通常用于多线程竞争有限资源的情况。
锁(Lock)
锁只有两种状态,锁定和解锁。即支持两种操作:获得锁和释放锁。当多线程争夺锁的时候,允许第一个获得锁的线程进入临界区,并执行代码,其他后到达的线程将被阻塞,直到获得锁的线程执行完毕,退出临界区,并释放锁。其他等待的线程去争夺锁并进行临界区。
下面来看一下不适用锁的例子。
importrandomfromatexitimportregisterfromthreadingimportcurrentThread, Threadfromtimeimportctime, sleepremaining = []defloop(nsec):tname = currentThread().name remaining.append(tname) print('{0} started at:{1}'.format(tname,ctime())) sleep(nsec) remaining.pop() print('{0} Completed at:{1} used {2}'.format(tname,ctime(),nsec)) print(remaining)defmain():foriinrange(4): Thread(target=loop,args=(random.randint(1,5),)).start()@registerdef_axexit():print('all down at:',ctime())if__name__ =='__main__': main()
上面的程序很简单,一个共享的列表remaining来存储剩余的线程,在每个线程中先将线程名称添加到该列表,运行完成后,则从列表中删除。下面来看一下运行的结果:
Thread-1startedat:ThuSep7 22:11:012017Thread-2startedat:ThuSep7 22:11:012017Thread-3startedat:ThuSep7 22:11:012017Thread-4startedat:ThuSep7 22:11:012017Thread-2Completedat:ThuSep7 22:11:022017usedtime1Thread-3Completedat:ThuSep7 22:11:022017usedtime1['Thread-1', 'Thread-2']['Thread-1', 'Thread-2']Thread-4Completedat:ThuSep7 22:11:032017usedtime2['Thread-1']Thread-1Completedat:ThuSep7 22:11:042017usedtime3[]alldownat:ThuSep7 22:11:042017
看上面的输出,Thread-2已经执行完毕,下面的remainlist却仍还有Thread-2,而Thread-4正在执行,却没有。这说明多个线程并行执行IO程序,同时有多个程序修改同一个变量导致值输出问题。这个时候需要用锁来防止多个线程同时修改共享数据。
修改代码如下:
importrandomfromatexitimportregisterfromthreadingimportcurrentThread, Thread, Lockfromtimeimportctime, sleepremaining = []lock = Lock()defloop(nsec):tname = currentThread().name lock.acquire() remaining.append(tname) print('{0} started at:{1}'.format(tname,ctime())) lock.release() sleep(nsec) lock.acquire() remaining.pop() print('{0} Completed at:{1} used time {2}'.format(tname,ctime(),nsec)) print(remaining) lock.release()defmain():foriinrange(4): Thread(target=loop,args=(random.randint(1,5),)).start()@registerdef_axexit():print('all down at:',ctime())if__name__ =='__main__': main()
大部分代码都没有改动,只是在添加了全局变量lock,然后在remaining添加和删除线程名称的时候进行锁的获取和释放,来看一下运行的结果。
Thread-1startedat:ThuSep7 22:26:402017Thread-2startedat:ThuSep7 22:26:402017Thread-3startedat:ThuSep7 22:26:402017Thread-4startedat:ThuSep7 22:26:402017Thread-1Completedat:ThuSep7 22:26:412017usedtime1['Thread-1', 'Thread-2', 'Thread-3']Thread-4Completedat:ThuSep7 22:26:422017usedtime2['Thread-1', 'Thread-2']Thread-2Completedat:ThuSep7 22:26:422017usedtime2['Thread-1']Thread-3Completedat:ThuSep7 22:26:442017usedtime4[]alldownat:ThuSep7 22:26:442017
这样看结果是不是正常了。
可能你会觉得lock这样进行锁的获取和释放代码不太直观,你也可以使用with语句,如下:
withlock: remaining.append(tname) print('{0} started at:{1}'.format(tname,ctime()))
这样看起来是不是简洁多了,缩进的代码会自动进行加锁和释放锁的功能。
信号量
信号量其实是一个计数器,当资源消耗时递减,当资源释放时递增。资源的消耗与释放称为一次PV操作,P()源于probeer/proberen,也称为wait、try、acquire、pend或procure,是消耗资源使计数器递减的操作。V()源于verhogen/verhoog,也成为signal、increment、release、post、vacate,是释放资源,使其回到资源池的操作。不过在python中被固定为acquire和release操作。
下面通过生产者-消费者问题来了解一下信号量
场景:生产者生产产品,消费者消费产品,柜台上最多只能摆放5件产品
fromatexitimportregisterfromrandomimportrandrangefromthreadingimportLock, Thread, BoundedSemaphorefromtimeimportsleep, ctimelock = Lock()MAX =5product_on_shelf = BoundedSemaphore(MAX)defproduct():withlock: print('product is doing')try: product_on_shelf.release()exceptValueError: print('shelf is full')else: print('product is done')defconsume():withlock: print('product is consume')ifproduct_on_shelf.acquire(): print('product is successed consume')else: print('shelf is empty')defproducer(loops):print('producer is start for loops:',loops)foriinrange(loops): product() sleep(randrange(3))defcustomer(loops):print('customer is start for loops:', loops)foriinrange(loops): consume() sleep(randrange(3))defmain():print('starting at:',ctime()) nloops = randrange(3,6) Thread(target=producer,args=(nloops,)).start() Thread(target=customer,args=(nloops,)).start()@registerdef_atexit():print('all done at:',ctime())if__name__ =='__main__': main()
这里定义了生产者和消费者两个线程,随机循环生产和消费产品。product和consume是生产产品和消费产品的函数,均通过锁进行同步,这里使用的是BoundedSemaphore,主要是因为其有边界,值为Max,初始状态下信号量的值为5。这里需要对release进行异常捕获,超过边界BoundedSemaphore会抛出ValueError异常,而acquire则返回boolean来表示获取是否成功。下面来看一下结果:
starting at: Fri Sep811:34:462017producerisstartforloops:3productisdoingshelfisfullcustomerisstartforloops:3productisconsumeproductissuccessed consumeproductisconsumeproductissuccessed consumeproductisdoingproductisdoneproductisconsumeproductissuccessed consumeproductisdoingproductisdoneall done at: Fri Sep811:34:502017
可以看到运行结果正常。
threading模块是对thread模块的封装,并添加了状态管理,并且通过lock扩展出来的Condition进行了多线程的通过管理,所以一般在实际生产中主要使用threading模块去处理问题。
threading模块暂时就讲到这里。