Python 经验 - 多线程与多进程

2018-08-22  本文已影响11人  千反田爱瑠爱好者

多线程

GIL

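GIL(Global Interpreter Lock)即全局解释器锁:在 CPython 中,同一时刻只有一个线程能够执行字节码,因此多线程无法利用多核并行执行 Python 代码。下面先用 dis 查看函数的字节码,再用两个线程演示:GIL 会在执行过程中被释放,并不能保证 total += 1 这类操作的原子性。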

import dis
# Tiny sample function whose bytecode is disassembled right after its definition.
# NOTE(review): presumably kept as a separate assignment and return so that the
# disassembly shows the individual load / add / store steps.
def add(a):
    a = a+1
    return a
# dis.dis() writes the disassembly to stdout itself and returns None, so it is
# called directly; wrapping it in print() would emit an extra "None" line.
dis.dis(add)
import threading

total = 0    # shared counter mutated by both worker threads started further down

def add():
    # Increment the module-level ``total`` one million times with no lock held.
    global total
    for i in range(1000000):
        # NOTE(review): ``total += 1`` is a read-modify-write on a shared global;
        # presumably an update can be lost if the other thread runs in between.
        total += 1

def desc():
    # Decrement the module-level ``total`` one million times with no lock held.
    global total
    for i in range(1000000):
        total -= 1    # the (interpreter) lock can be released mid-operation, letting the other thread run

# One thread per worker function, created in the order (add, desc).
thread1, thread2 = (threading.Thread(target=worker) for worker in (add, desc))
for pending in (thread1, thread2):
    pending.start()

# Block until both workers have finished before reading the counter.
for running in (thread1, thread2):
    running.join()

# The final value differs from run to run: the number of increments and
# decrements that actually take effect is not deterministic.
print(total)

多线程编程

# 模拟多线程爬虫(并发爬取列表页和详情页)

import time
import threading

# Crawl an article detail page (simulated).
def get_detail_html(url):
    """Simulate fetching a detail page: print a start line, sleep 2 s, print an end line.

    ``url`` is accepted but not used by this simulation.
    """
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")

# Crawl the detail-page URLs from a list page (simulated).
def get_detail_url(url):
    """Simulate fetching a list page: print a start line, sleep 4 s, print an end line.

    ``url`` is accepted but not used by this simulation.
    """
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")


class GetDetailHtml(threading.Thread):
    """Thread subclass that simulates downloading one article detail page."""

    def __init__(self, name):
        # Only the thread name is customised; all other Thread options keep their defaults.
        threading.Thread.__init__(self, name=name)

    def run(self):
        """Print a start marker, block for two seconds, then print an end marker."""
        started, finished = "get detail html started", "get detail html end"
        print(started)
        time.sleep(2)
        print(finished)

class GetDetailUrl(threading.Thread):
    """Thread subclass that simulates crawling a list page for detail URLs."""

    def __init__(self, name):
        # Only the thread name is customised; all other Thread options keep their defaults.
        threading.Thread.__init__(self, name=name)

    def run(self):
        """Print a start marker, block for four seconds, then print an end marker."""
        started, finished = "get detail url started", "get detail url end"
        print(started)
        time.sleep(4)
        print(finished)

if __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    workers = (thread1, thread2)

    start_time = time.time()
    for worker in workers:
        worker.start()

    # join() blocks the main thread until the given worker has finished,
    # so the timing below is only taken after both are done.
    for worker in workers:
        worker.join()

    elapsed = time.time() - start_time
    print("last time: {}".format(elapsed))

线程间通信

共享变量 + 锁

import time
import threading

from threading import Condition

# 生产者每生产满10个url就等待,保证detail_url_list中最多只有10个url
# 当detail_url_list为空的时候,消费者就暂停

detail_url_list = []        # a plain list shared between threads; every access below holds the lock
# When many globals are shared, keep them in a dedicated module and import the module:
#   from chapter11 import variables
# Do not use `from chapter11.variables import detail_url_list`.
# NOTE(review): presumably because that binds a separate name which would not
# follow a later rebinding inside the variables module - confirm.

def get_detail_html(lock):
    """Consume URLs from ``detail_url_list`` forever, simulating a detail-page fetch.

    Args:
        lock: context-manager lock (e.g. ``threading.Lock``/``RLock``) guarding
            ``detail_url_list``.

    The function never returns. When no URL is available it sleeps for one
    second before polling again, instead of spinning at full speed.
    """
    while True:
        # Hold the lock only for the check-and-pop; ``with`` guarantees release.
        with lock:
            url = detail_url_list.pop() if detail_url_list else None

        if url is None:
            # Nothing to do yet: back off so idle consumers do not burn CPU.
            time.sleep(1)
            continue

        print("get detail html started")
        time.sleep(2)
        print("get detail html end")

def get_detail_url(lock):
    """Produce detail-page URLs into ``detail_url_list`` forever.

    Args:
        lock: context-manager lock guarding ``detail_url_list``.

    Each round prints a start line, sleeps 4 s (simulated list-page fetch) and
    then publishes 20 URLs. The shared list is capped at 10 entries: when it is
    full the producer waits and retries the *same* URL, so no URL is skipped.
    The function never returns.
    """
    # Crawling the list page is faster than crawling detail pages, so several
    # consumer threads can be run against this single producer.
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            url = "http://projectsedu.com/{id}".format(id=i)
            while True:
                with lock:
                    if len(detail_url_list) < 10:
                        detail_url_list.append(url)
                        break
                # List is full: the lock is already released here, wait for consumers.
                time.sleep(1)
        print("get detail url end")

if __name__ == "__main__":
    # Only ``Condition`` is imported by name from threading in this snippet,
    # so the lock is created through the module.
    lock = threading.RLock()
    start_time = time.time()

    # One producer fills the shared list ...
    thread_detail_url = threading.Thread(target=get_detail_url, args=(lock,))
    thread_detail_url.start()
    # ... and ten consumers drain it.
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(lock,))
        html_thread.start()

    # NOTE(review): the original note says child threads are killed when the main
    # thread exits; that only holds for daemon threads, and none of these threads
    # is marked as daemon.
    print("last time: {}".format(time.time() - start_time))

队列

import time
import threading
from queue import Queue

def get_detail_html(queue):
    """Consume URLs from ``queue`` forever, simulating a detail-page fetch.

    Args:
        queue: a ``queue.Queue``-like object providing ``get()`` and ``task_done()``.

    Every item taken is acknowledged with ``task_done()`` (even if processing
    fails) so that ``queue.join()`` in the caller can complete. Never returns.
    """
    while True:
        url = queue.get()    # Queue.get() blocks by default until an item is available
        try:
            print("get detail html started")
            time.sleep(2)
            print("get detail html end")
        finally:
            queue.task_done()

def get_detail_url(queue):
    """Produce detail-page URLs into ``queue`` forever.

    Each round prints a start line, sleeps 4 s (simulated list-page fetch),
    puts 20 URLs with ids 0..19 on the queue and prints an end line.
    """
    url_template = "http://projectsedu.com/{id}"
    while True:
        print("get detail url started")
        time.sleep(4)
        for page_id in range(20):
            queue.put(url_template.format(id=page_id))
        print("get detail url end")

if __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)

    # The producer has to be started too, otherwise the consumers block forever in get().
    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    thread_detail_url.start()
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()
    start_time = time.time()

    # task_done() must only be called once per item obtained with get(); calling it
    # here on an untouched queue raises ValueError, so that call has been removed.
    # join() returns once every item put so far has been acknowledged with task_done().
    detail_url_queue.join()

    # NOTE(review): the original note says child threads are killed when the main
    # thread exits; that only holds for daemon threads, and none of these threads
    # is marked as daemon.
    print("last time: {}".format(time.time() - start_time))

锁:线程间同步

from threading import Lock, RLock, Condition
import threading

total = 0
lock = RLock()      # re-entrant lock: the owning thread may acquire it several times
# lock = Lock()       # a plain Lock would deadlock on the second acquire in add()

def add():
    """Increment the shared ``total`` one million times under the module lock.

    The lock is deliberately taken twice per iteration to show that an RLock
    can be re-acquired by the thread that already holds it; each acquisition is
    paired with one release.
    """
    global total
    for _ in range(1000000):
        with lock:          # first acquisition (waits if another thread holds it)
            with lock:      # nested acquisition by the same thread
                total += 1

def desc():
    """Decrement the shared ``total`` one million times, holding ``lock`` for each step."""
    global total
    for _ in range(1000000):
        # Acquire/release around every single decrement.
        with lock:
            total -= 1

# One thread per worker function, created in the order (add, desc).
thread1, thread2 = (threading.Thread(target=worker) for worker in (add, desc))
for pending in (thread1, thread2):
    pending.start()
# Wait for both workers before reading the shared counter.
for running in (thread1, thread2):
    running.join()
print(total)
"""
死锁产生的四个必要条件:
    互斥
    不可抢占
    请求且占有
    循环等待

示例:线程 A、B 以相反的顺序申请锁 a、b,各自持有一把锁并等待对方释放另一把,形成循环等待。

A(a, b)
acquire (a)
acquire (b)

B(a, b)
acquire (b)
acquire (a)
"""

条件变量

import threading
from concurrent import futures

class XiaoAi(threading.Thread):
    """Answering side of the two-thread dialogue driven by a shared condition."""

    def __init__(self, cond):
        threading.Thread.__init__(self, name="小爱")
        self.cond = cond

    def run(self):
        """Wait for the partner, print one reply, notify the partner; six times over."""
        replies = (
            "在",
            "好啊",
            "君住长江尾",
            "共饮长江水",
            "此恨何时已",
            "定不负相思意",
        )
        # The condition is held for the whole conversation; wait() releases it
        # while blocked and re-acquires it before returning.
        with self.cond:
            for reply in replies:
                self.cond.wait()
                print("{} : {} ".format(self.name, reply))
                self.cond.notify()

class TianMao(threading.Thread):
    """Speaking-first side of the two-thread dialogue driven by a shared condition."""

    def __init__(self, cond):
        threading.Thread.__init__(self, name="天猫精灵")
        self.cond = cond

    def run(self):
        """Print one line, notify the partner, wait for the reply; six times over."""
        lines = (
            "小爱同学",
            "我们来对古诗吧",
            "我住长江头",
            "日日思君不见君",
            "此水几时休",
            "只愿君心似我心",
        )
        # The condition is held for the whole conversation; wait() releases it
        # while blocked and re-acquires it before returning.
        with self.cond:
            for line in lines:
                print("{} : {} ".format(self.name, line))
                self.cond.notify()
                self.cond.wait()

if __name__ == "__main__":
    # Both threads share one Condition object.
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    # Start order matters: XiaoAi.run() begins with wait() and TianMao.run()
    # begins with notify(); a notify() issued while nobody is waiting is lost.
    # NOTE(review): starting xiaoai first makes it likely, not guaranteed, that
    # it is already waiting when tianmao notifies.
    xiaoai.start()
    tianmao.start()

信号量

import threading
import time

class HtmlSpider(threading.Thread):
    """Thread that simulates fetching one page and then returns a semaphore permit.

    Args:
        url: page address; stored on the instance, not used by the simulation.
        sem: semaphore-like object whose permit was acquired by the creator of
            this thread; ``release()`` is called exactly once when ``run`` ends.
    """

    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        """Sleep 2 s, report success, and always give the permit back."""
        try:
            time.sleep(2)
            print("got html text success")
        finally:
            # Release even if the fetch fails, otherwise the permit is lost and
            # the producer can eventually block forever in acquire().
            self.sem.release()

class UrlProducer(threading.Thread):
    """Thread that launches twenty HtmlSpider threads, throttled by a semaphore."""

    def __init__(self, sem):
        threading.Thread.__init__(self)
        self.sem = sem

    def run(self):
        """Acquire one permit per spider, then start the spider with the same semaphore."""
        for page in range(20):
            # The permit is taken here, before the spider starts, and is given
            # back inside the spider thread.
            self.sem.acquire()
            target = "https://baidu.com/{}".format(page)
            HtmlSpider(target, self.sem).start()

if __name__ == "__main__":
    # Three permits are available for the producer to acquire.
    sem = threading.Semaphore(value=3)
    url_producer = UrlProducer(sem)
    url_producer.start()

线程池

from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    """Sleep for ``times`` seconds, report success, and return ``times`` unchanged."""
    time.sleep(times)
    message = "get page {} success".format(times)
    print(message)
    return times

executor = ThreadPoolExecutor(max_workers=2)
# submit() hands the callable to the pool and returns a Future immediately.
task1, task2 = (executor.submit(get_html, seconds) for seconds in (3, 2))
task1.done()            # query whether task1 has finished
task1.result()          # fetch task1's return value
task2.cancel()          # ask for task2 to be cancelled

批量提交任务,并在任务完成时获取其执行结果

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED

executor = ThreadPoolExecutor(max_workers=2)

urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
# Hold the main thread only until the first submitted task has completed.
wait(all_task, return_when=FIRST_COMPLETED)
print('main')

# as_completed() is a generator that yields each future once it has finished.
for future in as_completed(all_task):
    print("get {} page".format(future.result()))


# executor.map() calls get_html once per url and iterates over the return values.
for data in executor.map(get_html, urls):
    print("get {} page".format(data))

多进程

CPU操作:

from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    """Return the n-th Fibonacci number by plain recursion (1 for every ``n <= 2``).

    Intentionally un-memoised: it serves as the CPU-bound workload for the
    thread-pool versus process-pool comparison.
    """
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)

# Multi-threaded run of the CPU-bound fib() workload
with ThreadPoolExecutor(3) as executor:
    all_task = [executor.submit(fib, num) for num in range(25, 40)]
    start_time = time.time()
    for future in as_completed(all_task):
        print("exe result: {}".format(future.result()))

    elapsed = time.time() - start_time
    print("last time is: {}".format(elapsed))

# Multi-process run of the CPU-bound fib() workload.
# Worker processes may re-import the main module (spawn start method), so the
# pool is only created under the __main__ guard.
if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, (num)) for num in range(25, 40)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

IO操作:

def random_sleep(n):
    """Sleep for ``n`` seconds and return ``n``; the blocking (I/O-like) workload of the comparison."""
    time.sleep(n)
    return n

# Multi-threaded run of the blocking random_sleep() workload
with ThreadPoolExecutor(3) as executor:
    all_task = [executor.submit(random_sleep, num) for num in [2] * 30]
    start_time = time.time()
    for future in as_completed(all_task):
        print("exe result: {}".format(future.result()))
    elapsed = time.time() - start_time
    print("last time is: {}".format(elapsed))

# Multi-process run of the blocking workload. It submits the same random_sleep
# tasks as the thread-pool run so the two timings are comparable (the earlier
# version resubmitted the CPU-bound fib() tasks by mistake).
# Worker processes may re-import the main module (spawn start method), so the
# pool is only created under the __main__ guard.
if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))
        print("last time is: {}".format(time.time()-start_time))

多进程编程

import os
# fork is only available on unix/linux
pid = os.fork()     
print("ywh")        # from this statement on, both the parent and the child process execute the code
if pid == 0:
    # Branch taken when fork() returned 0.
    print('子进程 {} ,父进程是: {}.' .format(os.getpid(), os.getppid()))
else:
    # Branch taken for a non-zero return value, which is what gets printed here.
    print('我是父进程:{}.'.format(pid))

使用multiprocessing和concurrent.futures包

def get_html(n):
    """Sleep for ``n`` seconds, print a success line, and hand ``n`` back to the caller."""
    seconds = n
    time.sleep(seconds)
    print("sub_progress success")
    return seconds

# Approach 1: create and drive a single Process by hand
import multiprocessing    # this snippet uses the module itself, which was never imported

# Child processes may re-import the main module (spawn start method), so
# processes are only created under the __main__ guard.
if __name__ == "__main__":
    progress = multiprocessing.Process(target=get_html, args=(2,))
    print(progress.pid)     # printed before start()
    progress.start()
    print(progress.pid)     # printed after start()
    progress.join()         # wait for the child to finish
    print("main progress end")

# Approaches 2-4: multiprocessing.Pool
import multiprocessing    # this snippet uses the module itself, which was never imported

# Worker processes may re-import the main module (spawn start method), so
# pools are only created under the __main__ guard.
if __name__ == "__main__":
    # Approach 2: apply_async
    pool = multiprocessing.Pool(multiprocessing.cpu_count())    # the pool size defaults to the CPU count anyway
    result = pool.apply_async(get_html, args=(3,))      # asynchronous submission
    pool.close()            # must be closed first: no further tasks are accepted
    pool.join()             # wait for the submitted tasks to finish
    print(result.get())     # fetch the return value

    # A pool that has been closed and joined rejects new work, so the
    # remaining approaches run on a fresh pool.
    pool = multiprocessing.Pool(multiprocessing.cpu_count())

    # Approach 3: imap
    for result in pool.imap(get_html, [1,5,3]):
        print("{} sleep success".format(result))

    # Approach 4: imap_unordered
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))

    pool.close()
    pool.join()

进程间通信

多进程通信(multiprocessing.Queue)

import time
from multiprocessing import Process, Queue, Pool

def producer(queue):
    """Put the string "a" on ``queue``, then sleep for two seconds."""
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    """Sleep for two seconds, then take one item from ``queue`` and print it."""
    time.sleep(2)
    data = queue.get()
    print(data)

# Child processes may re-import the main module (spawn start method), so the
# processes are only created under the __main__ guard, matching the Pipe example.
if __name__ == "__main__":
    queue = Queue(10)       # multiprocessing.Queue shared by both child processes
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

进程池中的进程间通信(Manager().Queue)

from multiprocessing import Process, Manager

def producer(queue):
    """Put the string "a" on ``queue``, then sleep for two seconds."""
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    """Sleep for two seconds, then take one item from ``queue`` and print it."""
    time.sleep(2)
    data = queue.get()
    print(data)

# Worker processes may re-import the main module (spawn start method), so the
# manager and the pool are only created under the __main__ guard.
if __name__ == "__main__":
    # Tasks submitted to a Pool use a queue obtained from a Manager.
    queue = Manager().Queue(10)
    pool = Pool(2)

    produced = pool.apply_async(producer, args=(queue,))
    consumed = pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()

    # apply_async() stores a worker's exception in its AsyncResult; get()
    # re-raises it here instead of letting the failure pass unnoticed.
    produced.get()
    consumed.get()

from queue import Queue                 # 多线程
from multiprocessing import Queue       # 多进程
from multiprocessing import Manager     # 进程池

管道

from multiprocessing import Process, Pipe

def producer(pipe):
    """Send the string "bobby" through ``pipe``."""
    pipe.send("bobby")

def consumer(pipe):
    """Receive one object from ``pipe`` and print it."""
    print(pipe.recv())

if __name__ == "__main__":
    # Pipe() returns two connection objects; the first is handed to the
    # consumer and the second to the producer.
    receive_pipe, send_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe,))

    for child in (my_producer, my_consumer):
        child.start()
    for child in (my_producer, my_consumer):
        child.join()
上一篇下一篇

猜你喜欢

热点阅读