算法工程Python学习

Python:多进程同步共享全局变量(锁,计数器,原子布尔)

2021-11-20  本文已影响0人  xiaogp

摘要:Python,多进程

多进程变量同步的场景和方法

场景:在使用Python多进程并行时需要在进程间共享变量,这些共享的变量可以更好地控制和把握任务执行的情况,比如查看任务进度,提前停止任务等
方法:在多线程中变量共享在主线程中定义变量,在每个子线程中使用global关键字拿到变量,再配合threading.RLock()在对变量操作时拿到和释放锁(acquirerelease)即可,但是在多进程中,变量是放在不同子进程的数据区中,每个进程都是独立的地址空间,所以用一般的方法是不能共享变量的,multiprocessing模块提供了ArrayManagerValue类来定义共享变量,能够实现进程间共享数字,字符串,列表,字典,实例对象的变量共享


共享整数变量

对于整数的多进程共享是常用的场景,比如使用多进程并行任务,需要记录执行日志记录任务进度,实例代码如下

import multiprocessing
from multiprocessing import Pool, Lock, Value

from utils.logger_utils import logging_

LOGGER = logging_("predict_main", os.path.join(ROOT_PATH, "./logs/details.log"))
lock = Lock()
Counter = Value('i', 0)
ENT_LIST = list(set([line.strip().replace("(", "(").replace(")", ")") for line in open(os.path.join(BASIC_PATH, "data/ent_name_predict.txt"), "r", encoding="utf8").readlines()]))
TOTAL = len(ENT_LIST)

def get_one_res(data):
    global TOTAL, lock, Counter
    res = {}
    try:
        ent_name = data
        res = get_feature(ent_name, PREDICT_DATE)
        res["updatedate"] = PREDICT_DATE
        res["uid"] = get_md5(formatted_ent(ent_name))
    except Exception as e:
        LOGGER.error(data + ":错误:" + e.args[0])
    finally:
        with lock:
            Counter.value += 1
        LOGGER.info("执行完成:(%d / %d) 进程号: %d --------------- %s", Counter.value, TOTAL, os.getpid(), data)
    return res


if __name__ == '__main__':
    pool = Pool(int(get_string("process_num")))
    res = pool.map(get_one_res, ENT_LIST)
    LOGGER.info("全部执行完成,关闭进程池")
    pool.close()
    pool.join()

运行查看执行日志

2021-11-18 15:19:15 [predict_main] INFO [42] 执行完成:(1 / 1400) 进程号: 15 --------------- 深圳顺亚投资有限公司
2021-11-18 15:19:16 [predict_main] INFO [42] 执行完成:(2 / 1400) 进程号: 25 --------------- 芜湖新扬投资合伙企业(有限合伙)
2021-11-18 15:19:18 [predict_main] INFO [42] 执行完成:(3 / 1400) 进程号: 24 --------------- 保定隆瑞房地产开发有限公司
2021-11-18 15:19:19 [predict_main] INFO [42] 执行完成:(4 / 1400) 进程号: 11 --------------- 云南俊发凯丰房地产开发有限公司

在全局定义锁和计数器,Value('i', 0)代表定义的共享变量是int类型初始值是0,如果要定义double变量则使用Value('d', 0),相当于java里面的原子变量,在执行函数中调用with上下文在实行完任务后调用Counter.value += 1实现计数+1,最后在进程池中调用执行方法,每个并行的任务在执行完毕会调用锁进行计数器+1,同一时刻只有一个子进程拿到锁实现进程同步,如果不采用锁的方式,在日志中计数器会乱序,但是最终总的值相等


共享布尔变量

这种情况在全局中记录一个布尔变量,每次执行任务前拿到变量判断是否与预期一致,如果执行报错修改变量状态,多用于子进程中任务报错提前结束全部任务全部退出,代码如下

from multiprocessing import Pool, Lock, Manager
from ctypes import c_bool
import os

lock = Lock()
ERROR = Manager().Value(c_bool, False)


def run(fn):
    global tests_count, lock, ERROR
    if not ERROR.value:
        try:
            print('执行任务. PID: %d ' % (os.getpid()))
            1 / 0
        except Exception as e:
            with lock:
                ERROR.value = True
    else:
        print("子进程报错,任务结束")


if __name__ == "__main__":
    pool = Pool(10)
    # 80个任务,会运行run()80次,每次传入xrange数组一个元素
    pool.map(run, list(range(80)))
    pool.close()
    pool.join()

查看执行输出

执行任务. PID: 27374 
子进程报错,任务结束
子进程报错,任务结束
子进程报错,任务结束
...
Process finished with exit code 0

初始化一个共享变量为布尔类型为False,每个进程在执行前先拿到共享变量判断是否为False,是则执行任务,否则直接跳过执行。初始化布尔变量使用Manager类实例化后调用Value方法,c_bool是Ctypes下的数据类型,相关类型如下

另一种是在主进程中判断共享变量,调用map_async使得主进程不被子进程阻塞,主进程判断全局变量如果不符合预期直接退出,调用terminate终止线程池

from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
ERROR = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)


def run(fn):
    global tests_count, lock, ERROR
    try:
        time.sleep(2)
        1 / 0
    except:
        with lock:
            ERROR.value = True
    finally:
        with lock:
            COUNTER.value += 1
            print('执行任务(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid()))


if __name__ == "__main__":
    pool = Pool(10)
    pool.map_async(run, list(range(80)))
    pool.close()
    print("主进程判断...")
    while COUNTER.value != len(list(range(80))):
        time.sleep(1)
        if ERROR.value:
            print("子进程报错,主进程提前退出")
            pool.terminate()
            break
    pool.join()

输出如下,每隔1秒中检查全局变量ERROR,如果变为True主进程终止进程池

主进程判断...
执行任务(1 / 80). PID: 4168 
执行任务(2 / 80). PID: 4169 
执行任务(3 / 80). PID: 4177 
执行任务(4 / 80). PID: 4171 
执行任务(5 / 80). PID: 4173 
执行任务(6 / 80). PID: 4182 
执行任务(7 / 80). PID: 4183 
执行任务(8 / 80). PID: 4174 
执行任务(9 / 80). PID: 4179 
执行任务(10 / 80). PID: 4181 
子进程报错,主进程提前退出

Process finished with exit code 0

一个实用的例子是多进程找一个列表中符合要求第一个值,如果找到则退出多进程

from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
FOUND = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)


def run(fn):
    global tests_count, lock, ERROR
    try:
        time.sleep(2)
        res = fn + 1
        if res == 10:
            print("结果是:{}".format(fn))
            with lock:
                FOUND.value = True
            return fn
    except Exception as e:
        print(e)
    finally:
        with lock:
            COUNTER.value += 1
            print('执行任务(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid()))


if __name__ == "__main__":
    t1 = time.time()
    pool = Pool(10)
    pool.map_async(run, list(range(80)))
    pool.close()
    print("主进程判断...")
    while COUNTER.value != len(list(range(80))):
        time.sleep(1)
        if FOUND.value:
            print("已找到结果")
            pool.terminate()
            break
    pool.join()
    t2 = time.time()
    print(t2 - t1)

共享字典和数组变量

使用Manager近创建,Manager().dict()Manager().list(),测试代码如下

from multiprocessing.pool import Pool
from multiprocessing import Manager, Lock
import time
import datetime

LOCK = Lock()
DICT = Manager().dict()
LIST = Manager().list()


def job(ent):
    with LOCK:
        if len(LIST) < 5:
            time.sleep(1)
            LIST.append(ent)
        else:
            if len(LIST) and ent <= len(LIST) - 1:
                LIST.pop(ent)
        print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
              "LIST:{}".format(LIST))


def job2(ent):
    if len(LIST) < 5:
        time.sleep(1)
        LIST.append(ent)
    else:
        if len(LIST) and ent <= len(LIST) - 1:
            LIST.pop(ent)
    print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
          "LIST:{}".format(LIST))


if __name__ == '__main__':
    pool = Pool(10)
    pool.map(job2, list(range(10)) * 2)
    pool.close()
    pool.join()

执行job输出如下,输出结果和单进程单线程的结果一致,按照顺序每个进程在锁外排队

dt:2021-11-20 22:01:28 ent:0 LIST:[0]
dt:2021-11-20 22:01:29 ent:1 LIST:[0, 1]
dt:2021-11-20 22:01:30 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:01:31 ent:3 LIST:[0, 1, 2, 3]
dt:2021-11-20 22:01:32 ent:4 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:5 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:6 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:7 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:8 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:9 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:0 LIST:[1, 2, 3, 4]
dt:2021-11-20 22:01:33 ent:1 LIST:[1, 2, 3, 4, 1]
dt:2021-11-20 22:01:33 ent:2 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:34 ent:3 LIST:[1, 2, 4, 1, 3]
dt:2021-11-20 22:01:34 ent:4 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:35 ent:5 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:6 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:7 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:8 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:9 LIST:[1, 2, 4, 1, 5]

Process finished with exit code 0

如果执行job2则不控制共享变量的同步则完全失控,报错pop index out of range,原因是在对变量操作时有其他进程也在操作得到意想不到的结果

dt:2021-11-20 22:03:02 ent:0 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:1 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:3 LIST:[0, 1, 2, 3, 5]
dt:2021-11-20 22:03:02 ent:5 LIST:[0, 1, 2, 3, 5, 4, 6]
dt:2021-11-20 22:03:02 ent:4 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8]
dt:2021-11-20 22:03:02 ent:6 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
dt:2021-11-20 22:03:02 ent:9 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
...
    raise self._value
IndexError: pop index out of range

共享自定义对象

上一篇下一篇

猜你喜欢

热点阅读