python多线程与多进程

2019-05-21  本文已影响0人  Byte猫

一、基本概念

1、进程process

什么是进程。最直观的就是一个个pid,官方的说法就:进程是程序在计算机上的一次执行活动。
从内核的观点看,进程的目的就是担当分配系统资源(CPU时间、内存等)的基本单位。
进程有独立的地址空间,一个进程崩溃后不会对其它进程产生影响。

2、线程thead

线程是进程的一个执行流,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
一个进程由几个线程组成,线程与同属一个进程的其他的线程共享进程所拥有的全部资源。
线程没有独立的地址空间,一个线程死掉就等于整个进程死掉。

3、多线程和多进程

在 Python 中,对于计算密集型任务,多进程占优势,对于 I/O 密集型任务,多线程占优势。
当然对运行一个程序来说,随着 CPU 的增多执行效率肯定会有所提高,这是因为一个程序基本上不会是纯计算或者纯 I/O,所以我们只能相对的去看一个程序到底是计算密集型还是 I/O 密集型。

二、Python多进程技术

python中的多进程主要使用到 multiprocessing 这个库

1、不使用进程池

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time,os

def worker():
    print("子进程{}执行中, 父进程{}".format(os.getpid(),os.getppid()))
    time.sleep(2)
    print("子进程{}终止".format(os.getpid()))

if __name__ == "__main__":
    print("本机为",os.cpu_count(),"核 CPU")
    print("主进程{}执行中, 开始时间={}".format(os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')))
    start = time.time()

    l=[]
    # 创建子进程实例
    for i in range(10):
        p=Process(target=worker,name="worker"+str(i),args=())
        l.append(p)

    # 开启进程
    for i in range(10):
        l[i].start()
 
    # 阻塞进程
    for i in range(10):
        l[i].join()
    
    stop = time.time()
    print("主进程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
    print("总耗时 %s 秒" % (stop - start))

2、使用进程池

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time,os

def worker(arg):
    print("子进程{}执行中, 父进程{}".format(os.getpid(),os.getppid()))
    time.sleep(2)
    print("子进程{}终止".format(os.getpid()))

if __name__ == "__main__":
    print("本机为",os.cpu_count(),"核 CPU")
    print("主进程{}执行中, 开始时间={}".format(os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')))
    start = time.time()

    l = Pool(processes=5)
    # 创建子进程实例
    for i in range(10):
        # l.apply(worker,args=(i,))      # 同步执行(Python官方建议废弃)
        l.apply_async(worker,args=(i,))  # 异步执行

    # 关闭进程池,停止接受其它进程
    l.close()
    # 阻塞进程
    l.join()
    
    stop = time.time()
    print("主进程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
    print("总耗时 %s 秒" % (stop - start))

三、Python多线程技术

python中的多进程主要使用到 threading 这个库

1、不使用线程池

# -*- coding:utf-8 -*-
from threading import Thread
import time,os

def worker(arg):
    print("子线程执行中>>> 编号={}".format(arg))
    time.sleep(2)
    print("子线程终止>>> 编号={}".format(arg))

if __name__ == "__main__":
    print("本机为",os.cpu_count(),"核 CPU")  # 本机为4核
    
    l = []
    # 创建子线程实例
    for i in range(10):
        t = Thread(target=worker, name='one', args=(i,))
        t.start()
        l.append(t)
    for p in l:
        p.join()

关于线程互斥、同步等

2、使用线程池(multiprocessing库的线程池)

"from multiprocessing import Pool "这样导入的 Pool 表示的是进程池,"from multiprocessing.dummy import Pool"这样导入的 Pool表示的是线程池。

# -*- coding:utf-8 -*-
from multiprocessing.dummy import Pool as ThreadPool
import time,os

def worker(arg):
    print("子线程{}执行中".format(arg))
    time.sleep(2)
    print("子线程{}终止".format(arg))

if __name__ == "__main__":
    print("本机为",os.cpu_count(),"核 CPU")
    print("主线程执行中, 开始时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
    start = time.time()

    pool = ThreadPool(5)
    results = pool.map(worker, range(10))

    pool.close()
    pool.join()
    
    stop = time.time()
    print("主线程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
    print("总耗时 %s 秒" % (stop - start))

3、使用线程池(自定义线程池)

"""
思路
1,将任务放在队列
    1)创建队列:(初始化)
    2)设置大小,线程池的最大容量
    3)真实创建的线程 列表
    4)空闲的线程数量

2,着手开始处理任务
    1)创建线程
        2)空闲线程数量大于0,则不再创建线程
        3)创建线程池的数量 不能高于线程池的限制
        4)根据任务个数判断  创建线程的数量
    2)线程去队列中取任务
        1)取任务包(任务包是一个元祖)
        2)任务为空时,不再取(终止)
"""

import time
import threading
import queue

stopEvent = object()  # 停止任务的标志

class ThreadPool(object):
    def __init__(self, max_thread):
        # 创建任务队列,可以放无限个任务
        self.queue = queue.Queue()
        # 指定最大线程数
        self.max_thread = max_thread
        # 停止标志
        self.terminal = False
        # 创建真实线程数
        self.generate_list = []
        # 空闲线程数
        self.free_thread = []

    def run(self, action, args, callback=None):
        """
        线程池执行一个任务
        INPUT ->
            action:任务函数
            args:任务参数
            callback:执行完任务的回调函数,成功或者失败的返回值。
        """
        # 线程池运行的条件:1)
        if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread:
            self.generate_thread()
        task = (action, args, callback)
        self.queue.put(task)

    def callback(self):
        """
        回调函数:循环取获取任务,并执行任务函数
        """
        # 获取当前线程
        current_thread = threading.current_thread()
        self.generate_list.append(current_thread)
        # 取任务并执行
        event = self.queue.get()
        # 事件类型是任务
        while event != stopEvent:  # 重点是这个判断  使任务终止
            # 解开任务包 ,(任务是一个元祖)
            # 执行任务
            # 标记:执行任务前的状态,执行任务后的状态
            action, args, callback = event
            try:
                ret = action(*args)
                success = True
            except Exception as x:
                success = False
                ret = x
            if callback is not None:
                try:
                    callback(success, ret)
                except Exception as e:
                    print(e)
            else:
                pass
            if not self.terminal:
                self.free_thread.append(current_thread)
                event = self.queue.get()
                self.free_thread.remove(current_thread)
            else:
                # 停止进行取任务
                event = stopEvent
        else:
            # 不是元祖,不是任务,则清空当前线程,不在去取任务
            self.generate_list.remove(current_thread)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.callback)
        t.start()

    # 终止取任务
    def terminals(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        num = len(self.generate_list)
        self.queue.empty()
        while num:
            self.queue.put(stopEvent)
            num -= 1


def test(pi):
    time.sleep(0.5)
    print(pi)


pool = ThreadPool(10)

for i in range(100):
    pool.run(action=test, args=(i,))

pool.terminals()
pool.close()
上一篇下一篇

猜你喜欢

热点阅读