大数据

python多任务并行

2019-03-26  本文已影响0人  一ke大白菜

1、concurrent.futures

concurrent.futures模块提供了一个用于异步执行callables的高级接口。

这里面有三个重要的类。

2、应用场景

下面这段资料来自廖雪峰的python关于线程与进程的介绍。

如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。

如果写一个死循环的话,会出现什么情况呢?

打开Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以监控某个进程的CPU使用率。

我们可以监控到一个死循环线程会100%占用一个CPU。

如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。

要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。

试试用Python写个死循环:

import threading, multiprocessing

def loop():
x = 0
while True:
    x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()

启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。

但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

结论就是:python不可能用多线程实现多任务并行,但是多进程就不存在这个问题。

举个栗子,下面是一天时间的解压归档日志:

[appl@SZVM-EXlOIT-1-243 test1]$ pwd
/opt/appl/test1
appl@SZVM-EXlOIT-1-243 test1]$ ll
总用量 684224
-rw-r----- 1 appl fspfappl 5243679 3月  13 00:20 access.log.2019-03-13-00.0
-rw-r----- 1 appl fspfappl 5248039 3月  13 00:53 access.log.2019-03-13-00.1
-rw-r----- 1 appl fspfappl  721806 3月  13 00:59 access.log.2019-03-13-00.2
-rw-r----- 1 appl fspfappl 5243914 3月  13 01:54 access.log.2019-03-13-01.0
-rw-r----- 1 appl fspfappl  387323 3月  13 01:59 access.log.2019-03-13-01.1
-rw-r----- 1 appl fspfappl 4510087 3月  13 02:59 access.log.2019-03-13-02.0

ProcessPoolExecutormap函数可以在单机下利用多核资源去处理数据,这在数据量不是很大的情况下耗时和分布式计算相差无几,并且python的编程比使用spark job,hadoop mapreduce编程要简单的多。

参考代码:

import re
from pandas import DataFrame
import concurrent.futures
import glob

regrex = re.compile(r".*?(/ktb/[a-zA-Z\_\/]*?)\d*? +?.*")

def parse_file(file):
access_list = []
with open(file) as f:
    for line in f:
        result = regrex.match(line)
        if result:
            access_list.append(result.group(1))
return access_list

fileList = glob.glob('/opt/appl/test1/*')

access_lists = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for access_list in executor.map(parse_file,fileList):
        access_lists.extend(access_list)

df = DataFrame({'access':access_lists})
上一篇 下一篇

猜你喜欢

热点阅读