一些收藏Python已看python

python并发编程:多线程

2021-11-13  本文已影响0人  倔犟的贝壳

并发编程之多线程

python中的并发编程,一个是协程,另一个就是多线程了。它们都用于IO操作频繁的场景。

基于Thread的多线程

python3提供了一个内置模块thread.Thread可以方便我们创建多线程。可以在函数中创建,也可以用类继承Thread.

Thread有几个重要的方法,start(),run(),join()

start()

开始线程活动。

它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。

如果同一个线程对象中调用这个方法的次数大于一次,会抛出 RuntimeError 。

run()

代表线程活动的方法。

你可以在子类型里重载这个方法。 标准的 run() 方法会对作为 target 参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从 args 和 kwargs 参数分别获取的位置和关键字参数。

join(timeout)

等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。

参考文档、文章:

https://docs.python.org/zh-cn/3/library/threading.html

https://www.cnblogs.com/wongbingming/p/9028851.html

在函数中用Thread创建线程。

Thread有三个重要的参数,target和args、kwargs

target:函数名,即指向线程运行时所执行的函数。

args:target函数的参数,是一个元组。is the argument tuple for the target invocation. Defaults to ().

kwargs:target函数参数,是一个字典。is a dictionary of keyword arguments for the target>

import time
from threading import Thread

def recv_msg(msg,n):
    print("got a msg:{}\n".format(msg))
    #开始处理消息
    time.sleep(2)
    print("process msg end:{}".format(msg))
    
    
def main():
    thread_1 = Thread(target=recv_msg,args=('Hello',10000000))
    thread_1.start()
    
    thread_2 = Thread(target=recv_msg,args=('World',100000000))
    thread_2.start()
    
    thread_1.join()
    thread_2.join()
    
start = time.perf_counter()
main() 
end = time.perf_counter()
print("spend time:{}".format(end-start))
got a msg:Hello

got a msg:World

process msg end:Hello
process msg end:World
spend time:2.006348734999847

join会阻塞主线程,这有当join的子线程执行完了,才会往下执行,按理我们只需join最长的一个线程即可,但很多情况下,我们是不知道哪个线程会运行最长的。
全部join,最后运行的时间也是最长的线程所花费的时间。但是需要保证所有线程都已经start了。如果写成:
thread1.start()
thread1.join()
thread2.start()
thread2.join()
这样线程就没什么用了。

import time
from threading import Thread


global global_a
def thread_sleep(n):
    print("thread sleep {}s\n".format(n))
    time.sleep(n)
    global global_a
    global_a = n
    print("sleep {}s end\n".format(n))
    
    
def main():
    thread_1 = Thread(target=thread_sleep,args=(2,))
    thread_2 = Thread(target=thread_sleep,args=(5,))
    thread_3 = Thread(target=thread_sleep,args=(10,))
    
    thread_1.start()
    thread_2.start()
    thread_3.start()
    print("start join 1")
    thread_1.join()
    print("start join 2")
    thread_2.join()
    print("start join 3")
    thread_3.join()
    
    
start = time.perf_counter()
main() 
end = time.perf_counter()
print("spend time:{}".format(end-start))
print(global_a)

#输出
   thread sleep 2s
    
    thread sleep 5s
    
    thread sleep 10s
    start join 1
    
    sleep 2s end
    
    start join 2
    sleep 5s end
    
    start join 3
    sleep 10s end
    
    spend time:10.003887921000569
    10

自定义线程类,继承Thread,需要重写run()方法

import time
from threading import Thread
class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
        
    def run(self):
        print("run {}\n".format(self.name))
        self.count = 0
        time.sleep(1)
        for i in range(1000000):
            self.count += i
        print("result is :{}\n".format(self.count))
        
def main():
    thread_1 = MyThread("hello")
    thread_2 = MyThread("world")
    
    thread_1.start()
    thread_2.start()
    thread_1.join()
    thread_2.join()
    
main()


#输出结果:
 run hello
    
    run world
    
    result is :499999500000
    
    result is :499999500000

基于Futures的多线程

参考文章、文档:
https://time.geekbang.org/column/article/102562

https://docs.python.org/3/library/concurrent.futures.html#future-objects

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html

Python 中的 Futures 模块,位于 concurrent.futures 和 asyncio 中,它们都表示带有延迟的操作。

Futures 会将处于等待状态的操作包裹起来放到队列中,这些操作的状态随时可以查询,当然,它们的结果或是异常,也能够在操作完成后被获取。
在协程中,我们使用到了ansyncio中的Future。这里我们的多线程,将会使用到concurrent.futures中的Future对象。

concurrent.futures 带来了线程池,使线程能够更加高效地利用。相比Thread,也更加方便使用

这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能

池由两部分组成,一部分是内部的队列,存放着待执行的任务;另一部分是一系列的进程或线程,用于执行这些任务 来源:https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html

组成部分:

concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。

submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。

map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。

shutdown(Wait=True): 发出让执行者释放所有资源的信号。

concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

Executor有两种子类,get_ipython自独立操作一个线程池或进程池,分别为:

concurrent.futures.ThreadPoolExecutor(max_workers) #操作线程池

concurrent.futures.ProcessPoolExecutor(max_workers)#操作进程池

import concurrent.futures 
#ThreadPoolExecutor创建一个线程池,max_workers表示最多有多少个worker并行执行任务。
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    #executor调用异步执行方法submit,执行任务,第一个参数是执行任务的函数,args和kwargs表示函数参数。返回一个Future对象
    future = executor.submit(pow, 2, 3)
    print(type(future))#输出:<class 'concurrent.futures._base.Future'>
    print(future.result()) #结果:8
<class 'concurrent.futures._base.Future'>
8
import concurrent.futures
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print("Read {} from {}".format(len(resp.content),url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:
        executor.map(download_one,sites)
        
def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
    
if __name__ == '__main__':
    main()
    
Read 191745 from https://en.wikipedia.org/wiki/Portal:Arts
Read 373799 from https://en.wikipedia.org/wiki/Portal:Biography
Read 319650 from https://en.wikipedia.org/wiki/Portal:History
Read 275172 from https://en.wikipedia.org/wiki/Portal:Society
Read 274280 from https://en.wikipedia.org/wiki/Portal:Technology
Read 323180 from https://en.wikipedia.org/wiki/Portal:Geography
Read 364444 from https://en.wikipedia.org/wiki/Computer_science
Read 554219 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 340435 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 560652 from https://en.wikipedia.org/wiki/PHP
Read 193173 from https://en.wikipedia.org/wiki/Node.js
Read 65944 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 364566 from https://en.wikipedia.org/wiki/Go_(programming_language)
Read 180470 from https://en.wikipedia.org/wiki/Portal:Science
Read 299680 from https://en.wikipedia.org/wiki/Portal:Mathematics
Download 15 sites in 1.7136735189997125 seconds

多线程对于CPU操作heavy的情况下,效果并不太明显,如,计算1-100000

import time
import concurrent.futures

def count(number):
    count = 0
    for i in range(number):
        count += i
   # print("count is:{}\n".format(count))
    return count
    


number_list = [50000,100000,200000,3000000]
#直接调用
start = time.perf_counter()

for num in number_list:
    print(count(num))
end = time.perf_counter()
print("直接调用花费时间:{}".format(end - start))



#多线程

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_to_num = {executor.submit(count,num):num for num in number_list}
  #  futures = [executor.submit(count,num) for num in number_list]
    #as_completed(fs)会yields 一个完成或取消的(总之Complete)的Future对象。
    for future in concurrent.futures.as_completed(future_to_num):
        num = future_to_num[future]
        print("reuslt of {} is :{}".format(num,future.result()))
        try:
            result = future.result() #获取结果
        except Exception as e:
            print("Exception:{}".format(e))
        
            
end = time.perf_counter()
print("多线程调用花费时间:{}".format(end - start))

1249975000
4999950000
19999900000
4499998500000
直接调用花费时间:0.23715457800062723
reuslt of 100000 is :4999950000
reuslt of 50000 is :1249975000
reuslt of 200000 is :19999900000
reuslt of 3000000 is :4499998500000
多线程调用花费时间:0.2344772459982778

这是因为,在Cpython下面,同一时刻,只能有一个线程执行。它的多线程是系统通过线程切换来实现的。这是因为CPython的GIL导致的。

对于IO密集型,因为很多时间是在等待。使用多线程,可以有效利用等待时间

而对于CPU密集型的话,多线程并不会并行计算,所以效果并不太明显,此时我们应该使用多进程。

GIL Global Interpreter Lock,即全局解释器锁。因为Cpython解释器的内存管理并不是线程安全的,即有多个线程的情况下,有可能同时去修改一个对象。于是Cpython使用简单的锁机制(大概即有一个全局锁,每次锁住一个线程,运行一定时间,然后释放锁,让其它线程获取锁,然后其它线程去执行),这样保证同一时间最多只有一个线程执行字节码。这样导致python没办法利用多核优势。

多线程 vs 协程

多线程与协程,都是并发编程。协程是由用户自己定义在什么时候暂停操作,让出控制权;
而多线程是我们只需submit好任务,剩下的交给操作系统。但是频繁的线程切换也是需要消耗资源和时间的。
什么时候用多线程,什么时候用协程,最终还是需要根据一定的场景决定。

The End!

上一篇下一篇

猜你喜欢

热点阅读