Python 并发总结

2019-02-18  本文已影响0人  flashine

0x01 GIL锁

C语言写的Python解释器存在全局解释器锁GIL(Global Interpreter Lock),由于GIL的存在,在同一时间内,Python解释器只能运行一个线程,所以Python多线程实际上运行的时候只存在一个线程。但是这种情况只存在于Cpython中,对于Jpython(Java解释器)和Rpython(Python解释器)则不存在。
  GIL对计算密集型的程序会产生影响,因为计算密集型的程序,需要占用系统资源,GIL使得程序相当于始终在进行单线程运算。
  对于IO密集型任务适用于多线程,对于计算密集型任务则适用于多进程:
  1. IO密集型:磁盘IO、网络IO。比如网络请求、文件读写等。
  2. 计算密集型:指CPU计算占主要的任务。

0x02 threadpool实现多线程

基于python2的线程池实现多线程:

# -*- coding: UTF-8 -*-
import threadpool 
import time,random 
import Queue

def hello(str): 
    time.sleep(2) 
    return str 

def print_ret(request, result): 
    print "the result is %s %r\n" % (request.requestID, result) 


def deal_task(pool):
    try:
        pool.poll(True)
    except Exception, e:
        print str(e)

q = Queue.Queue()
for i in range(100):
    q.put(i)

lst = [q.get() for i in range(q.qsize())]

pool = threadpool.ThreadPool(20) 
# 第一个参数为线程执行函数,第二个参数为线程函数的参数
# 最后一个参数为对前两个函数运行结果的操作,request和hello
requests = threadpool.makeRequests(hello, lst, print_ret) 
for req in requests:
    pool.putRequest(req)

pool.wait()

0x03 threading 实现多线程

threading 多线程示例代码

# -*- coding: UTF-8 -*-
import requests
import threading
import Queue

url = "xxx"
threads = 100
q = Queue.Queue()
for i in range(20):
    q.put(i)

def send():
    while not q.empty():
        q.get()
        r = requests.post(url, data={})
        print(r.text)

if __name__ == '__main__':
    for i in range(threads):
        t = threading.Thread(target=send)
        t.start()
    for i in range(threads):
        t.join()

代码中未涉及到多线程变量操作,所以没有采用线程锁,需要使用的时候再加

lock = threding.Lock()

def test():
    lock.acquire()
    # operate param
    lock.release()

0x04 ThreadPoolExecutor 实现多线程

ThreadPoolExecutor支持Python3和Python2。使用ThreadPoolExecutor可以自动调度线程,它实现了对threadingmultiprocessing的进一步抽象,而且在进行文件读写操作时不需要在代码中额外使用线程锁。

# -*- coding: UTF-8 -*-
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, ALL_COMPLETED
import time

header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
}

def check_words(url,path):
    """
    从网页中查找关键词并写入指定的path
    """
    print(url)

if __name__ == "__main__":
    urls = []
    path = ""
    with ThreadPoolExecutor(max_workers=48) as executor:
        # submit(线程函数,线程参数)
        tasks = [executor.submit(check_words, url ,path) for url in urls]
        for i, task in enumerate(as_completed(tasks)):
            print("\r 已完成第{0}/100个=>"。format(i), end="")  

ThreadPoolExecutor使用submit()方法向线程池中提交一次线程运行所需的参数,而使用map()方法则可以直接提交集合,即map(list())as_completed函数可以检测当前执行的线程函数是否运行完毕,add_done_callback是线程执行结果的回调,如需要获取线程执行返回值则可以在futures上进行绑定,wait方法可以让主线程阻塞直到满足设定要求。

def handler(result):
    res = res.result()
    #handle result
    
future.add_done_callback(handler)
wait(futures, return_when=ALL_COMPLETED)

使用多次之后发现ThreadPoolExecutor结合requests时往往会出现线程假死的情况:实际还在运行,但是不输出任何结果,原因未知。

0x05 asyncio + aiohttp 实现协程并发

参考使用Python进行并发编程 发现在进行多线程的网络请求时使用asyncio+aiohttp能够达到最高效的执行速度。

# -*- coding: UTF-8 -*-
import aiohttp
import asyncio
import async_timeout
import time
import ssl

header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36',
    'Cookie': ''
}

async def check(url):
    try:
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:# 忽略校验ssl
            async_timeout.timeout(5)
            async with session.get(url, headers=header, ssl=ssl.SSLContext()) as response:
                res = ""
                if response.status == 200:
                    res = await response.text()
                    link_word = set()
                    with open('./dark_link.txt', 'r', encoding='utf-8') as dark_file:
                        for word in dark_file.readlines():
                            if res.find(word.strip()) != -1:
                                link_word.add(word.strip())
                    if len(link_word) != 0:
                        print("".join(["[+]", url, "发现关键词:"]), link_word)
                        return [url, "", response.status, link_word]
                    else:
                        return [url, "", response.status, ""]
                else:
                    return [url, "", response.status, ""]
    except:
        return [url, "", "连接异常", ""]


if __name__ == '__main__':
    urls = set()
    with open(r'urls.txt') as pf:
        for line in pf.readlines():
            urls.add(line.strip())
 # windows编程需要使用ProactorEventLoop,其他情况下使用asyncio.get_event_loop()
 event_loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(event_loop)
    tasks = [check(url) for url in urls]
    results = event_loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=False))# 请求函数运行结束后收集各个结果
    for item in results:
        print(item)

目前对asyncio了解较少,以后了解多了再详写

0x06 multiprocessing.dummy 线程池

  1. map
    用法基本与ThreadPoolExecutor 的map用法一致
from multiprocessing.dummy import Pool as ThreadPool
import requests

arg = [3, 5, 11, 19, 12]
pool = ThreadPool(processes=3)
return_list = pool.map(requests.get, arg)
pool.close()
pool.join()
print(return_list)
  1. apply_async
from multiprocessing.dummy import Pool as ThreadPool
import requests

async_pool = ThreadPool(4)
results =[]
for i in range(5):
    msg = 'msg: %d' % i
    result = async_pool.apply_async(requests.get, (msg, ))
    results.append(result)
 
for i in results:
    i.wait()  # 等待线程函数执行完毕
 
for i in results:
    if i.ready():  # 线程函数是否已经启动了
        if i.successful():  # 线程函数是否执行成功
            print(i.get())  # 理论上是线程函数返回值,实际测试代码时未输出任何内容,原因未知
上一篇下一篇

猜你喜欢

热点阅读