python中threading库详解及实例

2018-01-07  本文已影响0人  咫尺天涯var

python中threading库

另一篇文章:pthreading库详解

threading模块

threading模块除了Thread类之外,好包括其他很多的同步机制,下面来看一下threading模块汇总所包含的对象。

对象描述

Thread执行线程的对象

Lock锁对象

RLock递归锁,是一个线程可以再次拥有已持有的锁对象

Condition条件变量对象,使一个线程等待另一个线程满足特定的条件触发

Event事件对象,普通版的Condition

Semaphore信号量,为线程间共享的资源提供一个“计数器”,计数开始值为设置的值,默认为1

BoundedSemaphore与Semaphore相同,有边界,不能超过设置的值

Timer定时运行的线程对象,定时器

Barrier界限,当达到某一界限后才可以继续执行

看到threading有这么多对象,是不是有些懵了,下面一个个的来看一下

Thread类

Thread类是threading模块的主要主要执行对象。Thread对象有三个数据属性,name(线程名)、ident(线程的标识)、daemon(布尔值,是否是守护线程)。这三个数据属性可以直接通过对象进行调用并进行设置。

守护线程一般是一个等待客户端请求服务的服务器,进程退出时,该线程在正常情况下不会退出

Thread类还有一些对象方法

对象方法描述

__init__()实例化一个线程对象

start()开始执行线程

run()定义线程功能方法(一般在子类中进行重写)

join(timeout=None)直至启动的线程终止或timeout秒,否则一直挂起,多用于主线程进行阻塞等待子线程运行完毕。

isAlivel/is_alive()线程是否存活

注意

__init__()完整函数如下__init__(group=None,target=None,name=None,args=(),kwargs={},verbose=None,daemon=None),Thread对象实例化需要一个可调用的target(可以是一个函数,也可是一个可调用的类实例),参数args或者kwargs。

说了这么多,那怎么创建线程呢?一般有两种方法:

创建Thread实例,传给其一个函数或可调用的类实例

派生Thread的子类,并创建子类的实例

一般来说,创建Thread实例并传递一个函数和派生Thread子类比较常用,后者更符合面向对象且比较容易扩展

创建Thread实例,传给它一个函数

importrandomimportthreadingfromtimeimportctime,sleepdefloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime())    sleep(nsec)    print('end loop ',nloop,' done at:',ctime())defmain():print('starting at:',ctime())    threads = []foriinrange(3):        t = threading.Thread(target=loop,args=(i,random.randint(1,5)))        threads.append(t)foriinrange(3):        threads[i].start()foriinrange(3):        threads[i].join()    print('all done at:',ctime())if__name__ =='__main__':    main()

传递给Thread实例一个函数其实和thread模块中差不多,这里随机生成3个Thread实例,分别运行随机事件,然后通过循环让线程启动threads[i].start(),然后通过join()让主线程等待结束。

打印结果如下

startingat:ThuSep7 17:53:442017startloop0sec: 1at:ThuSep7 17:53:442017startloop1sec: 5at:ThuSep7 17:53:442017startloop2sec: 3at:ThuSep7 17:53:442017endloop0doneat:ThuSep7 17:53:452017endloop2doneat:ThuSep7 17:53:472017endloop1doneat:ThuSep7 17:53:492017alldoneat:ThuSep7 17:53:492017

如果将join代码注释掉的话,主线程将不会等待子线程运行,打印结果如下:

startingat:ThuSep7 17:56:112017startloop0sec: 2at:ThuSep7 17:56:112017startloop1sec: 4at:ThuSep7 17:56:112017startloop2sec: 5at:ThuSep7 17:56:112017alldoneat:ThuSep7 17:56:112017endloop0doneat:ThuSep7 17:56:132017endloop1doneat:ThuSep7 17:56:152017endloop2doneat:ThuSep7 17:56:162017

可以看到all done语句已经先执行完毕。然后各个子线程仍然在运行直到结束。

创建Thread实例,传给它一个可调用类的实例

这里需要解释一下,一般实现了__call__方法的类,其实例可以像函数一样进行调用(其实函数就是可调用的对象),称之为可调用类的实例。这样的话,只需要在类中实现__call__方法即可

importrandomimportthreadingfromtimeimportctime, sleepclassThreadFunc(object):def__init__(self,func,args,name=''):self.name = name        self.func = func        self.args = argsdef__call__(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime())    sleep(nsec)    print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime())    threads = []    loops = range(3)foriinloops:        t = threading.Thread(target=ThreadFunc(loop,(i,random.randint(1,5)),loop.__name__))        threads.append(t)foriinloops:        threads[i].start()foriinloops:        threads[i].join()    print('all done at:',ctime())if__name__ =='__main__':    main()

这个例子和上面的例子相同,只不过在实例化Thread的时候将ThreadFunc传递给target,当t调用start的时候,其会调用__call__方法。

看一下运行的结果:

importrandomimportthreadingfromtimeimportctime, sleepclassThreadFunc(object):def__init__(self,func,args,name=''):self.name = name        self.func = func        self.args = argsdef__call__(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime())    sleep(nsec)    print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime())    threads = []    loops = range(3)foriinloops:        t = threading.Thread(target=ThreadFunc(loop,(i,random.randint(1,5)),loop.__name__))        threads.append(t)foriinloops:        threads[i].start()foriinloops:        threads[i].join()    print('all done at:',ctime())if__name__ =='__main__':    main()

派生Thread的子类,创建子类的实例。

派生Thread的子类,一般需要重写run方法。

importrandomfromthreadingimportThreadfromatexitimportregisterfromtimeimportctime, sleepclassThreadFunc(Thread):def__init__(self,func,args):Thread.__init__(self)        self.func = func        self.args = argsdefrun(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime())    sleep(nsec)    print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime())    threads = []    loops = range(3)foriinloops:        t = ThreadFunc(loop,(i,random.randint(1,5)))        threads.append(t)foriinloops:        threads[i].start()@registerdef_atexit():print('all done at:',ctime())if__name__ =='__main__':    main()

需要注意的是,子类的构造函数必须先调用基类的构造函数,在基类的构造函数中对于相应的参数进行了设置。这里并没有用join来控制主线程等待子线程完成,而是使用atexit.register()来注册一个退出函数。来看一下结果:

startat:ThuSep7 18:33:222017startloop0sec: 4at:ThuSep7 18:33:222017startloop1sec: 3at:ThuSep7 18:33:222017startloop2sec: 4at:ThuSep7 18:33:222017endloop1at:ThuSep7 18:33:252017endloop0at:ThuSep7 18:33:262017endloop2at:ThuSep7 18:33:262017alldoneat:ThuSep7 18:33:262017

以上就是多线程的三种实现方式。

同步原语

一般在多线程代码中,一般有一些特定的函数或代码块不希望被多个线程同时执行,这就需要使用同步了。同步原语中,锁是最简单、最低级的机制,而信号量通常用于多线程竞争有限资源的情况。

锁(Lock)

锁只有两种状态,锁定和解锁。即支持两种操作:获得锁和释放锁。当多线程争夺锁的时候,允许第一个获得锁的线程进入临界区,并执行代码,其他后到达的线程将被阻塞,直到获得锁的线程执行完毕,退出临界区,并释放锁。其他等待的线程去争夺锁并进行临界区。

下面来看一下不适用锁的例子。

importrandomfromatexitimportregisterfromthreadingimportcurrentThread, Threadfromtimeimportctime, sleepremaining = []defloop(nsec):tname = currentThread().name    remaining.append(tname)    print('{0} started at:{1}'.format(tname,ctime()))    sleep(nsec)    remaining.pop()    print('{0} Completed at:{1} used {2}'.format(tname,ctime(),nsec))    print(remaining)defmain():foriinrange(4):        Thread(target=loop,args=(random.randint(1,5),)).start()@registerdef_axexit():print('all down at:',ctime())if__name__ =='__main__':    main()

上面的程序很简单,一个共享的列表remaining来存储剩余的线程,在每个线程中先将线程名称添加到该列表,运行完成后,则从列表中删除。下面来看一下运行的结果:

Thread-1startedat:ThuSep7 22:11:012017Thread-2startedat:ThuSep7 22:11:012017Thread-3startedat:ThuSep7 22:11:012017Thread-4startedat:ThuSep7 22:11:012017Thread-2Completedat:ThuSep7 22:11:022017usedtime1Thread-3Completedat:ThuSep7 22:11:022017usedtime1['Thread-1', 'Thread-2']['Thread-1', 'Thread-2']Thread-4Completedat:ThuSep7 22:11:032017usedtime2['Thread-1']Thread-1Completedat:ThuSep7 22:11:042017usedtime3[]alldownat:ThuSep7 22:11:042017

看上面的输出,Thread-2已经执行完毕,下面的remainlist却仍还有Thread-2,而Thread-4正在执行,却没有。这说明多个线程并行执行IO程序,同时有多个程序修改同一个变量导致值输出问题。这个时候需要用锁来防止多个线程同时修改共享数据。

修改代码如下:

importrandomfromatexitimportregisterfromthreadingimportcurrentThread, Thread, Lockfromtimeimportctime, sleepremaining = []lock = Lock()defloop(nsec):tname = currentThread().name    lock.acquire()    remaining.append(tname)    print('{0} started at:{1}'.format(tname,ctime()))    lock.release()    sleep(nsec)    lock.acquire()    remaining.pop()    print('{0} Completed at:{1} used time {2}'.format(tname,ctime(),nsec))    print(remaining)    lock.release()defmain():foriinrange(4):        Thread(target=loop,args=(random.randint(1,5),)).start()@registerdef_axexit():print('all down at:',ctime())if__name__ =='__main__':    main()

大部分代码都没有改动,只是在添加了全局变量lock,然后在remaining添加和删除线程名称的时候进行锁的获取和释放,来看一下运行的结果。

Thread-1startedat:ThuSep7 22:26:402017Thread-2startedat:ThuSep7 22:26:402017Thread-3startedat:ThuSep7 22:26:402017Thread-4startedat:ThuSep7 22:26:402017Thread-1Completedat:ThuSep7 22:26:412017usedtime1['Thread-1', 'Thread-2', 'Thread-3']Thread-4Completedat:ThuSep7 22:26:422017usedtime2['Thread-1', 'Thread-2']Thread-2Completedat:ThuSep7 22:26:422017usedtime2['Thread-1']Thread-3Completedat:ThuSep7 22:26:442017usedtime4[]alldownat:ThuSep7 22:26:442017

这样看结果是不是正常了。

可能你会觉得lock这样进行锁的获取和释放代码不太直观,你也可以使用with语句,如下:

withlock:    remaining.append(tname)    print('{0} started at:{1}'.format(tname,ctime()))

这样看起来是不是简洁多了,缩进的代码会自动进行加锁和释放锁的功能。

信号量

信号量其实是一个计数器,当资源消耗时递减,当资源释放时递增。资源的消耗与释放称为一次PV操作,P()源于probeer/proberen,也称为wait、try、acquire、pend或procure,是消耗资源使计数器递减的操作。V()源于verhogen/verhoog,也成为signal、increment、release、post、vacate,是释放资源,使其回到资源池的操作。不过在python中被固定为acquire和release操作。

下面通过生产者-消费者问题来了解一下信号量

场景:生产者生产产品,消费者消费产品,柜台上最多只能摆放5件产品

fromatexitimportregisterfromrandomimportrandrangefromthreadingimportLock, Thread, BoundedSemaphorefromtimeimportsleep, ctimelock = Lock()MAX =5product_on_shelf = BoundedSemaphore(MAX)defproduct():withlock:        print('product is doing')try:            product_on_shelf.release()exceptValueError:            print('shelf is full')else:            print('product is done')defconsume():withlock:        print('product is consume')ifproduct_on_shelf.acquire():            print('product is successed consume')else:            print('shelf is empty')defproducer(loops):print('producer is start for loops:',loops)foriinrange(loops):        product()        sleep(randrange(3))defcustomer(loops):print('customer is start for loops:', loops)foriinrange(loops):        consume()        sleep(randrange(3))defmain():print('starting at:',ctime())    nloops = randrange(3,6)    Thread(target=producer,args=(nloops,)).start()    Thread(target=customer,args=(nloops,)).start()@registerdef_atexit():print('all done at:',ctime())if__name__ =='__main__':    main()

这里定义了生产者和消费者两个线程,随机循环生产和消费产品。product和consume是生产产品和消费产品的函数,均通过锁进行同步,这里使用的是BoundedSemaphore,主要是因为其有边界,值为Max,初始状态下信号量的值为5。这里需要对release进行异常捕获,超过边界BoundedSemaphore会抛出ValueError异常,而acquire则返回boolean来表示获取是否成功。下面来看一下结果:

starting at: Fri Sep811:34:462017producerisstartforloops:3productisdoingshelfisfullcustomerisstartforloops:3productisconsumeproductissuccessed consumeproductisconsumeproductissuccessed consumeproductisdoingproductisdoneproductisconsumeproductissuccessed consumeproductisdoingproductisdoneall done at: Fri Sep811:34:502017

可以看到运行结果正常。

threading模块是对thread模块的封装,并添加了状态管理,并且通过lock扩展出来的Condition进行了多线程的通过管理,所以一般在实际生产中主要使用threading模块去处理问题。

threading模块暂时就讲到这里。

上一篇下一篇

猜你喜欢

热点阅读