Python七号Python

并发时用多线程还是协程?

2019-07-20  本文已影响17人  somenzz

之前对自己要求,每周更新至少一篇,然而最近已经有两周没有更新。其实可以找很多借口,但没有做到就是没有做到,对自己食言也是一种失信,时间,常常对自己有所要求,比如每周至少一篇,但往往都没有严格执行。因此我非常佩服那些具有稳定输出的牛人,比如阮一峰,辉哥奇谭,半佛仙人,码农翻身等等。

他们是怎么做到的,我想习惯是一个很重要的因素。比如我现在已经养成每天都做些轻运动的习惯,到了时间点,身体会自动引着我去运动。另一方面,就是有能力排除一切影响这个习惯成长的因素,这一点最佩服跟谁学课程中教英语的宋老师,他是计算机科班出身,但是却喜欢英语,为了学英语,他 10 年没朋友,不聚会。可以说,他是真的热爱英语。同样的,上述稳定输出的人,都非常热爱写作。

今天分享一下自己对 Python 并发编程的一些理解,如果你经常要高效地去爬取一些数据,我想对你应该有帮助。

写并发的程序,让你有一种当指挥官的感觉,是非常有成熟感的。动手写几行代码,成百上千的进程或线程就开始拼命的为你执行任务,看着屏幕上它们执行成功的反馈信息,看着就很爽。

我最近就在写这样的爬虫程序,使用近万个 IP 代理并发地请求一个网站的数据,请求成功的次数越多越好,所花的时间越短越好。

方法一:线程池 concurrent.futures VS multiprocessing.dummy

首先我就想到了多线程实现,正好最近学习了 concurrent.futures,它是在 python3.2 中引入的。于是马上把样例代码拿来稍做调整,一个 demo 就有了:

import concurrent.futures

def request_task(url, proxy):
    """
    :param url: url链接
    :param proxy: 的代理
    :return: 0 表示失败 1 成功
    """
    try:
        html = requests.get(
            url,
            proxies={"http": f"http://{proxy}", "https": f"https://{proxy}"},
            timeout=30,
        )
        if html.status_code == 200:
            message = json.loads(html.content, encoding="utf-8")
            logger.info(f"{proxy} -> {message}")
            if message["message"] == "成功!":
                return 1
            else:
                return 0
        else:
            return 0
    except Exception as e:
        pass
    return 0


def multi_request_from_proxy(url, proxys):
    '''
    开启 100 个线程,使用代理池中的代理,并发的对 url 执行函数 request_task
    :param url: 待请求的 url
    :param proxys: 代理池
    :return: None
    '''
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        executor.map(request_task, [url] * len(proxys), proxys)
        
if __name__ == "__main__":
    url = 'xxxxx' ##目标网站
    proxys = get_proxys()#获取免费代理池中的近万个代理
    multi_request_from_proxy(url, proxys)
    

直接就试跑了一下,当时就看到了屏幕上打印了很多请求成功的信息,想到这 100 个线程在为我干近万个网络请求的休力活,感觉很好。

由于 IP 代理池中的代理很快就会失效,比如 10 分钟就会失效,那么,如果在 10 分钟内没有尽快把有用的 IP 用完,那么就相当于浪费了部分可用的 IP 。

于是我把请求代理的任务和并发去执行请求的任务放在循环里不停的执行,并 multi_request_from_proxy 也使用一个线程来执行。

import threading
threading.Thread(target=multi_request_from_proxy,args=(url,unused_proxys)).start()

每当有新的 IP 们(多个)时,我就启一个线程来使用它们,这个线程又会生成 100 个线程去消耗这些 IP 尽量做到不浪费。

但好景不长,由于虽然 IP 多,无效的 IP 也多,这样网络请求的时间就变长了。感觉还是慢,想到之前用过 multiprocessing 这个库,里面也提供线程池,于是我就深试了这个库,又重新写了下代码。

from multiprocessing.dummy import Pool as ThreadPool

def request_task2(proxy):
    """
    :param proxy: 代理
    :return: 0 表示失败 1 表示成功
    """
    url = 'http://somesite.net.cn'
    try:
        html = requests.get(
            url,
            proxies={"http": f"http://{proxy}", "https": f"https://{proxy}"},
            timeout=30,
        )
        # html = requests.get(vote_url, timeout=30)
        if html.status_code == 200:
            message = json.loads(html.content, encoding="utf-8")
            logger.info(f"{proxy} -> {message}")
            if message["message"] == "成功!":
                return 1
            else:
                return 0
        else:
            return 0
    except Exception as e:
        pass
    return 0

def multi_request_from_proxy2(proxys):
    '''
    使用代理池并发的执行请求函数 request_task2
    :param proxys: 代理池
    :return: None
    '''
    with ThreadPool(processes=100) as executor:
        executor.map(request_task2, proxys)

从执行的成功结果和耗时来看,multiprocessing.dummy 更适合这种场景。

网上查了下他们之前区别:

1、显然用futures的写法上更简洁一些,concurrent.futures 的性能并没有更好,只是让编码变得更简单。考虑并发编程的时候,任何简化都是好事。从长远来看,concurrent.futures 编写的代码更容易维护。

2、使用map时,future 是逐个迭代提交,multiprocessing 是批量提交jobs,因此对于大批量 jobs 的处理,multiprocessing.Pool 效率会更高一些。对于需要长时间运行的作业,用 future更佳,future 提供了更多的功能(callback, check status, cancel)。

从结果来看还真是这样。

方法二:协程 asyncio

协程是轻量级的线程,避免了线程之间切换的资源消耗,效率应该更快吧。

async def asyn_request_task(url,proxy):
    proxyurl = f'http://{proxy}'
    # print(proxy)
    async with aiohttp.ClientSession() as session:
        try:
            for i in range(5):
                async with session.get(url,proxy = proxyurl,timeout = 30) as resp:
                    message_str = await resp.text(encoding='utf-8')
                    message = json.loads(message_str)
                    if resp.status == 200:
                        if message["message"] == "成功!"
                            logger.info(f"{proxy} -> {message}")
                        else:
                            break
                    else:
                        break
                        # print("fail")
        except Exception as e:
            pass
            # logger.info(e)

async def asyn_tasks(url,proxys):
    tasks = [asyncio.create_task(asyn_request_task(url,proxy)) for proxy in proxys]
    await asyncio.gather(*tasks)

asyncio.run(asyn_tasks(url,unused_proxys))

通过实际验证,当代理数量超过 500 内时,代码效率非常高,效率上要比多线程高 2 倍左右(从实际运行的时间来看),但是代理数量超过 500 时程序会报错。

当然这个方法好解决,就是将总的代理数量切分成小于等于 500 的组来分别用协程调用。

最终我选择了协程。协程的代码可能不太好理解,参考前文:
Python 协程学习笔记

网上也可以搜索到大量关于 Python 协程的学习资源,这里就不多说了。

协程和多线程的区别

协程是实现并发编程的一种方式。一说并发,你肯定想到了多线程 / 多进程模型,没错,多线程 / 多进程,正是解决并发问题的经典模型之一。最初的互联网世界,多线程 / 多进程在服务器并发中,起到举足轻重的作用。

我们知道,在处理 I/O 操作时,使用多线程与普通的单线程相比,效率得到了极大的提高。你可能会想,既然这样,为什么还需要协程(Asyncio)?

诚然,多线程有诸多优点且应用广泛,但也存在一定的局限性:

比如,多线程运行过程容易被打断,因此有可能出现竞争条件 (race condition) 的情况;再如,线程切换本身存在一定的损耗,线程数不能无限增加,因此,如果你的 I/O 操作非常耗时,多线程很有可能满足不了高效率、高质量的需求。

因此,协程是更轻量级的线程,它的切换成本非常低,相对比协程更高效一些。

那么什么场景使用多线程,什么场景使用协程呢(Asyncio)? 请参考正面的代码:

如果是 I/O 密集型,且 I/O 请求比较耗时的话,使用协程。
如果是 I/O 密集型,且 I/O 请求比较快的话,使用多线程。
如果是 计算 密集型,考虑可以使用多核 CPU,使用多进程。

if io_bound:
    if io_slow:
        print('Use Asyncio')
    else:
        print('Use multi-threading')
else if cpu_bound:
    print('Use multi-processing')

多线程和协程之间的共同点和区别:

共同点:

都是并发操作,多线程同一时间点只能有一个线程在执行,协程同一时间点只能有一个任务在执行;

不同点:

多线程,是在I/O阻塞时通过切换线程来达到并发的效果,在什么情况下做线程切换是由操作系统来决定的,开发者不用操心,但会造成竞争条件 (race condition) ;

协程,只有一个线程,在I/O阻塞时通过在线程内切换任务来达到并发的效果,在什么情况下做任务切换是开发者决定的,不会有竞争条件 (race condition) 的情况;多线程的线程切换比协程的任务切换开销更大;
对于开发者而言,多线程并发的代码比协程并发的更容易书写。

一般情况下协程并发的处理效率比多线程并发更高。

(完)

欢迎关注微信公众号 somenzz

上一篇下一篇

猜你喜欢

热点阅读