2 线程与python
1 全局解释器锁(GIL)
python代码执行是在python虚拟机上进行控制的,在主循环中同时只能有一个控制线程在执行,就像单核CPU系统中的多进程一样。python解释器可以运行多个线程,但在任意时刻只会有一个线程会被解释器执行。
对python虚拟机的访问是由全局解释器锁(GIL)进行控制的。这个锁保证同时只能有一个线程运行。在多线程环境下,python虚拟机将按以下方式执行:
1 设置GIL
2 切换进一个进程去执行
3 执行下面操作之一:
a 指定数量的字节码指令
b 线程主动让出控制权(可调用time.sleep(0)来完成)
4 把多线程设置回睡眠状态(切换出线程)
5 解锁GIL
6 重复以上动作。
2 threading是对thread的封装。
1、开启线程:
线程有2种调用方式,如下:
直接调用
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
if __name__ == '__main__':
t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
t1.start() #启动线程
t2.start() #启动另一个线程
print(t1.getName()) #获取线程名
print(t2.getName())
继承式调用
import threading
import time
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num
def run(self):#定义每个线程要运行的函数
print("running on number:%s" %self.num)
time.sleep(3)
if __name__ == '__main__':
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
2、主进程下开启子进程:
t=multiprocessing.Process(target=work)
t.start()
程序会先执行主程序的语句,再执行此子进程的目标函数work();
3、t.setDadmon()
设置守护进程;必须在start()
之前设置;如果为True则主程序不用等此线程结束后再结束主程序;
from threading import Thread, current_thread
import time
def foo():
print(123)
time.sleep(5)
print('end123')
def bar():
print(456)
time.sleep(2)
print('end456') # 守护线程:非守护线程代码运行结束后就结束
t1 = Thread(target=foo)
t2 = Thread(target=bar)
t1.daemon = True
t1.start()
t2.start()
# t1.join() #让主线程原地等待子线程运行完毕后才运行下面代码
print('main----')
t.join() 等待线程结束;
t.isAlive() 返回线程是否活动
t.getName() 返回线程名。
t.setName() 设置线程名。
threading.currentThread() 返回当前线程变量;
threading.enumerate() 返回一个包含正在运行线程的列表;
threading.activeCount() 返回正在运行的线程数量;
threading.Semaphore(5) 限制最大连接数为5,semaphore是一个acquire,release的计数器;
多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析
4
4、同步锁
R=threading.Lock()
R.acquire()
'''
对公共数据的操作
'''
R.release()
import threading, time
def run1():
print("grab the first part data")
lock.acquire()
global num
num += 1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2 += 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2)
if __name__ == '__main__':
num, num2 = 0, 0
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num, num2)
用于对共享资源同步访问的限制,只有当一个线程访问完毕后另一个线程才能访问。
5、信号量
信号量是一个计数器,当资源消耗时递减,当资源释放时递增。信号量比锁更灵活因为可以有多个线程,每个线程拥有有限资源的一个实例。
from atexit import register
from random import randrange
from threading import BoundedSemaphore, Lock, Thread
from time import sleep, ctime
lock = Lock()
MAX = 5
candytray = BoundedSemaphore(MAX)
def refill():
lock.acquire()
print('Refilling candy...', end=' ')
try:
candytray.release()
except ValueError:
print('full, skipping')
else:
print('OK')
lock.release()
def buy():
lock.acquire()
print('Buying candy...', end=' ')
if candytray.acquire(False):
print('OK')
else:
print('empty, skipping')
lock.release()
def producer(loops):
for i in range(loops):
refill()
sleep(randrange(3))
def consumer(loops):
for i in range(loops):
buy()
sleep(randrange(3))
def _main():
print('starting at:', ctime())
nloops = randrange(2, 6)
print('THE CANDY MACHINE (full with %d bars)!' % MAX)
Thread(target=consumer, args=(randrange(
nloops, nloops+MAX+2),)).start() # buyer
Thread(target=producer, args=(nloops,)).start() # vendor
@register
def _atexit():
print('all DONE at:', ctime())
if __name__ == '__main__':
_main()
6 生产消费问题和Queue/queue模块
from random import randrange
from time import sleep
from queue import Queue
from myThread3 import MyThread
def writeQ(queue):
print('producing object for Q...', end='')
queue.put('xxx', 1)
print("size now", queue.qsize())
def readQ(queue):
val = queue.get(1)
print('consumed object from Q... size now', queue.qsize())
def writer(queue, loops):
for i in range(loops):
writeQ(queue)
sleep(randrange(1, 4))
def reader(queue, loops):
for i in range(loops):
readQ(queue)
sleep(randrange(2, 6))
funcs = [writer, reader]
nfuncs = range(len(funcs))
def main():
nloops = randrange(2, 6)
q = Queue(32)
threads = []
for i in nfuncs:
t = MyThread(funcs[i], (q, nloops), \
funcs[i].__name__)
threads.append(t)
for i in nfuncs:
threads[i].start()
for i in nfuncs:
threads[i].join()
print('all DONE')
if __name__ == '__main__':
main()