线程 进程 协程

Python多线程实现生产者消费者

2016-09-08  本文已影响0人  tyrone_li

1. Python多线程介绍


Python提供了两个有关多线程的标准库,threadthreadingthread提供了低级别的,原始的线程和一个锁。threading则是一个高级模块,提供了对thread的封装。一般情况下,使用threading是比较好的做法。

使用threading实现线程,只需要从threading.Thread类继承,并重写其中的__init__()方法和run()方法。

from threading import Thread


class MyThread(Thread):
    def __init__(self):
        Thread.__init__(self)

    def run(self):
        print(self.thread_id, "start")

threading提供了一个锁:lock = threading.Lock(),调用锁的acquire()release()方法可以使线程获得和释放锁。

需要注意的是,Python有一个GIL(Global Interpreter Lock)机制,任何线程在运行之前必须获取这个全局锁才能执行,每当执行完100条字节码,全局锁才会释放,切换到其他线程执行。

所以Python中的多线程不能利用多核计算机的优势,无论有多少个核,同一时间只有一个线程能得到全局锁,只有一个线程能够运行。

那么Python中的多线程有什么作用呢?为什么不直接使用Python中的多进程标准库?这里要根据程序执行的是IO密集型任务和计算密集型任务来选择。

当执行IO密集型任务时,比如Python爬虫,大部分时间都用在了等待返回的socket数据上,CPU此时是完全闲置的,这种情况下采用多线程较好。

当执行计算密集型任务时,比如图像处理,大部分时间CPU都在计算,这种情况下使用多进程才能真正的加速,使用多线程不仅得不到并行加速的效果,反而因为频繁切换上下文拖慢了速度。

2. threading实现生产者消费者


# -*- coding: utf-8 -*-
from threading import Thread
import time

queue = []


class Producer(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        while 1:
            queue.append(1)
            print("Producer: %s create a product" % self.name)
            print("Producer: %s put a product into queue" % self.name)
            time.sleep(0)
            if len(queue) > 20:
                print("queue is full!")
                time.sleep(1)


class Consumer(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        while 1:
            try:
                queue.pop()
                print("Consumer: %s get a product" % self.name)
                time.sleep(2)
            except:
                print("Queue is empty!")
                time.sleep(2)
                print("Consumer: %s sleep 2 seconds" % self.name)


def test():
    p1 = Producer("Producer-1")
    c1 = Consumer("Consumer-1")
    c2 = Consumer("consumer-2")

    p1.start()
    c1.start()
    c2.start()


if __name__ == "__main__":
    test()

输出如下:

Producer: Producer-1 create a product
Producer: Producer-1 put a product into queue
queue is full!
Producer: Producer-1 create a product
Producer: Producer-1 put a product into queue
queue is full!

输出显示满了之后仍然显示了生产者在创建产品,表明线程run()方法中的运行次序被打乱了。这是因为没有加锁,导致消费者线程运行到一半的时候,生产者线程获得了CPU。

Python提供了queue这一线程安全的容器,可以方便的和多线程结合起来。 queue包括FIFO先入先出队列Queue,LIFO后入先出队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

queue_tmp = queue.Queue(10)


class Producer(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        while 1:
            queue_tmp.put(0)
            print("Producer: %s create a product" % self.name)
            print("Producer: %s put a product into queue" % self.name)

class Consumer(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        while 1:
            queue_tmp.get()
            print("Consumer: %s get a product" % self.name)

3. join()函数用法测试


join()函数的原型是join(timeout=None),它的作用是阻塞进程一直到线程退出或者到timeout的时间结束。

这样一说是比较抽象的,下面用例子说明。

# -*- coding: utf-8 -*-
import threading
from threading import Thread
import time

lock = threading.Lock()


class MyThread(Thread):
    def __init__(self, thread_id, thread_name, thread_counter):
        Thread.__init__(self)
        self.thread_id = thread_id
        self.thread_name = thread_name
        self.thread_counter = thread_counter

    def run(self):
        print(self.thread_id, "start")
        self.print_time(self.thread_name, self.thread_counter, 2)
        print(self.thread_id, "end")

    @staticmethod
    def print_time(thread_name, thread_counter, delay):
        for i in range(thread_counter):
            time.sleep(delay)
            print("%s: %s" % (thread_name, time.ctime(time.time())))


def test():
    t1 = MyThread(1, "Thread1", 5)
    t2 = MyThread(2, "Thread2", 5)
    t3 = MyThread(3, "Thread3", 5)

    t1.start()
    t2.start()
    t3.start()


if __name__ == "__main__":
    test()

程序中的三个线程均未调用join()方法,输出如下:

1 start
2 start
3 start
Thread2: Thu Sep  8 20:53:06 2016
Thread1: Thu Sep  8 20:53:06 2016
Thread3: Thu Sep  8 20:53:06 2016
Thread1: Thu Sep  8 20:53:08 2016
Thread2: Thu Sep  8 20:53:08 2016
Thread3: Thu Sep  8 20:53:08 2016
...

可以看到,三个线程开始后交替执行,下面给t2线程加入join()方法:

def test():
    t1 = MyThread(1, "Thread1", 5)
    t2 = MyThread(2, "Thread2", 5)
    t3 = MyThread(3, "Thread3", 5)

    t1.start()
    t2.start()
    t2.join()
    t3.start()

输出变成了下面这样:

1 start
2 start
Thread1: Thu Sep  8 20:54:58 2016
Thread2: Thu Sep  8 20:54:58 2016
Thread1: Thu Sep  8 20:55:00 2016
...
2 end
Thread1: Thu Sep  8 20:55:39 2016
1 end
3 start

t1和t2交替执行,直到t2结束之后,才会不再阻塞进程,继续执行t3.start()。

所以,join()函数是可以执行线程之间同步的。不过它最常用的是在启动了一批线程之后,逐个调用每个线程的join()方法,阻塞当前进程,直到每个线程都退出。

上一篇 下一篇

猜你喜欢

热点阅读