19--Python 并发与多线程

2019-08-09  本文已影响0人  Roger田翔
@Author : Roger TX (425144880@qq.com)
@Link : https://github.com/paotong999

一、线程和进程

1.1 什么是线程

1.2 进程的三个特征

1.3 什么是线程

在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。

归纳来说:

  • 操作系统可以同时执行多个任务,每一个任务就是一个进程,进程可以同时执行多个任务,每一个任务就是一个线程。
  • 对于一个 CPU 而言,在某个时间点它只能执行一个程序。也就是说单 CPU 只能运行一个进程,CPU 只是不断地在这些进程之间轮换执行。

二、Python 进程

2.1 创建Python进程(UNIX)

Python 的 os 模块提供了一个 fork() 方法,该方法可以 fork 出来一个子进程。

os.fork() 方法在 Windows 系统上无效,只在 UNIX 系统上有效。

fork() 方法不需要参数,它有一个返回值,该返回值表明是哪个进程在执行:

fork() 方法会启动两个进程(一个是父进程,一个是 fork 出来的子进程)来执行从 os.fork() 开始的所有代码。

import os

print('父进程(%s)开始执行' % os.getpid())
# 开始fork一个子进程
# 从这行代码开始,下面代码都会被两个进程执行
pid = os.fork()
print('进程进入:%s' % os.getpid())
# 如果pid为0,表明子进程
if pid == 0:
    print('子进程,其ID为 (%s), 父进程ID为 (%s)' % (os.getpid(), os.getppid()))
else:
    print('我 (%s) 创建的子进程ID为 (%s).' % (os.getpid(), pid))
print('进程结束:%s' % os.getpid())

结果如下:

父进程(1795)开始执行
进程进入:1795
我(1795)创建的子进程ID 为(1796).
进程结束:1795
进程进入:1796
子进程,其ID 为(1796),父进程ID 为(1795)
进程结束: 1796

2.2 创建Python进程(WINDOWS)

Python 在 multiprocessing 模块下提供了 Process 来创建新进程。

使用 Process 创建新进程也有两种方式:

Process 类也有如下类似的方法和属性:

指定函数作为target创建新进程

import multiprocessing
import os

# 定义一个普通的action函数,该函数准备作为进程执行体
def action(max):
    for i in range(max):
        print("(%s)子进程(父进程:(%s)):%d" %
            (os.getpid(), os.getppid(), i))
if __name__ == '__main__':
    # 下面是主程序(也就是主进程)
    for i in range(100):
        print("(%s)主进程: %d" % (os.getpid(), i))
        if i == 20:
            # 创建并启动第一个进程
            mp1 = multiprocessing.Process(target=action,args=(100,))
            mp1.start()
            # 创建并启动第一个进程
            mp2 = multiprocessing.Process(target=action,args=(100,))
            mp2.start()
            mp2.join()
    print('主进程执行完成!')

继承Process类创建子进程

import multiprocessing
import os
class MyProcess(multiprocessing.Process):
    def __init__(self, max):
        self.max = max
        super().__init__()
    # 重写run()方法作为进程执行体
    def run(self):
        for i in range(self.max):
            print("(%s)子进程(父进程:(%s)):%d" %
                (os.getpid(), os.getppid(), i))
if __name__ == '__main__':
    # 下面是主程序(也就是主进程)
    for i in range(100):
        print("(%s)主进程: %d" % (os.getpid(), i))
        if i == 20:
            # 创建并启动第一个进程
            mp1 = MyProcess(100)
            mp1.start()
            # 创建并启动第一个进程
            mp2 = MyProcess(100)
            mp2.start()
            mp2.join()
    print('主进程执行完成!')

2.3 Python使用进程池管理进程

如果程序需要启动多个进程,可以使用进程池来管理进程。程序可以通过 multiprocessing 模块的 Pool() 函数创建进程池。
进程池具有如下常用方法:

  1. 如果程序只是想将任务提交给进程池执行,则可调用 apply() 或 apply_async() 方法
  2. 如果程序需要使用指定函数将 iterable 转换成其他 iterable,则可使用 map() 或 imap() 方法

使用 apply_async() 方法启动进程:

import multiprocessing
import time
import os

def action(name='default'):
    print('(%s)进程正在执行,参数为: %s' % (os.getpid(), name))
    time.sleep(3)
if __name__ == '__main__':
    # 创建包含4条进程的进程池
    pool = multiprocessing.Pool(processes=4)
    # 将action分3次提交给进程池
    pool.apply_async(action)
    pool.apply_async(action, args=('位置参数', ))
    pool.apply_async(action, kwds={'name': '关键字参数'})
    pool.close()
    pool.join()

使用 map() 方法来启动进程:

import multiprocessing
import time
import os

def action(name='default'):
    print('(%s)进程正在执行,参数为: %s' % (os.getpid(), name))
    time.sleep(3)
if __name__ == '__main__':
    # 创建包含4条进程的进程池
    pool = multiprocessing.Pool(processes=4)
    # 将action分3次提交给进程池
    pool.apply_async(action)
    pool.apply_async(action, args=('位置参数', ))
    pool.apply_async(action, kwds={'name': '关键字参数'})
    pool.close()
    pool.join()

2.4 Python进程间通信

Python 为进程通信提供了两种机制:

使用Queue实现进程间通信

import multiprocessing
def f(q):
    print('(%s) 进程开始放入数据...' % multiprocessing.current_process().pid)
    q.put('Python')
if __name__ == '__main__':
    # 创建进程通信的Queue
    q = multiprocessing.Queue()
    # 创建子进程
    p = multiprocessing.Process(target=f, args=(q,))
    # 启动子进程
    p.start()
    print('(%s) 进程开始取出数据...' % multiprocessing.current_process().pid)
    # 取出数据
    print(q.get())  # Python
    p.join()

使用Pipe实现进程间通信

import multiprocessing

def f(conn):
    print('(%s) 进程开始发送数据...' % multiprocessing.current_process().pid)
    # 使用conn发送数据
    conn.send('Python')
if __name__ == '__main__':
    # 创建Pipe,该函数返回两个PipeConnection对象
    parent_conn, child_conn = multiprocessing.Pipe()
    # 创建子进程
    p = multiprocessing.Process(target=f, args=(child_conn, ))
    # 启动子进程
    p.start()
    print('(%s) 进程开始接收数据...' % multiprocessing.current_process().pid)
    # 通过conn读取数据
    print(parent_conn.recv())  # Python
    p.join()

三、Python 线程

3.1 创建Python线程

Python 提供了 _thread 和 threading 两个模块来支持多线程,其中 _thread 提供低级别的、原始的线程支持,以及一个简单的锁,而 threading 模块则提供了功能丰富的多线程支持。

Python 主要通过两种方式来创建线程:

3.2 调用 Thread 类的构造器创建线程

调用 Thread 类的构造器创建线程,直接调用 threading.Thread 类的构造器创建线程:

__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

上面的构造器涉及如下几个参数:

import threading

# 定义一个普通的action函数,该函数准备作为线程执行体
def action(max):
    for i in range(max):
        # 调用threading模块current_thread()函数获取当前线程
        # 线程对象的getName()方法获取当前线程的名字
        print(threading.current_thread().getName() +  " " + str(i))
# 下面是主程序(也就是主线程的执行体)
for i in range(100):
    # 调用threading模块current_thread()函数获取当前线程
    print(threading.current_thread().getName() +  " " + str(i))
    if i == 20:
        # 创建并启动第一个线程
        t1 =threading.Thread(target=action,args=(100,))
        t1.start()
        # 创建并启动第二个线程
        t2 =threading.Thread(target=action,args=(100,))
        t2.start()
print('主线程执行完成!')

从运行结果中可以发现:

程序中共包含三个线程,这三个线程的执行没有先后顺序,它们以并发方式执行,Thread-1 执行一段时间,然后可能 Thread-2 或 MainThread 获得 CPU 执行一段时间,接下来又换其他线程执行,这就是典型的线程并发执行,CPU 以快速轮换的方式在多个线程之间切换

上面程序还用到了如下函数和方法:

threading.current_thread():它是 threading 模块的函数,该函数总是返回当前正在执行的线程对象。
getName():它是 Thread 类的实例方法,该方法返回调用它的线程名字。
setName():它是 Thread 类的实例方法,该方法为线程设置名字。

3.3 继承 Thread 类创建线程类

通过继承 Thread 类来创建并启动线程的步骤如下:

import threading

# 通过继承threading.Thread类来创建线程类
class FkThread(threading.Thread):
    def __init__(self): 
        threading.Thread.__init__(self)
        self.i = 0
    # 重写run()方法作为线程执行体
    def run(self): 
        while self.i < 100:
            # 调用threading模块current_thread()函数获取当前线程
            # 线程对象的getName()方法获取当前线程的名字
            print(threading.current_thread().getName() +  " " + str(self.i))
            self.i += 1
# 下面是主程序(也就是主线程的执行体)
for i in range(100):
    # 调用threading模块current_thread()函数获取当前线程
    print(threading.current_thread().getName() +  " " + str(i))
    if i == 20:
        # 创建并启动第一个线程
        ft1 = FkThread()
        ft1.start()
        # 创建并启动第二个线程
        ft2 = FkThread()
        ft2.start()
print('主线程执行完成!')

3.4 线程锁

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

import time, threading

# 假定这是你的银行存款:
balance = 0

def change_it(n):
    # 先存后取,结果应该为0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

t1和t2是交替运行的,当操作系统以下面的顺序执行t1、t2:

初始值 balance = 0

t1: x1 = balance + 5  # x1 = 0 + 5 = 5

t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8

t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0

t2: x2 = balance - 8  # x2 = 0 - 8 = -8
t2: balance = x2   # balance = -8

结果 balance = -8

要确保balance计算正确,就要给change_it()上线程锁

def run_thread(n):
    for i in range(100000):
        # 先要获取锁:
        lock = threading.Lock()
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要释放锁:
            lock.release()

如果使用线程锁


四、多任务运行

4.1 Python多线程的局限

Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

4.2 计算密集型与IO密集型

我们可以把任务分为计算密集型和IO密集型。

第一种任务的类型是计算密集型,特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

4.3 异步IO

考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行。

现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。

对应到Python语言,单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。

上一篇 下一篇

猜你喜欢

热点阅读