【python】Parallel

2019-12-11  本文已影响0人  乞力马扎罗的雪人

Parallel

[TOC]

并行化程序和多线程。一般用thread或者threading,multiprocessing,进程通信使用Pipe或者Queue(更安全),MPI 也可以。

Python多线程支持用两种方式来创建线程:一种是通过继承Thread类,重写它的run()方法(注意,不是start()方法);另一种是创建一个threading.Thread对象,在它的初始化函数(init())中将可调用对象作为参数传入。实际应用中,推荐优先使用threading模块而不是thread模块,(除非有特殊需要)。

【备注:线程的问题再看看。改善程序的91个建议一书+操作系统。】

GIL

Global Interpreter Lock 确保虚拟机中仅有一个线程运行。Python中的一种机制,导致多线程编程的不理想。

Multiprocessing

multiprocessing 是一个用与 threading 模块相似API的支持产生进程的包。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。

Pool: thread没有进程池,,Process类似thread.Thread

from multiprocessing import Pool,Process

def function(x):
    pass
def main():
    with Pool(5) as p:
        p.map(function,X)
        #function要加入进程任务的任务
        # x 参数序列
     p=Process(target=function,args=(x1,x2))
     p.satrt()
     p.join()

Queue

Pipe()

q=Queue()
parents_conn,child_conn=Pipe()# 代表管道两端
p=Process(target=function.args=(q,))
p=Process(target=function,args=(child_conn,))
p.start()
p.join()

Lock():保证进程同步,仅有一个进程打印状态

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Thread Pool

线程生命周期:create,initial,run,block,terminate

启动时间:Ts 运行时间Tr 销毁时间Td

构建一个POOL(队列)放入任务,多线程池依次从队列中取任务。避免线程不断创建销毁浪费时间,等全部任务完成后再销毁。

from threading import Thread
import Queue
class worker(Thread):
    def _init_(self,workQueue,resultQueue):
    def run(self):
        while True:
            try:
                callable,args,kwds = self.workQueue.get(False)#fetch a task
                self.resultQueue.put(callble(*args,**kwds))# return result
            except Queue.Empty:
                break
                
class WorkerManager():
# 管理线程
    def _init_(self,numbers_of_worker):
        self.workQueue=Queue.Queue()#创建任务队列
        self.resultQueue=Queue.Queue()#创建结果队列
        self.workers=[]
        self._recruitThreads(self, numbers_of_worker):
            for i inrange(numbers_of_worker):
                worker=Worker(self.workQueue,resultQueue)
                self.workers.append(worker)
     
    def start(self):#启动线程
        for w in self.workers: 
            w.start() 
    def wait_for_complete( self): 
        while len(self.workers): 
            worker = self.workers.pop(#从池中取出一个线程处理请求
            worker.join( ) 
            if worker.isAlive() and not self.workQueue.empty(): 
                self.workers.append( worker ) # 重新加入线程池中
        print "All jobs were completed." 
    def add_job( self, callable, *args, **kwds ): 
        self.workQueue.put( (callable, args, kwds) )           # 往工作队列中加入请求
    def get_result( self, *args, **kwds ):# 获取处理结果
        return self.resultQueue.get( *args, **kwds )
    def funtion():
                pass
    
wm = WorkerManager(2)                                           #
创建线程池
for i in urls:
        wm.add_job( download_file, i )                          #
将所有请求加入队列中
wm.start() 
wm.wait_for_complete()           

或者threadpool模块

import threadpool as tp

def function(args):
    pass
pool_size=2
pool=tp.ThreadPool(pool_size)# 创建pool
request=tp.makeRequests(function,args)# 创建request

pool.putRequest(threadpool.WorkRequest(download_file,args=))#将具体的请求放入线程池
pool.poll()#处理任务队列中的新的请求
pool.wait() 
print "destory all threads before exist"
pool.dismissWorkers(pool_size, do_join=True)#完成后退出

MPI

服务器上一般采用多核CPU运行程序,不同CPU之间如何协同合作、消息传递?MPI是一个通用的多线程工具,本文主要介绍MPI的python包用法。运行程序时,最常用的用法是直接多次运行mpirun -np 10 main.py [argments]

运行openAIES复现时程序

mpirun -bind-to core -map-by node -report-bindings python main.py -e 20 -g Alien -c ./configurations/sample_configurations.json -r Test

Open MPI v1.4.5 documentation

上一篇 下一篇

猜你喜欢

热点阅读