Python多线程整理
基础用法
- 1.用函数来包装(本质也是调用threading模块)
示例代码
# -*- coding: UTF-8 -*-
import threading
import time
# 为线程定义一个函数
def print_time( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print "%s: %s" % ( threadName, time.ctime(time.time()) )
# 创建两个线程
try:
t1 = threading.Thread(target=print_time, args = ("Thread-1", 4, )) # 创建好了线程
t1.start() # 让线程跑起来(执行targetd处的函数)
t2 = threading.Thread(target=print_time, args = ("Thread-2", 4, )) # 创建好了线程
t2.start() # 让线程跑起来(执行targetd处的函数)
except:
print "Error: unable to start thread"
while 1:
pass
其他设置:
t1 = threading.Thread(target=print_time, args = ("Thread-1", 4, )) # 创建好了线程
t1.setDaemon(True) # 设置当主线程退出后,t1线程也强制退出,不再执行;默认为False;
t1.start() # 让t1线程跑起来;
# 阻塞主进程无法执行join以后的语句,专注执行该线程,必须等待线程执行完毕之后才能执行其后主线程的语句;
t1.join() # join可以设置时限
关于join和setDaemon的区别,可参考此文
- 2.用类来包装
使用Threading模块创建线程,直接从threading.Thread继承,然后重写init方法和run方法:
示例代码
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import time
exitFlag = 0
class myThread (threading.Thread): #继承父类threading.Thread
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self): #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数
print "Starting " + self.name
print_time(self.name, self.counter, 5)
print "Exiting " + self.name
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threading.Thread.exit()
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1
# 创建新线程
thread1 = myThread(1, "Thread-1", 1) # 之前的target函数(即实际要执行的函数)被至于了类中的run()方法中调用
thread2 = myThread(2, "Thread-2", 2)
# 开启线程
thread1.start()
thread2.start()
print "Exiting Main Thread"
多线程同步
- Lock & RLock:互斥锁 用来保证多线程访问共享变量的问题
- Semaphore对象:Lock互斥锁的加强版,可以被多个线程同时拥有,而Lock只能被某一个线程同时拥有。
- Event对象: 它是线程间通信的方式,相当于信号,一个线程可以给另外一个线程发送信号后让其执行操作。
- Condition对象:其可以在某些事件触发或者达到特定的条件后才处理数据
Lock
Lock是最基础的锁,这里不多做解释
示例代码
# encoding: UTF-8
import threading
import time
data = 0
lock = threading.Lock()
def func():
global data
print '%s acquire lock...' % threading.currentThread().getName()
# 调用acquire([timeout])时,线程将一直阻塞,
# 直到获得锁定或者直到timeout秒后(timeout参数可选)。
# 返回是否获得锁。
if lock.acquire():
print '%s get the lock.' % threading.currentThread().getName()
data += 1
time.sleep(2)
print '%s release lock...' % threading.currentThread().getName()
# 调用release()将释放锁。
lock.release()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
Rlock
关于Rlock可以参照这篇文章,虽然代码看得明白,但是不懂什么时候要用到。
参考文章2
Semaphore(共享对象访问)
设定多个线程访问同一个对象;当semaphore次数用完,某个无法acquire的线程将会阻塞在semaphore.acquire()处,直到有锁释放
示例代码
# -*- coding: UTF-8 -*-
import threading
import time
semaphore = threading.Semaphore(3) # 设定3个线程访问同一个对象;当semaphore次数用完,某个无法acquire的线程将会阻塞在semaphore.acquire()处,直到有锁释放
def func():
if semaphore.acquire():
# for i in range(3):
time.sleep(1)
print (threading.currentThread().getName() + '获取锁\n')
semaphore.release()
print (threading.currentThread().getName() + ' 释放锁\n')
for i in range(5):
t1 = threading.Thread(target=func)
t1.start()
上段代码运行结果
在运行结果中可以看到,前三个线程都可以获取到锁,待前三个线程释放锁后,第4、5个线程才可以获取到锁。
Event
threading.Event 实现线程间通信
使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中</br>
也可以用来让主线程控制其他线程执行
Event内部包含了一个标志位,初始的时候为false。
可以使用使用set()来将其设置为true;
或者使用clear()将其从新设置为false;
可以使用is_set()来检查标志位的状态;
另一个最重要的函数就是wait(timeout=None),用来在wait语句处阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。
示例代码
# -*- coding: UTF-8 -*-
import threading
import time
class MyThread(threading.Thread):
def __init__(self, signal):
threading.Thread.__init__(self)
self.singal = signal
def run(self):
print " %s,等待event-signal ..."%self.name
self.singal.wait()
print "%s, 获取event-signal..." %self.name
if __name__ == "__main__":
singal = threading.Event()
for t in range(0, 3):
thread = MyThread(singal)
thread.start()
print "\nmain thread sleep 3 seconds...\n "
time.sleep(3)
singal.set() # signal被设置为true
print "\nsingal is True ... \n"
执行结果
由运行结果可发现,qevent标志位默认false,所以起初三个线程都在等待;
待主线程将标志位置为true后,三个线程都获得了signal,从而线程可以开始运行wait()语句后的内容
Condition锁
较之lock的锁,condition的锁在获得锁后,能够在条件变量,即这种机制是在满足了特定的条件后,线程才可以访问相关的数据,否则设置为wait
Condition.wait([timeout]):
线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。
wait()在必须在已获得Lock前提下才能调用在,否则会触发RuntimeError。
在调用wait()会释放Lock在,直至该线程被Notify()、NotifyAll()或者超时线程又重新获得Lock
Condition.notify():
唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。
Condition.notify_all()
Condition.notifyAll()
唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐
示例代码
# -*- coding: UTF-8 -*-
from threading import Thread, Condition
import time
import random
queue = [1,2,3,4,5]
MAX_NUM = 5
condition = Condition()
class ProducerThread(Thread):
def run(self):
nums = range(5)
global queue
while True:
print "Producer 获取锁...\n"
condition.acquire()
print "Producer 加锁...\n"
if len(queue) == MAX_NUM:
print "Queue full, producer is waiting"
condition.wait()
print "Space in queue, Consumer notified the producer"
num = random.choice(nums)
queue.append(num)
print "Produced", num
condition.notify()
condition.release()
print 'Producer 释放锁。。。\n'
time.sleep(random.random())
class ConsumerThread(Thread):
def run(self):
global queue
while True:
print 'Consumer 获取锁...\n'
condition.acquire()
print "Consumer 加锁...\n"
if not queue:
print "Nothing in queue, consumer is waiting"
condition.wait()
print "Producer added something to queue and notified the consumer"
num = queue.pop(0)
print "Consumed", num
condition.notify()
condition.release()
print 'Consumer 释放锁。。。\n'
time.sleep(random.random())
ProducerThread().start()
ConsumerThread().start()
执行结果(部分)
如图可见,Producer获得锁后,由于队列以满,所以先wait(wait时会暂时释放锁)
然后Consumer获取到锁,并消费掉一个元素,并且notify正在wait的Producer线程
被唤醒的Producer生产一个元素(Produced 0),并释放锁
线程池threadpool
参考文章1--最简单使用
参考文章2--讲解内容再充裕的一篇
要提到的是,threadpool模块较老,根据参考文章2,pypi上也建议使用multiprocessing代替它。
# -*- coding: UTF-8 -*-
import threadpool
import time
def test_func(li):
print "数字是: " + str(li) + " \n"
# print "数字是: " + str(li[0]) + " " + str(li[1]) + " \n"
# Step1: 定义了一个线程池,表示最多可以创建poolsize这么多线程
pool = threadpool.ThreadPool(5)
# Step2: 调用makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数.其中回调函数可以不写,default是无,也就是说makeRequests只需要2个参数就可以运行
# requests = threadpool.makeRequests(some_callable, list_of_args, callback)
requests = threadpool.makeRequests(test_func, ([9,3,4,5]))
# 注意,这里list_of_args相当于传递了4个请求的参数;传递请求的数量是由参数个数控制的。
# 同时传递多个参数的方法还没试出来,但是可以将多个参数合为一个传递([[9,3],[4,5]])
# Step3: 将所有要运行多线程的请求扔进线程池,一下两种写法都可以
[pool.putRequest(req) for req in requests]
# ---------------------or-------------------------
for req in requests:
print '---'
pool.putRequest(req)
# Step4:等待所有的线程完成工作后退出
pool.wait()
个人感觉:线程池用list_of_args实现了线程间的同步,起到了原来Queue的作用。认为在参数个数为1且多个线程运行同一个函数的情况下使用线程池。
2017.08.21