Python多线程整理

2017-08-21  本文已影响0人  御风而行carrie

基础用法

示例代码

# -*- coding: UTF-8 -*-
import threading
import time
 
# 为线程定义一个函数
def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print "%s: %s" % ( threadName, time.ctime(time.time()) )
 
# 创建两个线程
try:
   t1 = threading.Thread(target=print_time, args = ("Thread-1", 4, )) # 创建好了线程
   t1.start()  # 让线程跑起来(执行targetd处的函数)
   t2 = threading.Thread(target=print_time, args = ("Thread-2", 4, )) # 创建好了线程
   t2.start() # 让线程跑起来(执行targetd处的函数)
except:
   print "Error: unable to start thread"
 
while 1:
   pass
其他设置:
t1 = threading.Thread(target=print_time, args = ("Thread-1", 4, )) # 创建好了线程

t1.setDaemon(True) # 设置当主线程退出后,t1线程也强制退出,不再执行;默认为False;

t1.start() # 让t1线程跑起来;

# 阻塞主进程无法执行join以后的语句,专注执行该线程,必须等待线程执行完毕之后才能执行其后主线程的语句;
t1.join()  # join可以设置时限

关于join和setDaemon的区别,可参考此文

示例代码


#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
import threading
import time
 
exitFlag = 0
 
class myThread (threading.Thread):   #继承父类threading.Thread
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):                   #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 
        print "Starting " + self.name
        print_time(self.name, self.counter, 5)
        print "Exiting " + self.name
 
def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            threading.Thread.exit()
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1
 
# 创建新线程
thread1 = myThread(1, "Thread-1", 1) # 之前的target函数(即实际要执行的函数)被至于了类中的run()方法中调用
thread2 = myThread(2, "Thread-2", 2)
 
# 开启线程
thread1.start()
thread2.start()
 
print "Exiting Main Thread"

多线程同步

参考文章1

Lock

Lock是最基础的锁,这里不多做解释

示例代码
# encoding: UTF-8
import threading
import time
 
data = 0
lock = threading.Lock()
 
def func():
    global data
    print '%s acquire lock...' % threading.currentThread().getName()
    
    # 调用acquire([timeout])时,线程将一直阻塞,
    # 直到获得锁定或者直到timeout秒后(timeout参数可选)。
    # 返回是否获得锁。
    if lock.acquire():
        print '%s get the lock.' % threading.currentThread().getName()
        data += 1
        time.sleep(2)
        print '%s release lock...' % threading.currentThread().getName()
        
        # 调用release()将释放锁。
        lock.release()
 
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()

Rlock

关于Rlock可以参照这篇文章,虽然代码看得明白,但是不懂什么时候要用到。
参考文章2

Semaphore(共享对象访问)

设定多个线程访问同一个对象;当semaphore次数用完,某个无法acquire的线程将会阻塞在semaphore.acquire()处,直到有锁释放

示例代码
# -*- coding: UTF-8 -*-
import threading
import time

semaphore = threading.Semaphore(3) # 设定3个线程访问同一个对象;当semaphore次数用完,某个无法acquire的线程将会阻塞在semaphore.acquire()处,直到有锁释放

def func():
    if semaphore.acquire():
        # for i in range(3):
        time.sleep(1)
        print (threading.currentThread().getName() + '获取锁\n')
        semaphore.release()
        print (threading.currentThread().getName() + ' 释放锁\n')



for i in range(5):
  t1 = threading.Thread(target=func)
  t1.start()
  
上段代码运行结果

在运行结果中可以看到,前三个线程都可以获取到锁,待前三个线程释放锁后,第4、5个线程才可以获取到锁。

Event

threading.Event 实现线程间通信
使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中</br>
也可以用来让主线程控制其他线程执行

Event内部包含了一个标志位,初始的时候为false。
可以使用使用set()来将其设置为true;
或者使用clear()将其从新设置为false;
可以使用is_set()来检查标志位的状态;
另一个最重要的函数就是wait(timeout=None),用来在wait语句处阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。

示例代码
# -*- coding: UTF-8 -*-
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, signal):
        threading.Thread.__init__(self)
        self.singal = signal

    def run(self):
        print " %s,等待event-signal ..."%self.name
        self.singal.wait()
        print "%s, 获取event-signal..." %self.name

if __name__ == "__main__":
    singal = threading.Event()
    for t in range(0, 3):
        thread = MyThread(singal)
        thread.start()

    print "\nmain thread sleep 3 seconds...\n "
    time.sleep(3)

    singal.set() # signal被设置为true
    print "\nsingal is True ... \n"
    
执行结果

由运行结果可发现,qevent标志位默认false,所以起初三个线程都在等待;
待主线程将标志位置为true后,三个线程都获得了signal,从而线程可以开始运行wait()语句后的内容

Condition锁

较之lock的锁,condition的锁在获得锁后,能够在条件变量,即这种机制是在满足了特定的条件后,线程才可以访问相关的数据,否则设置为wait

Condition.wait([timeout]):
线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。
wait()在必须在已获得Lock前提下才能调用在,否则会触发RuntimeError。
调用wait()会释放Lock在,直至该线程被Notify()、NotifyAll()或者超时线程又重新获得Lock

Condition.notify():
唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。

Condition.notify_all()
Condition.notifyAll()
唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐

示例代码
# -*- coding: UTF-8 -*-
from threading import Thread, Condition
import time
import random

queue = [1,2,3,4,5]
MAX_NUM = 5
condition = Condition()

class ProducerThread(Thread):
    def run(self):
        nums = range(5)
        global queue
        while True:
            print "Producer 获取锁...\n"
            condition.acquire()
            print "Producer 加锁...\n"
            if len(queue) == MAX_NUM:
                print "Queue full, producer is waiting"
                condition.wait()
                print "Space in queue, Consumer notified the producer"
            num = random.choice(nums)
            queue.append(num)
            print "Produced", num
            condition.notify()
            condition.release()
            print 'Producer 释放锁。。。\n'
            time.sleep(random.random())


class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            print 'Consumer 获取锁...\n'
            condition.acquire()
            print "Consumer 加锁...\n"
            if not queue:
                print "Nothing in queue, consumer is waiting"
                condition.wait()
                print "Producer added something to queue and notified the consumer"
            num = queue.pop(0)
            print "Consumed", num
            condition.notify()
            condition.release()
            print 'Consumer 释放锁。。。\n'
            time.sleep(random.random())


ProducerThread().start()
ConsumerThread().start()

执行结果(部分)

如图可见,Producer获得锁后,由于队列以满,所以先wait(wait时会暂时释放锁)
然后Consumer获取到锁,并消费掉一个元素,并且notify正在wait的Producer线程
被唤醒的Producer生产一个元素(Produced 0),并释放锁

线程池threadpool

参考文章1--最简单使用
参考文章2--讲解内容再充裕的一篇

要提到的是,threadpool模块较老,根据参考文章2,pypi上也建议使用multiprocessing代替它。

# -*- coding: UTF-8 -*-
import threadpool
import time

def test_func(li):
    print "数字是: " + str(li) + " \n"
    # print "数字是: " + str(li[0]) + " " + str(li[1]) + " \n"

# Step1: 定义了一个线程池,表示最多可以创建poolsize这么多线程
pool = threadpool.ThreadPool(5)

# Step2: 调用makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数.其中回调函数可以不写,default是无,也就是说makeRequests只需要2个参数就可以运行
# requests = threadpool.makeRequests(some_callable, list_of_args, callback)

requests = threadpool.makeRequests(test_func, ([9,3,4,5]))
# 注意,这里list_of_args相当于传递了4个请求的参数;传递请求的数量是由参数个数控制的。
# 同时传递多个参数的方法还没试出来,但是可以将多个参数合为一个传递([[9,3],[4,5]])

# Step3: 将所有要运行多线程的请求扔进线程池,一下两种写法都可以
[pool.putRequest(req) for req in requests]
# ---------------------or-------------------------
for req in requests:
    print '---'
    pool.putRequest(req)

# Step4:等待所有的线程完成工作后退出
pool.wait()

个人感觉:线程池用list_of_args实现了线程间的同步,起到了原来Queue的作用。认为在参数个数为1且多个线程运行同一个函数的情况下使用线程池。

2017.08.21

上一篇下一篇

猜你喜欢

热点阅读