Python 运维

Python多进程

2016-12-08  本文已影响0人  四号公园_2016

Python多进程(包含数据汇总哦!)

1.正常情况下的多进程

(1)一般也就写文件(窜行)
(2)入库,写数据库里(入库复杂)
(3)20个任务,开20个进程同时跑,函数里面自己作写文件或者入库的操作

# -*- coding=utf-8 -*-

import json
import multiprocessing


def get_page(page):
    print page
    return page

    
if __name__ == "__main__":
    workers = []
    # 开启20个进程同时跑数据
    for i in range(20):
        prc = multiprocessing.Process(target=get_page,args=(i,))
        prc.start()
        workers.append(prc)
        
    # 保证所有进程都跑完结束程序
    for worker in workers:
        worker.join()

2.多进程数据汇总

数据简单处理,汇总大列表或者大字典(ansible源码学习所得,感谢开源软件)

核心思想呢是
《1》创建2个多进程特定队列,由multiprocessing模块提供,一个任务队列,一个结果队列
《2》开启的多进程数量是可以控制的
《3》每个进程跑的任务函数是特殊性的,这个任务函数是从任务队列接收数据,跑结果,结果放结果队列中。_executor_hook函数
《4》实际接收完数据跑任务的呢是get_page函数,实际生产环境中一般耗时的任务处理都写这个函数里
《5》从结果队列中取数据,因为已知传进结果队列中的都是普通字符串,所以可以直接列表的append操作就可以完成数据汇总,如果传进结果队列中的是列表,就通过列表的extend进行最后数据汇总,根据实际情况来做

# -*- coding=utf-8 -*-

import json
import multiprocessing


def get_page(page):
    return page
    

'''流程控制函数,从任务队列中取数据,调用get_page去跑数据,然后结果放到结果队列中取'''  
def _executor_hook(job_queue, result_queue):
    while not job_queue.empty():
        try:
            page = job_queue.get(block=False)
            return_data = get_pag(page)
            result_queue.put(return_data)
        except Queue.Empty:
            pass
        except:
            traceback.print_exc()
    
if __name__ == "__main__":
  
    '''1.多进程方式跑数据,从任务队列中获取数据,跑数据,结果存到结果队列中'''
    manager = multiprocessing.Manager()
    # 创建任务队列
    job_queue = manager.Queue()
    # 任务队列中添加100个数字
    for page in range(1,101):
        job_queue.put(page)
    # 创建结果队列
    result_queue = manager.Queue()
    
    workers = []
    # 开启20个进程同时跑数据
    for i in range(20):
        # 每个进程跑的函数是_executor_hook,传递进去的参数分别是任务队列和结果队列
        prc = multiprocessing.Process(target=_executor_hook,args=(job_queue, result_queue))
        prc.start()
        workers.append(prc)
        
    # 保证所有进程都跑完结束程序
    for worker in workers:
        worker.join()
        
    '''2.不断从结果队列中取数据,添加列表返回最后结果'''
    results = []
    try:
        while not result_queue.empty():
            results.append(result_queue.get(block=False))
    except Exception,e:
        pass
    else:
        print json.dumps(results,indent=4)
上一篇下一篇

猜你喜欢

热点阅读