02Python学习笔记之二.九【线程】2019-08-24
章节号 | 内容 |
---|---|
1图片格式(png) | 宽度大于620px,保持高宽比减低为620px |
12 | 123 |
1-1-1 | 方法 |
1-1-1 | 方法 |
第1章节 线程
-
1-1 线程—多线程执行(直接使用threading.Thread)
导入threading模块即可。
1 import threading
2 import time
3
4 def pri():
5 print("thread run")
6 time.sleep(1)
7
8 for i in range(5):
9 t = threading.Thread(target=pri)
10 t.start()
li@li-ThinkPad-T420s:~$ python3 15.py
thread run
thread run
thread run
thread run
thread run
线程也是多任务的一种方式。
程序就是代码。进程是开始执行的代码,它拥有部分系统分配给它的资源。
每个进程都有一个主线程。
主线程会等待子线程结束才退出。
-
1-2 线程—使用Thread子类完成多线程
子线程执行同一个函数,互相不会干扰。但是通常不同线程执行的是不同函数。
import threading
import time
class mythr(threading.Thread):
def run(self):
for i in range(5):
time.sleep(1)
#self.name当前线程的名字
msg = self.name+","+str(i)
print(msg)
t=mythr()
t.start()
Thread-6,0
Thread-6,1
Thread-6,2
Thread-6,3
Thread-6,4
子结束,父未回收,则子成为僵尸
父结束,子还存活,则子成为孤儿
li@li-System-Product-Name:~$ ps -aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.0 0.0 225720 9480 ? Ss 15:28 0:01 /sbin/init splash
root 2 0.0 0.0 0 0 ? S 15:28 0:00 [kthreadd]
root 4 0.0 0.0 0 0 ? I< 15:28 0:00 [kworker/0:0H]
root 6 0.0 0.0 0 0 ? I< 15:28 0:00 [mm_percpu_wq]
root 7 0.0 0.0 0 0 ? S 15:28 0:00 [ksoftirqd/0]
root 8 0.0 0.0 0 0 ? I 15:28 0:00 [rcu_sched]
root 9 0.0 0.0 0 0 ? I 15:28 0:00 [rcu_bh]
root 10 0.0 0.0 0 0 ? S 15:28 0:00 [migration/0]
root 11 0.0 0.0 0 0 ? S 15:28 0:00 [watchdog/0]
root 12 0.0 0.0 0 0 ? S 15:28 0:00 [cpuhp/0]
root 13 0.0 0.0 0 0 ? S 15:28 0:00 [cpuhp/1]
root 14 0.0 0.0 0 0 ? S 15:28 0:00 [watchdog/1]
root 15 0.0 0.0 0 0 ? S 15:28 0:00 [migration/1]
root 16 0.0 0.0 0 0 ? S 15:28 0:00 [ksoftirqd/1]
root 17 0.0 0.0 0 0 ? I 15:28 0:00 [kworker/1:0]
root 18 0.0 0.0 0 0 ? I< 15:28 0:00 [kworker/1:0H]
root 19 0.0 0.0 0 0 ? S 15:28 0:00 [cpuhp/2]
root 20 0.0 0.0 0 0 ? S 15:28 0:00 [watchdog/2]
root 21 0.0 0.0 0 0 ? S 15:28 0:00 [migration/2]
root 22 0.0 0.0 0 0 ? S 15:28 0:00 [ksoftirqd/2]
root 23 0.0 0.0 0 0 ? I 15:28 0:00 [kworker/2:0]
root 24 0.0 0.0 0 0 ? I< 15:28 0:00 [kworker/2:0H]
root 25 0.0 0.0 0 0 ? S 15:28 0:00 [cpuhp/3]
.
.
.
PID为0的进程,负责进程间调度。
PID为1的进程,负责间接或简介创建其他的进程。
PID为1的进程,处理所有孤儿
进程。
-
1-3 线程—线程的执行顺序
主进程和子进程执行顺序是不一定的,由操作系统的调度来决定。
import threading
import time
class mythr(threading.Thread):
def run(self):
for i in range(5):
time.sleep(1)
msg = self.name+","+str(i)
print(msg)
def test():
#生成5个线程,都从run()开始执行
for i in range(5):
print("test"+str(i))
t = mythr()
t.start()
test()
test0
test1
test2
test3
test4
Thread-6,0
Thread-7,0
Thread-8,0
Thread-9,0
Thread-10,0
Thread-6,1
Thread-7,1
Thread-8,1
Thread-9,1
Thread-10,1
Thread-6,2
Thread-7,2
Thread-8,2
Thread-10,2
Thread-9,2
Thread-6,3
Thread-7,3
Thread-8,3
Thread-10,3
Thread-9,3
Thread-6,4
Thread-7,4
Thread-10,4
Thread-8,4
Thread-9,4
第一次执行有顺序(创建有顺序),但是一旦sleep后,先后就不确定了。
-
1-4 线程—线程共享全局变量
import threading
import time
g_num=0
def w1():
global g_num
for i in range(3):
g_num+=1
print("w1 num is %d"%g_num)
def w2():
global g_num
print("w2 num is %d"%g_num)
print("before thread,num is %d"%g_num)
t1=threading.Thread(target=w1)
t1.start()
#延时保证t1执行完毕
time.sleep(1)
t2=threading.Thread(target=w2)
t2.start()
before thread,num is 0
w1 num is 3
w2 num is 3
↑这里说明一个问题,线程间,全局变量是共用的。
上述代码是刻意避免了多个线程同时访问一个变量,因为这样会导致一些问题。
↑
进
程间,数据和代码均分用,互不干涉。↑
线
程间,数据公用,代码分用,数据可以共享。利弊各自参半。线程共享的问题如下↓
1 from threading import Thread
2
3 g_num = 0
4
5 def test1():
6 global g_num
7 for i in range(1000000):
8 g_num += 1
9 print("test1 %d" % g_num)
10
11
12 def test2():
13 global g_num
14 for i in range(1000000):
15 g_num += 1
16 print("test2 %d" % g_num)
17
18
19 t1 = Thread(target=test1)
20 t1.start()
21
22 t2 = Thread(target=test2)
23 t2.start()
24
25 print("final %d"%g_num)
li@li-ThinkPad-T420s:~$ python3 16.py
test1 1213667
final 1260000
test2 1283779
↑两个线程,各自对同一个全局变量进行自加1至100万,最后结果按道理来说,应该是200万。实际结果如上。
原因分析
: g_num += 1是关键。这里在视觉上是一行代码,但是CPU实际调度过程中却至少需要2个单位时间来运行,首先是执行g_num + 1,然后再执行g_num=g_num + 1。如果CPU在连续的两个时间片,执行的都是不同线程的加法操作或者赋值操作,那么实际运行结果就会和预想产生出入。
↓我们稍微修改一下代码,让t1线程有足够的时间来运行。看结果如何:
from threading import Thread
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
g_num += 1
print("test1 %d" % g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
print("test2 %d" % g_num)
t1 = Thread(target=test1)
t1.start()
#让主线程等t1线程3秒钟
time.sleep(3)
t2 = Thread(target=test2)
t2.start()
print("final %d"%g_num)
li@li-System-Product-Name:~$ python3 1.py
test1 1000000
final 1051813
test2 2000000
↑可以看到计算的记过已经符合预期了,但是最后打印的final的值为什么是这个呢?因为final是由主线程执行的,而主线程和t2线程是同时执行的,相当于两条腿同时跑。那如何才能把final 的输出也变成预期的值↓
from threading import Thread
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
g_num += 1
print("test1 %d" % g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
print("test2 %d" % g_num)
t1 = Thread(target=test1)
t1.start()
time.sleep(3)
t2 = Thread(target=test2)
t2.start()
#让主线程等t2线程也加完毕
time.sleep(3)
print("final %d"%g_num)
li@li-System-Product-Name:~$ python3 1.py
test1 1000000
test2 2000000
final 2000000
-
1-5 线程—列表当做参数传递到线程处理函数中
from threading import Thread
import time
nums = [11, 22, 33]
def test1(nums):
nums.append(44)
print("test1 nums", nums)
def test2(nums):
#等待t1进行添加44的操作,确保它执行完毕
time.sleep(1)
print("test2 get nums", nums)
t1 = Thread(target=test1, args=(nums,))
t1.start()
t2 = Thread(target=test2, args=(nums,))
t2.start()
print("final nums", nums)
li@li-System-Product-Name:~$ python3 2.py
test1 nums [11, 22, 33, 44]
final nums [11, 22, 33, 44]
test2 get nums [11, 22, 33, 44]
↑列表是可变数据类型,可以直接进行引用修改。
-
1-6 线程—避免全局变量被错误的使用
上述的示例代码中,因为不同的线程并行执行,对全局变量产生了不符合预期的结果。关键就是一句核心的代码g_num += 1
被拆分执行了。
那么我们应该如何避免这种问题的发生?应该有那么几种方式:
1、让各个线程在时间上串行执行(最笨,也是背离多任务初衷的方式,就是上述的sleep)
2、让一个线程不断的等另一个线程执行完毕(轮询)
3、让关键语句g_num += 1
强制不拆分执行(加互斥锁)
解决:2、让一个线程不断的等(这个等是耗费资源的等)另一个线程执行完毕,效率低下↓
首先
设置一个全部的标志g_f,在两个线程中分别对这个标志进行判断,不管标志为什么值,肯定有一个线程是不满足条件而不运行的。
然后
在不满足条件的线程中,使用while死循环来查看标志g_f是否符合自己的运行条件,是就运行,不是就继续循环。记住要设置条件跳出while死循环。
from threading import Thread
import time
g_num = 0
g_f =0
def test1():
global g_num
global g_f
if g_f == 0:
for i in range(1000000):
g_num += 1
g_f=1
print("test1 %d" % g_num)
def test2():
global g_num
global g_f
#不停的判断,为轮询
while True:
if g_f==1:
for i in range(1000000):
g_num += 1
#加完直接退出循环
break
print("test2 %d" % g_num)
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test2)
t2.start()
print("final %d"%g_num)
可以看到,全程都未用sleep函数
final 110183
test1 1000000
test2 2000000
-
1-7 线程—互斥锁
上述的例子中,用while True的方式等待,是很不经济的做法。有没有什么好的解决方式?有的,那就是互斥锁。
使用方式
:导入Lock库,用lo=Lock()创建锁,在多个线程的核心代码前后加入lo.acquire()和lo.release(),一旦有一个线程上锁成功,那么其他的锁都会堵塞(一直等待)到这个锁解开为止,期间不占CPU。
from threading import Thread, Lock
import time
g_num = 0
def test1():
global g_num
#两个线程都抢上锁,因为你不知道哪个线程先执行,但是只要某一个线程成功上锁,另一个线程就会在上锁处卡住等待这个锁解开
lo.acquire()
for i in range(1000000):
g_num += 1
#两个线程都要解锁
lo.release()
print("test1 %d" % g_num)
def test2():
global g_num
#两个线程都抢上锁,因为你不知道哪个线程先执行,但是只要某一个线程成功上锁,另一个线程就会在上锁处卡住等待这个锁解开
lo.acquire()
for i in range(1000000):
g_num += 1
#两个线程都要解锁
lo.release()
print("test2 %d" % g_num)
#创建一把锁,这把锁只能上一次,一旦被锁就不能锁第二次
lo=Lock()
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test2)
t2.start()
print("final %d"%g_num)
test1 1000000
test2 2000000
final 215425
-
1-7 线程—几个问题
上面的例子中,有这么一个问题:我们加了锁,让一个线程从头到位执行完100万次加法,才解锁,也就是说我们把一个多任务人为写成了单任务,这是违背多任务的初衷的,那么如何解决这个问题呢?那就是把加锁解锁放到for循环内部去。这也是一个上锁的原则:上锁的代码部分要最小化
from threading import Thread, Lock
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
lo.acquire()
g_num += 1
lo.release()
print("test1 %d" % g_num)
def test2():
global g_num
for i in range(1000000):
lo.acquire()
g_num += 1
lo.release()
print("test2 %d" % g_num)
#创建一把锁,这把锁只能上一次,一旦被锁就不能锁第二次
lo=Lock()
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test2)
t2.start()
print("final %d"%g_num)
final 24696
test1 1993796
test2 2000000
↑但是这也有取舍的问题,在运行过程中,明显感觉运行速度比上锁在for循环外慢,个人觉得这是频繁的上锁解锁造成的系统额外开销。
解锁后相当于是通知其他锁,所以造成系统开销不大
。
-
1-7 线程—使用非共享的变量
不修改全局变量,不加锁。
from threading import Thread, Lock
import time
def test1():
num =100
num+=1
print("test1 %d" % num,end="\n")
time.sleep(1)
print("test1 %d" % num,end="\n")
def test2():
num =100
print("test2 %d" % num)
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test2)
t2.start()
test1 101
test2 100
test1 101
↑上例代码说明:不同的线程执行不同的函数,修改局部变量,结果是互不影响的。那么,如果两个线程,执行的是同一个函数呢??????
from threading import Thread, Lock
#导入threading是为了使用threading.current_thread().name
import threading
import time
def test1():
print("now thread is %s"%threading.current_thread().name,end="**")
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test1)
t2.start()
now thread is Thread-7now thread is Thread-6****li@li-System-Product-Name:~/Documents$
↑首先我们把代码改成如上所示,先测试在当前的环境下,线程的名字各是什么,方便之后使用,这里看到了是Thread-6和Thread-7(这个因电脑环境而异,不一定相同),下面我们改写代码:
from threading import Thread, Lock
#导入threading是为了使用threading.current_thread().name
import threading
import time
def test1():
print("now thread is %s ##"%threading.current_thread().name,end="**")
num =100
if threading.current_thread().name == "Thread-6":
num+=1
else:
time.sleep(2)
print("now thread is %s,num is %d"%(threading.current_thread().name,num))
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test1)
t2.start()
now thread is Thread-6 ##**now thread is Thread-7 ##now thread is Thread-6,num is 101**
now thread is Thread-7,num is 100
↑首先两个线程都进入test1()函数执行,其中Thread-6进行加1操作,然后打印num值,Thread-7直接睡2秒,再打印num值,可以看到,各自打印的num值不同。
小tip,注意“**”的位置,来分析线程和end=执行的关系。
这得出了一个结论,线程对局部变量的修改,也是各自独有的,并不共享。
总结:
1、多进程中,全局和局部的变量,都是独享的。
2、多线程中,全局变量共享,局部变量独享。
所以,线程的局部变量,不用加锁,即便是多线程运行同一个函数。
-
1-8 线程—死锁
我等着你解锁,你等着我解锁,就产生了死锁。
import threading
import time
class thr1(threading.Thread):
def run(self):
if lo1.acquire():
print(self.name+"is start")
time.sleep(1)
if lo2.acquire():
print(self.name+"end")
lo2.release()
lo1.release()
class thr2(threading.Thread):
def run(self):
if lo2.acquire():
print(self.name+"is start")
time.sleep(1)
if lo1.acquire():
print(self.name+"end")
lo1.release()
lo2.release()
#产生2把锁
lo1=threading.Lock()
lo2=threading.Lock()
t1=thr1()
t2=thr2()
t1.start()
t2.start()
如何防止死锁呢?
1、编写程序的时候使用银行家算法!
2、添加超时的时间!
lo.acquire()有个参数,如果blocking为True,则会堵塞。如果blocking为False,则不堵塞。
lo.acquire([blocking])
In [4]: ?a.acquire
Docstring:
acquire(blocking=True, timeout=-1) -> bool
(acquire_lock() is an obsolete synonym)
Lock the lock. Without argument, this blocks if the lock is already
locked (even by the same thread), waiting for another thread to release
the lock, and return True once the lock is acquired.
With an argument, this will only block if the argument is true,
and the return value reflects whether the lock is acquired.
The blocking operation is interruptible.
Type: builtin_function_or_method
timeout=-1表示永不休止。
下面我们加入超时时间,代码如下:
import threading
import time
class thr1(threading.Thread):
def run(self):
if lo1.acquire():
print(self.name+"is start1")
time.sleep(1)
if lo2.acquire(timeout=3):
print(self.name+"end")
lo2.release()
lo1.release()
class thr2(threading.Thread):
def run(self):
if lo2.acquire():
print(self.name+"is start")
time.sleep(1)
if lo1.acquire(timeout=3):
print(self.name+"end")
lo1.release()
lo2.release()
#产生2把锁
lo1=threading.Lock()
lo2=threading.Lock()
t1=thr1()
t2=thr2()
t1.start()
t2.start()
li@li-System-Product-Name:~$ python3 3.py
Thread-1is start1
Thread-2is start
Thread-1end
li@li-System-Product-Name:~$ python3 3.py
Thread-1is start1
Thread-2is start
Thread-2end
li@li-System-Product-Name:~$ python3 3.py
Thread-1is start1
Thread-2is start
li@li-System-Product-Name:~$
↑可以看到,多次运行的结果是不一样的,因为多线程调度后,lo1和lo2谁先被解锁是不一定的。注意到还有一个end都没有的情况。思考为什么会有这样的情况。
-
1-9 线程—同步
同步就是协同步调,按照预定的先后或次序运行。
异步就是谁先执行不确定。
from threading import Thread, Lock
import time
class thr1(Thread):
def run(self):
while True:
if lo1.acquire():
print("thr1")
time.sleep(0.5)
lo2.release()
class thr2(Thread):
def run(self):
while True:
if lo2.acquire():
print("thr2")
time.sleep(0.5)
lo3.release()
class thr3(Thread):
def run(self):
while True:
if lo3.acquire():
print("thr3")
time.sleep(0.5)
lo1.release()
lo1=Lock()
lo2=Lock()
lo2.acquire()
lo3=Lock()
lo3.acquire()
t1=thr1()
t2=thr2()
t3=thr3()
t1.start()
t2.start()
t3.start()
thr1
thr2
thr3
thr1
thr2
thr3
thr1
thr2
thr3
↑三个线程中,循环上解锁!这些都是前人的思想结晶,要好好吸收消化。
有规律,这里就是同步~
-
1-10 线程—解决耦合
生产者与消费者模式:
from queue import Queue
import threading
import time
class Ser(threading.Thread):
def run(self):
global queue
count =0
while True:
if queue.qsize()<1000:
for i in range(100):
count=count+1
msg = self.name+"made stuff"+str(count)
queue.put(msg)
print(msg)
time.sleep(0.5)
class Xer(threading.Thread):
def run(self):
global queue
count =0
while True:
if queue.qsize()>100:
for i in range(3):
count=count+1
msg = self.name+" use "+queue.get()
print(msg)
time.sleep(1)
queue=Queue()
for i in range(500):
print("init stuff"+str(i))
queue.put("init stuff"+str(i))
for i in range(2):
p=Ser()
p.start()
for i in range(5):
x=Xer()
x.start()
结果请自行运行查看。
在编程过程中,要解决生成和处理之间速度不统一的问题,可以使用队列queue来完成。纵观很多设计模式,都会使用第三者来解耦。
或者可以用读写文件
方式来完成。
创业公司,买一个虚拟服务器,完成所有功能。但是规模扩大后,要把功能分散到多个服务器上,多个服务器间还要相互通信,这时候就体现出你程序的解耦性能了。
-
1-11 线程—ThreadLocal
↑↓