Python基于进/线程池实现大数据量爬虫项目
如今计算机已经进入多核CPU的时代了,使用多线程或多进程能够充分利用CPU多核性能来提高程序的执行效率。
Python多任务的解决方案主要有以下三种:
1.启动多进程,每个进程只有一个线程,通过多进程执行多任务;
2.启动单进程(即多线程),在进程内启动多线程,通过多线程执行多任务;
3.启动多进程,在每个进程内再启动多个线程,同时执行更多的任务;
我们都知道,由于Cpython解释器存在全局GIL锁原因,无论是单核还是多核CPU,任意特定时刻只有一个线程会被Python解释器执行。但是创建一个线程操作系统花费的开销要比创建一个进程少很多。
从这两个方面我们可以得出python多线程和多进程的选择的原则:多进程:高CPU利用型(计算密集型)
多线程:低CPU利用型(I/O密集型)
计算密集型特点
计算密集型任务的特点是需要进行大量的计算,在整个时间片内始终消耗CPU的资源。由于GIL机制的原因多线程中无法利用多核参与计算,但多线程之间切换的开销时间仍然存在,因此多线程比单一线程需要更多的执行时间。而多进程中有各自独立的GIL锁互不影响,可以充分利用多核参与计算,加快了执行速度。
I/O密集型特点
I/O密集型任务的特点是CPU消耗很少,任务大部分时间都在等待I/O操作的完成(I/O速度远低于CPU和内存速度)
python全局GIL锁
Python代码的执行由Python解释器进行控制。目前Python的解释器有多种,如CPython、PyPy、Jython等,其中CPython为最广泛使用的Python解释器。理论上CPU是多核时支持多个线程同时执行,但在Python设计之初考虑到在Python解释器的主循环中执行Python代码,于是CPython中设计了全局解释器锁GIL(Global Interpreter Lock)机制用于管理解释器的访问,Python线程的执行必须先竞争到GIL权限才能执行。全局解释器锁GIL机制流程
a、设置 GIL;
b、切换到一个线程去运行;
c、运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
d、把线程设置为睡眠状态;
e、解锁 GIL;
d、再次重复以上所有步骤。
池的概念:
创建进程或线程都需要消耗时间,销毁进程/线程也需要消耗时间。即便开启了成千上万的进程/线程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程/线程。
所以就有了池的概念。
定义一个池子,在里面放上固定数量的进程/线程,有需求来了,就拿一个池中的进程/线程来处理任务,等到处理完毕,并不关闭,而是将进程/线程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程/线程数量不够,任务就要等待之前的进程/线程执行任务完毕归来,拿到空闲进程/线程才能继续执行。
减少了进程/线程创建/销毁带来的消耗,同时又可以最大化的利用CPU。
进程池/线程池数量的确定
知道了池的概念,那进程/线程开启的数量该怎么确定呢?多少个进程/线程才能最大化利用CPU,并且不会给操作系统带来额外的消耗呢?进程数:CPU数量 < 进程数 < CPU数量*2
线程数: CPU数量 * 5
实际案例实现:
http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/135999
爬取药监局13万条医疗器材名录
方式一:python进程池实现13万+数据的爬取
进程池的创建很简单,直接用multiprocessing.Pool类
爬取的数据写入redis数据库中。
###进程池
import requests
from lxml import etree
import redis
from multiprocessing import Pool
redis = redis.Redis()
def get_msg(url,i):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
}
ret_html = requests.get(url=url, headers=headers)
ret_text = ret_html.text
tree = etree.HTML(ret_text)
tr_list = tree.xpath("/html/body/div/table/tbody/tr")
tr_list.remove(tr_list[7])
dic = {}
for tr in tr_list:
name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
dic[name] = value
redis.hset("equipment","%s"%i,dic)
print("已完成%s条信息"%i)
if __name__ == '__main__':
p = Pool(8)
for i in range(1,136000):
url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s"%i
# 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
# 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
# 而是执行完一个就释放一个进程,这个进程就去接收新的任务。
res = p.apply_async(get_msg,args=(url,i))
# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,
# 等待进程池内任务都处理完,然后可以用get收集结果
# 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
#调用join之前,先调用close函数,否则会出错。执行完close后不会有
# 新的进程加入到pool,join函数等待所有子进程结束
p.close()
p.join()
方式二:python线程池爬取13w+数据
线程池的创建有两种方式:
第一种:multiprocessing.dummy.Pool
第二种:基于queue队列来创建
下面这个示例采用第一种方式创建。
import requests
from lxml import etree
import redis
from multiprocessing.dummy import Pool as Threadpool
# 创建redis链接
redis = redis.Redis()
def get_msg(i):
url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s" % i
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
}
ret_html = requests.get(url=url, headers=headers)
ret_text = ret_html.text
tree = etree.HTML(ret_text)
tr_list = tree.xpath("/html/body/div/table/tbody/tr")
tr_list.remove(tr_list[7])
dic = {}
for tr in tr_list:
name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
dic[name] = value
redis.hset("equipment", "%s" % i, dic)
print("已完成%s条信息" % i)
def main():
th = Threadpool(20)
th.map(get_msg, [i for i in range(1, 136000)])
if __name__ == '__main__':
main()
方式三:python进程中开线程实现爬取数据:
import requests
from lxml import etree
import redis
from multiprocessing.dummy import Pool as Threadpool
from multiprocessing import Pool
import time
# 创建redis链接
redis = redis.Redis()
def get_msg(i):
url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s" % i
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
}
ret_html = requests.get(url=url, headers=headers)
ret_text = ret_html.text
tree = etree.HTML(ret_text)
tr_list = tree.xpath("/html/body/div/table/tbody/tr")
tr_list.remove(tr_list[7])
dic = {}
for tr in tr_list:
name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
dic[name] = value
redis.hset("Hequipment", "%s" % i, dic)
print("已完成第%s条信息" % i)
def main():
th = Threadpool(10)
th.map(get_msg, [i for i in range(1, 136000)])
if __name__ == '__main__':
start_time = time.time()
p = Pool(5)
for i in range(5):
p.apply_async(main)
p.close()
p.join()
stop_time= time.time()
print("136000条数据总共用时%s s"%(start_time- stop_time))
附:基于queue创建线程池的另一个例子:(6000+数据)
http://125.35.6.84:81/xk/
国家药品监督管理总局
import requests
import threading
import redis
import queue
redis = redis.Redis()
def get_msg(th, i):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
}
url = "http://125.35.6.84:81/xk/itownet/portalAction.do?method=getXkzsList"
data = {
'on': 'true',
'page': i,
'pageSize': 15,
'productName': '',
'conditionType': 1
}
proxies = {
"http": "122.226.73.12:80",
}
req = requests.post(url=url, headers=headers, data=data, proxies=proxies)
date_dict = req.json()
print("正在下载第%s页" % i)
for d in date_dict['list']:
ID = d["ID"]
n_data = {
"id": ID
}
ret_url = "http://125.35.6.84:81/xk/itownet/portalAction.do?method=getXkzsById"
text = requests.post(url=ret_url, data=n_data, headers=headers)
redis.hset("business", ID, text.text)
th.add_thread() # 向队列中添加线程,保证线程的数量
# 定义一个线程类
class ThreadPool(object):
def __init__(self, max_num):
# 基于队列来设定线程的总个数
self.queue = queue.Queue(max_num)
for i in range(max_num):
self.queue.put(threading.Thread)
# 定义方法从队列中得到线程
@property
def get_thread(self):
return self.queue.get()
# 定义方法线程结束后向队列中添加线程,保证线程总数量
def add_thread(self):
self.queue.put(threading.Thread)
if __name__ == '__main__':
# 创建线程池类,执行类的init方法
th = ThreadPool(20)
for i in range(1, 314):
# 从队列中获得线程,执行操作
thread = th.get_thread
cur_th = thread(target=get_msg, args=(th, i))
cur_th.start() # 开启线程
以上两个网站的数据爬去都是合法的,放心!!!