Python基于进/线程池实现大数据量爬虫项目

2018-11-10  本文已影响0人  SlashBoyMr_wang

如今计算机已经进入多核CPU的时代了,使用多线程或多进程能够充分利用CPU多核性能来提高程序的执行效率。

Python多任务的解决方案主要有以下三种:
1.启动多进程,每个进程只有一个线程,通过多进程执行多任务;
2.启动单进程(即多线程),在进程内启动多线程,通过多线程执行多任务;
3.启动多进程,在每个进程内再启动多个线程,同时执行更多的任务;

我们都知道,由于Cpython解释器存在全局GIL锁原因,无论是单核还是多核CPU,任意特定时刻只有一个线程会被Python解释器执行。但是创建一个线程操作系统花费的开销要比创建一个进程少很多。
从这两个方面我们可以得出python多线程和多进程的选择的原则:

多进程:高CPU利用型(计算密集型)
多线程:低CPU利用型(I/O密集型)

计算密集型特点
计算密集型任务的特点是需要进行大量的计算,在整个时间片内始终消耗CPU的资源。由于GIL机制的原因多线程中无法利用多核参与计算,但多线程之间切换的开销时间仍然存在,因此多线程比单一线程需要更多的执行时间。而多进程中有各自独立的GIL锁互不影响,可以充分利用多核参与计算,加快了执行速度。

I/O密集型特点
I/O密集型任务的特点是CPU消耗很少,任务大部分时间都在等待I/O操作的完成(I/O速度远低于CPU和内存速度)

python全局GIL锁
Python代码的执行由Python解释器进行控制。目前Python的解释器有多种,如CPython、PyPy、Jython等,其中CPython为最广泛使用的Python解释器。理论上CPU是多核时支持多个线程同时执行,但在Python设计之初考虑到在Python解释器的主循环中执行Python代码,于是CPython中设计了全局解释器锁GIL(Global Interpreter Lock)机制用于管理解释器的访问,Python线程的执行必须先竞争到GIL权限才能执行。

全局解释器锁GIL机制流程
  a、设置 GIL;
  b、切换到一个线程去运行;
  c、运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
  d、把线程设置为睡眠状态;
  e、解锁 GIL;
  d、再次重复以上所有步骤。

池的概念:
  创建进程或线程都需要消耗时间,销毁进程/线程也需要消耗时间。即便开启了成千上万的进程/线程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程/线程。
所以就有了池的概念。
  定义一个池子,在里面放上固定数量的进程/线程,有需求来了,就拿一个池中的进程/线程来处理任务,等到处理完毕,并不关闭,而是将进程/线程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程/线程数量不够,任务就要等待之前的进程/线程执行任务完毕归来,拿到空闲进程/线程才能继续执行。
  减少了进程/线程创建/销毁带来的消耗,同时又可以最大化的利用CPU。

进程池/线程池数量的确定
知道了池的概念,那进程/线程开启的数量该怎么确定呢?多少个进程/线程才能最大化利用CPU,并且不会给操作系统带来额外的消耗呢?

进程数:CPU数量 < 进程数 < CPU数量*2
线程数: CPU数量 * 5

实际案例实现:
http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/135999
爬取药监局13万条医疗器材名录

方式一:python进程池实现13万+数据的爬取
  进程池的创建很简单,直接用multiprocessing.Pool类
  爬取的数据写入redis数据库中。

###进程池

import requests
from lxml import etree
import redis
from multiprocessing import Pool

redis = redis.Redis()

def get_msg(url,i):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
    }
    ret_html = requests.get(url=url, headers=headers)
    ret_text = ret_html.text

    tree = etree.HTML(ret_text)
    tr_list = tree.xpath("/html/body/div/table/tbody/tr")
    tr_list.remove(tr_list[7])
    dic = {}
    for tr in tr_list:
        name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
        value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
        dic[name] = value

    redis.hset("equipment","%s"%i,dic)
    print("已完成%s条信息"%i)

if __name__ == '__main__':
    p = Pool(8)

    for i in range(1,136000):
        url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s"%i
        # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
        # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
        # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
        # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 
        res = p.apply_async(get_msg,args=(url,i))

    # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,
    # 等待进程池内任务都处理完,然后可以用get收集结果
    # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    #调用join之前,先调用close函数,否则会出错。执行完close后不会有
    # 新的进程加入到pool,join函数等待所有子进程结束
    p.close()
    p.join()

方式二:python线程池爬取13w+数据
  线程池的创建有两种方式:
    第一种:multiprocessing.dummy.Pool
    第二种:基于queue队列来创建
  下面这个示例采用第一种方式创建。

import requests
from lxml import etree
import redis
from multiprocessing.dummy import Pool as Threadpool

# 创建redis链接
redis = redis.Redis()

def get_msg(i):
    url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s" % i
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
    }
    ret_html = requests.get(url=url, headers=headers)
    ret_text = ret_html.text

    tree = etree.HTML(ret_text)
    tr_list = tree.xpath("/html/body/div/table/tbody/tr")
    tr_list.remove(tr_list[7])
    dic = {}
    for tr in tr_list:
        name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
        value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
        dic[name] = value

    redis.hset("equipment", "%s" % i, dic)
    print("已完成%s条信息" % i)

def main():
    th = Threadpool(20)
    th.map(get_msg, [i for i in range(1, 136000)])

if __name__ == '__main__':
    main()

方式三:python进程中开线程实现爬取数据:

import requests
from lxml import etree
import redis
from multiprocessing.dummy import Pool as Threadpool
from multiprocessing import Pool
import time

# 创建redis链接
redis = redis.Redis()

def get_msg(i):
    url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s" % i
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
    }
    ret_html = requests.get(url=url, headers=headers)
    ret_text = ret_html.text

    tree = etree.HTML(ret_text)
    tr_list = tree.xpath("/html/body/div/table/tbody/tr")
    tr_list.remove(tr_list[7])
    dic = {}
    for tr in tr_list:
        name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
        value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
        dic[name] = value

    redis.hset("Hequipment", "%s" % i, dic)
    print("已完成第%s条信息" % i)

def main():
    th = Threadpool(10)
    th.map(get_msg, [i for i in range(1, 136000)])

if __name__ == '__main__':
    start_time = time.time()
    p = Pool(5)
    for i in range(5):
        p.apply_async(main)
    p.close()
    p.join()
    stop_time= time.time()
    print("136000条数据总共用时%s s"%(start_time- stop_time))

附:基于queue创建线程池的另一个例子:(6000+数据)
  http://125.35.6.84:81/xk/
  国家药品监督管理总局

import requests
import threading
import redis
import queue

redis = redis.Redis()

def get_msg(th, i):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
    }
    url = "http://125.35.6.84:81/xk/itownet/portalAction.do?method=getXkzsList"

    data = {
        'on': 'true',
        'page': i,
        'pageSize': 15,
        'productName': '',
        'conditionType': 1
    }
    proxies = {
        "http": "122.226.73.12:80",
    }
    req = requests.post(url=url, headers=headers, data=data, proxies=proxies)

    date_dict = req.json()
    print("正在下载第%s页" % i)

    for d in date_dict['list']:
        ID = d["ID"]

        n_data = {
            "id": ID
        }
        ret_url = "http://125.35.6.84:81/xk/itownet/portalAction.do?method=getXkzsById"
        text = requests.post(url=ret_url, data=n_data, headers=headers)
        redis.hset("business", ID, text.text)

    th.add_thread()  # 向队列中添加线程,保证线程的数量

# 定义一个线程类
class ThreadPool(object):
    def __init__(self, max_num):
        # 基于队列来设定线程的总个数
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    # 定义方法从队列中得到线程
    @property
    def get_thread(self):
        return self.queue.get()

    # 定义方法线程结束后向队列中添加线程,保证线程总数量
    def add_thread(self):
        self.queue.put(threading.Thread)

if __name__ == '__main__':

    # 创建线程池类,执行类的init方法
    th = ThreadPool(20)

    for i in range(1, 314):
        # 从队列中获得线程,执行操作
        thread = th.get_thread
        cur_th = thread(target=get_msg, args=(th, i))
        cur_th.start()  # 开启线程

以上两个网站的数据爬去都是合法的,放心!!!

上一篇下一篇

猜你喜欢

热点阅读