Python multiprocessing

2021-06-10  本文已影响0人  hehehehe
def do_work_list_slice_one(func, work_size, lists, *others):
    """Run ``func`` once per entry of ``lists`` in a process pool and log a tally.

    Args:
        func: Callable executed through ``do_func`` in the worker processes.
        work_size: Passed as ``max_workers`` to ``ProcessPoolExecutor``.
        lists: Iterable of argument sequences; each entry is unpacked into
            the positional arguments of one ``func`` call.
        *others: Accepted but not used by this function.

    Returns:
        None. A dict mapping each distinct task result to how many tasks
        produced it is logged, followed by a completion message.
    """
    with ProcessPoolExecutor(max_workers=work_size) as executor:
        # Submit everything first so the workers run concurrently.
        pending = []
        for args in lists:
            pending.append(executor.submit(do_func, func, *args))

        # Collect results in submission order and count identical outcomes.
        tally = Counter()
        for fut in pending:
            tally[fut.result()] += 1

        logger.info(dict(tally))
        logger.info("all finished!")
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter

from tools.logger_config import logger


def do_func(func, *paras):
    """Call ``func(*paras)`` and return its result, or ``"fail"`` on error.

    Args:
        func: Callable to invoke.
        *paras: Positional arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns. If ``func`` raises an ``Exception``, the
        error is logged and the string ``"fail"`` is returned instead, so the
        exception never propagates to the caller.
    """
    try:
        return func(*paras)
    except Exception:
        # The message has no placeholder, so the exception must not be passed
        # as an extra format argument. ``exception()`` is expected to attach
        # the active traceback itself (true for logging.Logger; confirm for
        # the project logger).
        logger.exception(f"do_func error {paras}: ")
        return "fail"


def do_work_ntile(func, work_size, ntile):
    """Run ``func(tile[0], tile[1])`` in a process pool for every tile.

    Args:
        func: Callable executed through ``do_func`` in the worker processes.
        work_size: Number of worker processes given to ``multiprocessing.Pool``.
        ntile: Iterable of indexable items; the first two elements of each
            item are passed to ``func`` as its two positional arguments.

    Returns:
        None. A dict mapping each distinct task result to how many tasks
        produced it is logged, followed by a completion message.
    """
    # The context manager guarantees the workers are torn down even when
    # submitting a task or fetching a result raises.
    with multiprocessing.Pool(work_size) as pool:
        tasks = [
            pool.apply_async(do_func, (func, tile[0], tile[1]))
            for tile in ntile
        ]
        # close() + join() waits for every submitted task to finish before
        # the results are read.
        pool.close()
        pool.join()
        results = [task.get() for task in tasks]

    logger.info(dict(Counter(results)))

    logger.info("all finished!")


def do_work_list_slice(func, work_size, step, lists, *others):
    """Process ``lists`` in windows of ``step`` items using a process pool.

    Each task calls ``func(lists, start, end, *others)`` through ``do_func``,
    where ``start`` advances by ``step`` from 0 and ``end = start + step``
    (``end`` is not clamped and may exceed ``len(lists)`` for the last
    window).

    :param func: callable executed in the worker processes
    :param work_size: number of worker processes for ``multiprocessing.Pool``
    :param step: window size; must be greater than zero
    :param lists: sized collection handed to every task in full
    :param others: extra positional arguments appended to every ``func`` call
    :return: None; a dict of result -> count is logged, then a completion message
    :raises ValueError: if ``step`` is not positive
    """
    if step <= 0:
        # A non-positive step would never advance past the end of the list.
        raise ValueError("step must be greater than 0")

    len_list = len(lists)
    # The context manager guarantees the workers are torn down even when
    # submitting a task or fetching a result raises.
    with multiprocessing.Pool(work_size) as pool:
        tasks = []
        start = 0
        # Stop as soon as start reaches the end, so no empty trailing window
        # is submitted when len(lists) is a multiple of step (or is zero).
        while start < len_list:
            end = start + step
            tasks.append(
                pool.apply_async(do_func, (func, lists, start, end, *others))
            )
            start = end

        # close() + join() waits for every submitted task to finish before
        # the results are read.
        pool.close()
        pool.join()
        results = [task.get() for task in tasks]

    logger.info(dict(Counter(results)))

    logger.info("all finished!")


def do_work_offset(func, work_size, step, min_num, max_num):
    """Run ``func(step, offset)`` in a process pool for a range of offsets.

    Offsets start at ``min_num`` and grow by ``step``; every offset that is
    less than or equal to ``max_num`` gets one task.

    :param func: callable executed through ``do_func`` in the worker processes
    :param work_size: number of worker processes for ``multiprocessing.Pool``
    :param step: distance between consecutive offsets; must be greater than zero
    :param min_num: first offset
    :param max_num: largest offset that is still submitted (inclusive)
    :return: None; a dict of result -> count is logged, then a completion message
    :raises ValueError: if ``step`` is not positive
    """
    if step <= 0:
        # A non-positive step would never move the offset past max_num.
        raise ValueError("step must be greater than 0")

    # The context manager guarantees the workers are torn down even when
    # submitting a task or fetching a result raises.
    with multiprocessing.Pool(work_size) as pool:
        tasks = []
        offset = min_num
        while offset <= max_num:
            tasks.append(pool.apply_async(do_func, (func, step, offset)))
            offset += step

        # close() + join() waits for every submitted task to finish before
        # the results are read.
        pool.close()
        pool.join()
        results = [task.get() for task in tasks]

    logger.info(dict(Counter(results)))

    logger.info("all finished!")

# if __name__ == '__main__':
#     do_work_offset(do, 1, 1, 0, 10)
#     do_work_list_slice(do, 1, 1, [1, 2, 3, 4, 5])



import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed

# Example: submit tasks to a process pool and iterate over them as they finish.
# NOTE: ``gcd`` and ``numbers`` are not defined in this snippet and must be
# supplied by the surrounding code.
start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(gcd, pair) for pair in numbers]
    # Status right after submission (message reads "running: ..., done: ...").
    for future in futures:
        print('执行中:%s, 已完成:%s' % (future.running(), future.done()))
    print('#### 分界线 ####')  # divider line
    # as_completed yields each future once it has finished; with a timeout it
    # raises TimeoutError if some are still pending after 2 seconds.
    for future in as_completed(futures, timeout=2):
        print('执行中:%s, 已完成:%s' % (future.running(), future.done()))
end = time.time()
print('Took %.3f seconds.' % (end - start))

===========================================================================

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

# Example: submit tasks to a process pool and collect every result in
# submission order.
# NOTE: ``gcd`` and ``numbers`` are not defined in this snippet and must be
# supplied by the surrounding code.
start = time.time()
futures = list()
with ProcessPoolExecutor(max_workers=2) as pool:
    for pair in numbers:
        future = pool.submit(gcd, pair)
        futures.append(future)
# Leaving the ``with`` block waits for the pool to shut down, so every
# result() call below returns without further waiting.
print('results: %s' % [future.result() for future in futures])
end = time.time()
print('Took %.3f seconds.' % (end - start))

===========================================================================

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION

# Example: submit tasks to a process pool and block on them with wait().
# NOTE: ``gcd`` and ``numbers`` are not defined in this snippet and must be
# supplied by the surrounding code.
start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(gcd, pair) for pair in numbers]
    # Status right after submission (message reads "running: ..., done: ...").
    for future in futures:
        print('执行中:%s, 已完成:%s' % (future.running(), future.done()))
    print('#### 分界线 ####')  # divider line
    # wait() returns two sets: the finished futures and those still pending
    # when the 2-second timeout expired.
    done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED)
    for d in done:
        print('执行中:%s, 已完成:%s' % (d.running(), d.done()))
        print(d.result())
end = time.time()
print('Took %.3f seconds.' % (end - start))
上一篇下一篇

猜你喜欢

热点阅读