Python 线程学习

2018-07-10  本文已影响0人  vckah

Python 有关线程的模块主要有两个,一个是 thread,另一个是 Threadingthread 更趋向于底层,官方建议能力高的人去操作这个模块,对于我等初学者来说,threading 模块已经够我们折腾一阵子了。threading 模块与 Process 模块差不多,只是从结构上来说,用法有点类似。
其实 Python 的多线程有点鸡肋,准确地说,是 CPython 的多线程有点鸡肋。在 Python 诞生的那个年代,哪有什么多线程技术,一个 CPU 就够搞了,用不到什么特别高深的技术,所以这也算是个历史遗留问题吧。Python 有一个 全局解释器锁机制,就是俗称的 GIL。每个线程在执行的过程中都需要先获取 GIL。简单来说,就是 Python 将所有线程都加了一把锁,保证在同一时刻只能有一条线程去操纵数据。释放 GIL 情况:在 IO 操作等可能会引起阻塞的系统调用之前,可以释放 GIL。所以,Python 在调用一些 C 代码写的程序时,解释器都会释放 GIL。

from threading import Thread

def func(n):
    pass
t = Thread(target=func, args=(,))
t.start()
t.join()

# 还有类实现的
class MyThread(Thread):
    def __init__(slef, )
        super().__init__()
        pass
    
    def run(self):
        pass

实现一个 socket 服务端:

import socket
from threading import Thread

def chat(conn):
    con.send(b'hellow')
    msg = conn.recv(1024).decode('utf-8')
    print(msg)
    conn.close()

sk = socket.socket()
sk.bind(('127.0.0.1', 8000))
sk.listen()
while True:
    conn, addr = sk.accept()
    Thread(target=chat, args=(conn, )).start()
sk.close()
from threading import Thread, Lock

def func(lock):
    global n
    lock.acquire()
    n -= 1
    lock.release()
n = 10
f_lst = []
lock = Lock()
for i in range(10):
    t = Thread(target=func, args=(lock, ))
    t.start()
    t_lst.append(t)
for t in t_lst:
    t.join()
print(n)

注意,以上代码并不能确保最后一定能输出 0,因为假设有一个线程取到了数据,在准备向回赋值的时候,突然时间片轮转到了,导致这个线程暂停,另一个线程取到了数据。但是第一个线程还没有写数据,所以数据会污染。解决的办法是加锁即可。

# 再来看一个哲学家吃饭问题
from threading import Thread, Lock
first_lock  = Lock()
second_lock = Lock()
def eat1(name):
    first_lock.acquire()
    print('%s拿到第一个筷子啦'%name)
    second_lock .acquire()
    print('%s拿到第二个筷子了'%name)
    print('%s吃面'%name)
    second_lock .release()
    first_lock.release()

def eat2(name):
    second_lock.acquire()
    print('%s拿到第二个筷子啦'%name)
    time.sleep(1)
    first_lock.acquire()
    print('%s拿到第一个筷子了'%name)
    print('%s吃面'%name)
    first_lock.release()
    second_lock .release()

Thread(target=eat1,args=('a',)).start()
Thread(target=eat2,args=('b',)).start()
Thread(target=eat1,args=('c',)).start()
Thread(target=eat2,args=('d',)).start()

以上代码会出现死锁问题。为了解决这个,可以使用递归锁:

from threading import Thread, RLock
first_lock = second_lock = RLock()
def eat1(name):
    first_lock.acquire()
    print('%s拿到第一个筷子啦'%name)
    second_lock .acquire()
    print('%s拿到第二个筷子了'%name)
    print('%s吃面'%name)
    second_lock .release()
    first_lock.release()

def eat2(name):
    second_lock.acquire()
    print('%s拿到第二个筷子啦'%name)
    time.sleep(1)
    first_lock.acquire()
    print('%s拿到第一个筷子了'%name)
    print('%s吃面'%name)
    first_lock.release()
    second_lock .release()

Thread(target=eat1,args=('a',)).start()
Thread(target=eat2,args=('b',)).start()
Thread(target=eat1,args=('c',)).start()
Thread(target=eat2,args=('d',)).start()

这样就不会发生死锁问题了。因为 RLock 内部维护着一个 Lock 和一个 counter 变量,counter 记录了 acquire 的次数,从而使得资源可以被多次 require 。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源。在同一个线程中使用两把以上的锁时会出现死锁问题,可以使用递归。

import time
from threading import Semaphore, Thread
def func(sem, a, b):
    sem.acquire()
    time.sleep(1)
    print(a+b)
    sem.release()

sem = Semaphore(4)
for i in range(10):
    t = Thread(target=func, args=(sem, i, 2*i))
    t.start()

同一时间只能有 n 个线程访问那段代码。

#  第一个线程 : 连接数据库
    # 等待一个信号 告诉我我们之间的网络是通的
    # 连接数据库
#  第二个线程 : 检测与数据库之间的网络是否连通
    # time.sleep(0,2) 2
    # 将事件的状态设置为True
import time
import random
from threading import Thread,Event

def connect_db(e):
    count = 0
    while count < 3:
        e.wait(0.5)   # 状态为False的时候,只等待 0.5s 就结束
        if e.is_set() == True:
            print('连接数据库')
            break
        else:
            count += 1
            print('第%s次连接失败'%count)
    else:
        raise TimeoutError('数据库连接超时')

def check_web(e):
    time.sleep(random.randint(0,3))
    e.set()

e = Event()
t1 = Thread(target=connect_db,args=(e,))
t2 = Thread(target=check_web,args=(e,))
t1.start()
t2.start()
con = Condition()
conn.acquire()
conn.notify(n)
conn.release()
# 在线程中
conn.acquire()
conn.wait()
conn.release()
# 要进去的时候需要先获得钥匙,进去之后判断钥匙时候得到
from threading import Timer

def func():
    print('定时器')

Timer(2, func).start()
# 过两秒过后会打印 定时器
# Python 3 中的
import queue
# Python 2 中的
import Queue
q = queue.Queue()    # 先进先出
q.get()
q.put()
q.put_nowait()    # 没有阻塞
q.get_nowait()    # 没有阻塞

q = queue.LifoQueue()    # 栈 先进后出
q = queue.PriorityQueue() # 优先级队列
q.put((10, 'a'))
q.put((20, 'b'))    # 越小优先级越高,负数也可以
# 优先级一样,看第二项

线程池

concurrent.futures 这是 Python 3 提供的。里面提供了线程池和进程池,接口完全一样,只是本质上不同。

import time
from concurrent.futures import ThreadPoolExecutor

def func(n)
    time.sleep(2)
    print(n)          --> 打印的时候不是固定的,因为线程开启的事件不同
    return n*n

tpool = ThreadPoolExecutor(max_workers=n)    #默认不要超过 n = CPU 个数 * 5
tlst = []
for i in range(20):
    t =  tpool.submit(func, 1)      --> 异步的
    tlist.append(t)
# tpool .shutdown() --> 完成 close + join 的工作
print('over')
for i in tlist:
    print(t.result())      --> 获取返回值    一定按照顺序打印,因为 tlist 里面顺序是固定的

# 如果使用 map 
tpool.map(func, range(20))    # 拿不到返回值

另外它也有 回调函数的用法
tpool.submit(func, 1).add_done_callback(call_back)
线程池的缺点:
任务队列是无界的,所以需要控制好。如果队列的生产者任务生产的太快,而线程池消费太慢处理不过来,任务就会堆积。如果堆积一直持续下去,内存就会持续增长直到OOM,任务队列里堆积的所有任务全部彻底丢失。
另外如果要查看成功的任务:

from concurrent.futures import ThreadPoolExecutor, as_completed

executor = ThreadPoolExecutor(max_workers=2)
urls = []
all_task = [executor.submit(function, (url)) for url in urls]
for future in as_completed(all_task):
    data = future.result()
    print('success')
    # 顺序不固定,谁先完成就处理谁

# 还有一种方法,只不过这种方法是严格按照同步顺序来的
for data in executor.map(function, urls):
    print(data)
顺序严格按照 urls 的顺序来的

另外还有一个 wait

from concurrent.futures import wait
wait(all_task, return_when=)
这里 return_when 有第一个完成后,所有完成后等。
阻塞某一个 task 或者一些 task 完成
上一篇下一篇

猜你喜欢

热点阅读