大数据,机器学习,人工智能

Python进阶 - 高性能计算之多线程

2020-02-22  本文已影响0人  ChaoesLuol

写在前面

这个系列是笔者学习python一些进阶功能的笔记和思考,水平有限,错漏在所难免,还请方家不吝指教。

什么是线程?

首先应该了解操作系统如何支持多任务运行的,这里有“并行”和“并发”两个概念。

在操作系统中运行的一个程序就是一个进程(Process),它是代码+资源的组合。而在一个进程中,有一个或者多个线程(Thread)。线程是进程的组成部分,一个进程至少有一个主线程来完成进程从开始到结束的全部操作。

单核CPU如何运行多个线程?

python中多线程的实现

threading模块

t1 = threading.Thread(target = someFunction, args = ()),此时会创建一个线程对象,但是不会直接创建线程。用args关键词可以为函数传入参数,但是注意这里传入的参数因为数量不定,因此一定要是一个元组

调用线程对象的start方法才会创建线程,并且让线程开始运行。

如下面的例子:

import threading


def my_func(cnt):
    for i in range(cnt):
        print(i)


if __name__ == '__main__':
    t1 = threading.Thread(target=my_func, args=(10,))  # 创建一个线程
    t1.start() # 启动子线程

主线程需要负责回收分配给子线程的资源,因此主线程一定会晚于子线程结束。

继承thread类

用继承了Thread等类也可以实现线程创建。但是这个类中一定需要定义run方法,这样在start启动线程后会自动调用run方法,例如以下程序:

from time import sleep
import threading

class myThread(threading.Thread):
    def run(self):
        for i in range(10):
            sleep(1)
            msg = "I'm " + self.name + " @ " + str(i)
            print(msg)

def main():
    testThread = myThread()
    testThread.start()

if __name__ == "__main__":
    main()

会得到以下结果:

I'm Thread-1 @ 0
I'm Thread-1 @ 1
I'm Thread-1 @ 2
I'm Thread-1 @ 3
I'm Thread-1 @ 4
I'm Thread-1 @ 5
I'm Thread-1 @ 6
I'm Thread-1 @ 7
I'm Thread-1 @ 8
I'm Thread-1 @ 9

如果需要调用一系列的函数,那么可以将其封装在这个类中,然后在run方法里调用。

不同线程中全局变量的共享

在不同的线程中,全局变量是共享的,这使得不同线程之间协作处理数据非常方便。但是这种共享也会有负面影响 -- 造成资源竞争,例如如下代码:

from time import sleep
import threading

g_num = 0

def test1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("g_num in test1: %d" %g_num)


def test2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("g_num in test2: %d" %g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    sleep(5)

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

执行后会得到如下结果:

g_num in test1: 1221968
g_num in test2: 1323741
g_num in main thread: 1323741

这里得到的g_num并不是我们想象的2000000,因为在执行python代码时,我们使得数据自加的一句python代码会被翻译为好几句机器码:

在执行时,由于只使用了一个CPU,CPU会采用时间片轮转的方法来模拟多任务。因此实际上会在任意一步被cpu打断,例如在完成数据+1后,在存储数据时被打断,这样增加后的数据就没有存入内存,下一个线程从内存读取时,读到的就是没有自增前的数,这样可能使得两个线程中自增的数据相互覆盖,导致加到最后得到的值要比想象中的小。这种问题,也叫做“数据不同步”。

线程锁

当多个线程几乎同时修改一个数据时,需要进行同步控制。线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入<u>互斥锁</u>。

互斥锁会给资源附加一个状态:锁定/非锁定。

当某个线程需要修改资源时,先将其锁定,此时其他线程不可以修改该资源;到该线程修改结束后,释放资源,使其变为非锁定,其他的线程才能再次锁定该资源,进行修改。这样互斥锁就保证了每次只有一个线程进行写入操作,保证了多线程下全局变量的正确性。

为了实现互斥锁,threading模块中提供了Lock和RLock两个类:

Lock和RLock两个类都提供了以下方法来锁定和释放:

对于上面的问题,我们可以用互斥锁来解决多个线程之间的资源竞争问题,如夏例:

from time import sleep
import threading

g_num = 0
mutex = threading.Lock() # 建立互斥锁

def test1(num):
    global g_num
    mutex.acquire(True) # 在更改数据之前获得互斥锁
    for i in range(num):
        g_num += 1
    mutex.release() # 更改完数据之后释放锁定
    print("g_num in test1: %d" %g_num)


def test2(num):
    global g_num
    mutex.acquire(True) # 在更改数据之前获得互斥锁
    for i in range(num):
        g_num += 1
    mutex.release() # 更改完数据之后释放锁定
    print("g_num in test2: %d" %g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    sleep(5)

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

这样,我们在最后得到的就是我们预期的结果了:

g_num in test1: 1000000
g_num in test2: 2000000
g_num in main thread: 2000000

但是需要注意,用互斥锁会带来以下两个问题:

死锁

线程间共享多个资源时,如果两个线程分别占有一部分资源并且同时等待对方的资源,就可能造成死锁。当出现死锁时,会造成应用停止响应,如下例:

import threading
from time import sleep


lockA = threading.Lock()
lockB = threading.Lock()

def testA():
    # 为进程A上锁
    lockA.acquire()
    print("----Lock A acquired in testA----")
    sleep(1)
    # 中间需要操作另一批数据,上锁B
    lockB.acquire()
    print("----Lock B acquired in testA----")
    sleep(2)
    # 数据操作完成,解开锁B
    lockB.release()
    print("----Lock B released in testA----")
    # 完成操作,释放锁A
    lockA.release()
    print("----Lock A released in testA----")


def testB():
    # 开始操作时为进程B上锁
    lockB.acquire()
    print("----Lock B acquired in testB----")
    sleep(1)
    # 进行下一步操作前需要获取锁A
    lockA.acquire()
    print("----Lock A acquired in testB----")
    sleep(2)
    # 进行完数据操作释放锁A
    lockA.release()
    print("----Lock A released in testB----")
    # 释放锁B
    lockB.release()
    print("----Lock B released in testB----")


def main():
    t1 = threading.Thread(target=testA)
    t2 = threading.Thread(target=testB)

    t1.start()
    t2.start()


if __name__ == "__main__":
    main()

在这个例子中,线程t1开始时,为lockA上锁,并进入睡眠(模拟一些耗时操作),而同时开始的线程t2为lockB上锁,等t1向下执行时,需要lockB,lockB处于上锁状态,因此线程t1堵塞,等待lockB被释放;而t2在执行时,acquire lockA失败,也进入了堵塞状态,等待lockA被释放。

这样,两个线程互相需要对方释放互斥锁,两个线程都无法向下执行,进入了死锁状态。

死锁的避免

ThreadLocal

除了使用互斥锁以外,还有一种办法可以实现各线程间的数据隔离,那就是使用threading.local()。它会为各个变量创建完全属于他们自己的副本(也就是线程局部变量),这样各个线程操作的就是属于自己的私有资源,可以杜绝数据不同步的问题。

import threading
from time import sleep

from time import sleep
import threading

g_num = 0
local = threading.local()


def test1(num):
    global g_num
    local.g_num = g_num  # 将g_num绑定一个线程局部变量
    for i in range(num):
        local.g_num += 1
    print("g_num in test1: %d" % local.g_num)


def test2(num):
    global g_num
    local.g_num = g_num  # 将g_num绑定一个线程局部变量
    for i in range(num):
        local.g_num += 1
    print("g_num in test2: %d" % local.g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    t1.join()  # 等待线程t1执行完毕
    t2.join()  # 等待线程t2执行完毕

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

结果为:

g_num in test1: 1000000
g_num in test2: 1000000
g_num in main thread: 0

可以看到,尽管我们将全局变量g_num绑定在线程局部变量中,但是每个线程操作的实际上是自己的线程局部变量,并不会作用于我们绑定上去的全局变量。

Python中多线程的问题

在Python中(当前使用版本3.7,在3.8中据说会用局部解释器绕开GIL的问题),多线程并不能真正有效利用多核,例如如下代码:

import threading, time, multiprocessing


def my_counter(num):
    i = 0
    for _ in range(num):
        i += 1
    return True


def main():
    # 单线程顺序执行
    thread_array = {}
    start_time = time.time()
    count_num = int(1e8)
    for tid in range(2):
        t = threading.Thread(target=my_counter, args=(count_num,))
        t.start()
        t.join()
    end_time = time.time()
    print("Total time for two sequential threads: ", round(end_time - start_time, 2), " s")

    # 双线程并行
    print("CPU num for current machine: ",multiprocessing.cpu_count())
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        t = threading.Thread(target=my_counter, args=(count_num,))
        t.start()
        thread_array[tid] = t
    for tid in thread_array.keys():
        thread_array[tid].join()
    end_time = time.time()
    print("Total time for multi-threads: ", round(end_time - start_time, 2), " s")

if __name__ == '__main__':
    main()

运行的结果如下:

Total time for two sequential threads:  11.32  s
CPU num for current machine:  4
Total time for multi-threads:  11.69  s

可以看到,在四核的电脑上,两个线程用多线程并行和单线程串行,速度并没有任何提高。

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

解决方法

multiprocess库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。

当然multiprocess也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。

在将上面用threading实现的多任务改为multiprocessing:

import threading, time, multiprocessing


def my_counter(num):
    i = 0
    for _ in range(num):
        i += 1
    return True


def main():
    # 单进程顺序执行
    process_array = {}
    start_time = time.time()
    count_num = int(1e8)
    for pid in range(2):
        p = multiprocessing.Process(target=my_counter, args=(count_num,))
        p.start()
        p.join()
    end_time = time.time()
    print("Total time for two sequential processes: ", round(end_time - start_time, 2), " s")

    # 多进程并行
    print("CPU num for current machine: ", multiprocessing.cpu_count())
    process_array = {}
    start_time = time.time()
    for tid in range(2):
        t = multiprocessing.Process(target=my_counter, args=(count_num,))
        t.start()
        process_array[tid] = t
    for tid in process_array.keys():
        process_array[tid].join()
    end_time = time.time()
    print("Total time for multi-processings: ", round(end_time - start_time, 2), " s")


if __name__ == '__main__':
    main()

结果如下,可以看到多进程相比单进程,速度有了明显提升:

Total time for two sequential processes:  11.3  s
CPU num for current machine:  4
Total time for multi-processings:  7.91  s

之前也提到了既然GIL只是CPython的产物,那么其他解析器是不是更好呢?没错,像JPython和IronPython这样的解析器由于实现语言的特性,他们不需要GIL的帮助。然而由于用了Java/C#用于解析器实现,他们也失去了利用社区众多C语言模块有用特性的机会。所以这些解析器也因此一直都比较小众。毕竟功能和性能大家在初期都会选择前者,Done is better than perfect。

上一篇 下一篇

猜你喜欢

热点阅读