工作生活

Python多线程

2019-07-06  本文已影响0人  Recalcitrant

目录:
一、线程的创建
二、多线程互斥锁
三、线程间通信
四、线程池

Python并发之多线程

一、线程的创建

单线程示例:

import time
def run(n):
     print("task ", n)
     time.sleep(2)


t0 = time.time()
run("t1")
run("t2")
ts = time.time()
print(ts - t0)
运行结果

1.启动多个线程(函数方式)

在Python3中,Python提供了一个内置模块 threading.Thread,可以很方便地让我们创建多线程。
threading.Thread() 一般接收两个参数:

import threading
import time
def run(n):
    print("task", n)
    time.sleep(2)
    print("{} finished!".format(n))
    ts = time.time()
    print("线程{}耗时:{}".format(n, ts-t0))


t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t0 = time.time()
t1.start()
t2.start()
print("main finished!")
运行结果

t1与t2几乎同时完成。

2.启动多个线程(类方式)

相比较函数而言,使用类创建线程,会比较麻烦一点。 首先,我们要自定义一个类,对于这个类有两点要求:

这里的 run 方法,和我们上面线程函数的性质是一样的,可以写我们的业务逻辑程序。在 start() 后将会被自动调用。

import threading
import time
class MyThread(threading.Thread):
    def __init__(self, n, sleep_time):
        super(MyThread, self).__init__()
        self.n = n
        self.sleep_time = sleep_time

    def run(self):
        print("running task {}".format(self.n))
        time.sleep(self.sleep_time)
        print("task {} done!".format(self.n))
        ts = time.time()
        print("线程{}耗时:{}".format(self.n, ts - t0))


t1 = MyThread("t1", 2)
t2 = MyThread("t2", 4)
t0 = time.time()
t1.start()
t2.start()
ts = time.time()
print("main finished!")
运行结果

.join():程序会等待该线程结束后,再执行后面的语句。

import threading
import time
class MyThread(threading.Thread):
    def __init__(self, n, sleep_time):
        super(MyThread, self).__init__()
        self.n = n
        self.sleep_time = sleep_time

    def run(self):
        print("running task {}".format(self.n))
        time.sleep(self.sleep_time)
        print("task {} done!".format(self.n))
        ts = time.time()
        print("线程{}耗时:{}".format(self.n, ts - t0))


t1 = MyThread("t1", 2)
t2 = MyThread("t2", 4)
t0 = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
ts = time.time()
print("main finished!")
运行结果

二、多线程互斥锁

1.定义和使用锁

import threading
# 生成锁对象(全局唯一)
lock = threading.Lock()

# 获取锁(未获取到会阻塞程序,直到获取到锁才会往下执行)
lock.acquire()

# 释放锁
lock.release()

注意:lock.acquire() 和 lock.release()必须成对出现。否则就有可能造成死锁。
可以使用使用上下文管理器来加锁:

import threading
lock = threading.Lock()
with lock:
    操作代码
import threading
import time
g_num = 0


def work1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("----in work1, g_num is %d----" % g_num)


def work2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("----in work2, g_num is %d----" % g_num)


print("---线程创建之前g_num is %d---" % g_num)
t1 = threading.Thread(target=work1, args=(1000000,))
t1.start()
t2 = threading.Thread(target=work2, args=(1000000,))
t2.start()

# 等待线程执行完毕
while len(threading.enumerate()) != 1:
    time.sleep(1)

print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
运行结果

如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确。

import threading
import time
g_num = 0
lock = threading.Lock()


def work1(num):
    global g_num
    with lock:
        for i in range(num):
            g_num += 1
    print("----in work1, g_num is %d----" % g_num)


def work2(num):
    global g_num
    with lock:
        for i in range(num):
            g_num += 1
    print("----in work2, g_num is %d----" % g_num)


print("---线程创建之前g_num is %d---" % g_num)
t1 = threading.Thread(target=work1, args=(1000000,))
t1.start()
t2 = threading.Thread(target=work2, args=(1000000,))
t2.start()

# 等待线程执行完毕
while len(threading.enumerate()) != 1:
    time.sleep(1)

print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
运行结果

2.死锁

示例:

import threading
import time


class MyThread1(threading.Thread):
    def run(self):
        # 对mutexA上锁
        mutexA.acquire()

        # mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
        print(self.name+'----do1---up----')
        time.sleep(1)

        # 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
        mutexB.acquire()
        print(self.name+'----do1---down----')
        mutexB.release()

        # 对mutexA解锁
        mutexA.release()


class MyThread2(threading.Thread):
    def run(self):
        # 对mutexB上锁
        mutexB.acquire()

        # mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
        print(self.name+'----do2---up----')
        time.sleep(1)

        # 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
        mutexA.acquire()
        print(self.name+'----do2---down----')
        mutexA.release()

        # 对mutexB解锁
        mutexB.release()


mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
运行结果

3.全局锁(GIL)

多线程和多进程是不一样的:
多进程是真正的并行,而多线程是伪并行,实际上只是线程交替执行。

GIL(Global Interpreter Lock,全局解释器锁)

任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。而Python解释器,并不是只有CPython,除它之外,还有PyPy,Psyco,JPython,IronPython等。在绝大多数情况下,我们通常都认为 Python == CPython,所以也就默许了Python具有GIL锁这个事。

如何避免性能受到GIL的影响:

通常情况下:

  • I/O密集型:适用多线程
  • CPU密集型:适用多进程

三、线程间通信

1.Queue队列

import queue
# maxsize默认为0,不受限
# 一旦>0,而消息数又达到限制,q.put()也将阻塞
q = queue.Queue(maxsize=0)

# 阻塞程序,等待获取队列消息
q.get()
# 获取消息,设置超时时间
q.get(timeout=5.0)

# 发送消息
q.put()

# 等待所有的消息都被消费完
q.join()

# 以下三个方法代码中不要使用(由于具有瞬时性,所以没有参考价值):
# 查询当前队列的消息个数
q.qsize()

# 队列消息是否都被消费完,True/False
q.empty()

# 检测队列里消息是否已满
q.full()
import queue
q = queue.Queue(maxsize=0)


def producer():     # 生产者
    for i in range(1000):
        q.put(i)


def consumer():     # 消费者
    for i in range(1000):
        data = q.get()
        print(data)


t1 = threading.Thread(target=producer,)
t2 = threading.Thread(target=consumer,)
t1.start()
t2.start()

四、线程池

在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,放到队列中,让过来的任务立刻能够使用,就形成了线程池。
在Python3中,创建线程池是通过concurrent.futures函数库中的ThreadPoolExecutor类来实现的。
future对象:在未来的某一时刻完成操作的对象。submit方法可以返回一个future对象。
示例:

import concurrent.futures as futures


# 线程执行的函数
def add(n1, n2):
    v = n1 + n2
    print('add:', v, ', tid:', threading.currentThread().ident)
    time.sleep(n1)
    return v


# 创建一个线程池
ex = futures.ThreadPoolExecutor(max_workers=3)      # 指定最多运行3个线程
# 通过submit把需要执行的函数扔进线程池中
f1 = ex.submit(add, 2, 3)       # submit返回一个future对象
f2 = ex.submit(add, 2, 2)       # submit返回一个future对象
print('main thread running!')
print(f1.done())        # .done():看看任务结束了没
print(f1.result())      # 获取结果(阻塞方法)
print(f1.done())        # .done():看看任务结束了没
print(f2.done())        # .done():看看任务结束了没
print(f2.result())      # 获取结果(阻塞方法)
print(f2.done())        # .done():看看任务结束了没
运行结果

获取线程执行结果方法:

上一篇下一篇

猜你喜欢

热点阅读