使用multiprocessing与requests进行爬虫制作
导读疑问:1、为什么用multiprocessing? 2、Multiprocessing中有哪些方法? 3、本次爬虫中如何使用多进程提升爬虫效率?
Ironman.jpg
1.为什么要使用multiprocessing:简单的爬虫一般都是单线程的,因为处理的数据量比较少,例如一些公司用的爬取数据的爬虫,只是定时执行爬取某日单个数据,哪这样的话多进程没有必要。但是当我们要爬取的数据有很大数目时,多线程或者多进程的必要性就显现了。在python处理这类问题时,一般有Threading和multiprocessing两个库可以进行,而Threadings实质上市进行多线程活动,在爬取性能上提升较少,而multiprocessing根据电脑CPU核数可以进行多进程并发任务,在性能上提升非常明显。
2.在以下例子中并没有将multiprocessing的方法全部展示,我只用到了Pool,map以及join和close;原因是在我的爬虫中需要处理的数据量有上十万的时候,单单使用apply等方法不能满足要求,而使用进程池pool则可以让处理更便捷。使用map是因为在爬虫中间过程中我也使用了多进程,需要取得进程执行结果,而使用apply_async则只能获取进程结果在内存中保存位置。
3.在本次爬虫中我有三处使用多进程,包括通过page_number获取pic_id和catalogid、通过pic_id与catalogid获得指向下载前连接url、通过下载前连接指向下载中间html中的目标url,在处理过程中每个数据集合都是十万级的,因此多进程非常重要。
以下只针对中间部分方法进行讲解,最后贴上全部代码。
def get_single_result(self, page_num):#单个页面处理
dict_middle_result = {}
true_result = {}
keyword = self.search_keyword
url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=%s&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%(keyword, page_num)
try:
response = requests.get(url)#接口请求
content = response.content#请求结果获得
result_2 = re.findall(r'\"imglist\"\:(.*)', content, re.S)[0]
result_3 = re.findall(r'\{\"pic.*\"\}', result_2, re.S)[0]#对信息使用正则表达式进行筛选
result_4 = result_3.split(',{')
for j in range(0, len(result_4)):
if j == 0:
dict_ = json.loads(result_4[j])
dict_middle_result[dict_['pic_id']] = dict_['catalogid']
else:
strlog = '{'+result_4[j]
dict_ = json.loads(strlog)
dict_middle_result[dict_['pic_id']] = dict_['catalogid']#通过关键字获得pic_id和catalogid等信息
true_result[page_num] = dict_middle_result
return true_result#正常返回{5:{pic_id:catalogid,.....}}
except:
false_result = {}
false_result[page_num] = 'fail'
return false_result#异常处理,返回字典{5:'fail'}
以上是一个通过上一接口请求获得总搜索获得页面数,通过访问页面数取得pic_id和catalogid的方法。
其中进行接口请求,对取得结果使用正则表达式进行切割取得最终需要的结果存入字典,因为需要使用到map,所以我们使用map方法时要提供一个可迭代对象,必然需要创建一个单个处理方法,用于处理迭代。在处理中使用带了try....except来处理异常,因为爬虫在短时间内向服务器发起的请求很多,非常容易被服务器拒绝,所以对异常的page_num也要进行保存,以待后续处理。
def get_middle_result_by_page_numbers(self):#多进程获取中间结果
final_result = []
list_for_pagenum = self.get_url_page_number_by_keyword()#此处为获得所有page_num的方法
flag = 0
while flag < 4:#循环执行
if len(list_for_pagenum) == 0:
break
elif flag == 3:
break
else:
p = multiprocessing.Pool(processes=5)#建立4进程
res = p.map(self.get_single_result, list_for_pagenum) #进程执行
p.close()#锁定进程
p.join()#进程空闲则加入请求
list_for_pagenum = []
for item in res:#{5:{pic_id:catalogid,.....}
for key in item:
if item[key] == 'fail':
list_for_pagenum.append(key)#异常page_num返回继续处理
else:
for i in item[key]:
dict_ = {}
dict_[i] = item[key][i]
final_result.append(dict_)
flag += 1
return final_result#返回列表结果,其中元素为多个字典
以上代码中,使用while循环处理3次(因为效率关系,没有进行while True死循环),每次处理完将可迭代对象list_for_page_num清空并将请求失败的page_num追加至列表继续处理。map方法需要的主要参数为单个处理方法get_single_result和一个用来提供对象给方法处理的可迭代对象。map处理完后将结果保存至res中,这是一个列表。方法返回为一个含有多个字典的列表用于后续多进程处理。
全部代码如下:
import requests
import json
import re
import sys
import random
import os
import time
import multiprocessing
from multiprocessing import Pool
from time import sleep
import copy_reg
import types
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _pickle_method)#使用copy_reg将MethodType注册为可序列化的方法
reload(sys)
sys.setdefaultencoding('utf-8')#初始化编码防止乱码
class PictureSpider(object):
def __init__(self, search_keyword):
self. search_keyword = search_keyword
def make_saving_path(self):#创建保存文件
key = self.search_keyword.encode('gbk')
directory_path = 'F:\\test_auto\\spider\\pictures\\%s' % key
os.mkdir(directory_path)
return directory_path
def get_url_page_number_by_keyword(self):#通过关键字取得下载中间结果字典(包含pic_id和catalogid的字典)
list_for_pagenum = []
keyword = self.search_keyword
dict_middle_result = {}
url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=1&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%keyword
response = requests.get(url)
content = response.content
result_1 = re.findall(r'\"pagecount\".*?\d+', content, re.S)
page_number = str(result_1[0].split(":")[-1])#总页数对应pageNum
for i in range(1, int(page_number)+1):
list_for_pagenum.append(i)
return list_for_pagenum
def get_single_result(self, page_num):#单个页面处理
dict_middle_result = {}
true_result = {}
keyword = self.search_keyword
url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=%s&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%(keyword, page_num)
try:
response = requests.get(url)
content = response.content
result_2 = re.findall(r'\"imglist\"\:(.*)', content, re.S)[0]
result_3 = re.findall(r'\{\"pic.*\"\}', result_2, re.S)[0]#对信息使用正则表达式进行筛选
result_4 = result_3.split(',{')
for j in range(0, len(result_4)):
if j == 0:
dict_ = json.loads(result_4[j])
dict_middle_result[dict_['pic_id']] = dict_['catalogid']
else:
strlog = '{'+result_4[j]
dict_ = json.loads(strlog)
dict_middle_result[dict_['pic_id']] = dict_['catalogid']#通过关键字获得pic_id和catalogid等信息
true_result[page_num] = dict_middle_result
return true_result#正常返回{5:{pic_id:catalogid,.....}}
except:
false_result = {}
false_result[page_num] = 'fail'
return false_result#异常处理,返回字典{5:'fail'}
def get_middle_result_by_page_numbers(self):
final_result = []
list_for_pagenum = self.get_url_page_number_by_keyword()
flag = 0
while flag < 3:
if len(list_for_pagenum) == 0:
break
elif flag == 3:
break
else:
p = multiprocessing.Pool(processes=5)#建立4进程
res = p.map(self.get_single_result, list_for_pagenum) #进程执行
p.close()
p.join()
list_for_pagenum = []
for item in res:#{5:{pic_id:catalogid,.....}
for key in item:
if item[key] == 'fail':
list_for_pagenum.append(key)
else:
for i in item[key]:
dict_ = {}
dict_[i] = item[key][i]
final_result.append(dict_)
flag += 1
return final_result#返回列表结果,其中元素为多个字典
def get_url_by_picid(self, pic_id_dict):#单个获得最终下载链接过程
pic_id_list = []
for i in pic_id_dict:
pic_id_list.append(i)
pic_id = pic_id_list[0]
catalogid = pic_id_dict[pic_id]
url = "http://www.quanjing.com/downcomp.aspx?pic_id=%s&Catalogid=%s" % (pic_id, catalogid)
response = requests.get(url)
content = response.content
final_url_1 = re.findall(r'<script.*</script>', content, re.S)[0]
final_url_2 = re.findall(r'document.location.href =(.*)\;<{1}', final_url_1, re.S)
final_url_3 = final_url_2[0]#正则表达式处理获得信息
final_url = re.findall(r'\'(.*)\'', final_url_3, re.S)[0]#取得最终下载URL
return final_url
def get_urls_by_middle_result(self):#多进行进行下载链接获取
final_result = self.get_middle_result_by_page_numbers()
p = multiprocessing.Pool(processes=4)#建立4进程
res = p.map(self.get_url_by_picid, final_result) #进程执行
p.close()
p.join()
return res
def get_picture_by_url(self, url):#单进程下载图片
key = self.search_keyword.encode('gbk')
try:
directory = self.make_saving_path()
except:
fold_path = 'F:\\test_auto\\spider\\pictures\\%s' % key
else:
fold_path = directory
picture_name = key+str(random.randint(10000, 99999))+'.jpg'#定义保存文件名
picture_path = '%s\\%s' % (fold_path, picture_name)
if os.path.exists(picture_path):
picture_name = key+str(random.randint(1000, 9999))+'.jpg'
picture_path = '%s\\%s' % (fold_path, picture_name)
try:
response = requests.get(url)
content = response.content
with open(picture_path, 'wb') as fi:
fi.write(content)
fi.close()
picture_size = os.path.getsize(picture_path)
if picture_size < 7000:#下载文件出错异常处理
os.remove(picture_path)
fail_url = url
dict_ = {}
dict_[fail_url] = 'fail'
return dict_
else:
print "%s下载完成..." % picture_name
success_url = url
dict_ = {}
dict_[success_url] = 'success'
return dict_#下载成功返回数据
except:#连接异常处理
print "%s连接失败..." % url
dict_ = {}
dict_[url] = 'fail'
return dict_#下载失败返回数据
def main():
start_time = time.time()
keyword = raw_input('请输入需要搜索下载的图片关键字:')
spider = PictureSpider(keyword)
url_pool = spider.get_urls_by_middle_result()
picture_num = len(url_pool)
print "根据关键字一共搜索到%s,现在开始下载..." % picture_num
while True: #因为多进程下载存在文件破损或者服务器因为访问过于频繁而出错,所以将破损文件URL加入URL池无限循环直到全部下载完成
if len(url_pool) == 0:
break
else:
p = multiprocessing.Pool(processes=4)#建立4进程
res = p.map(spider.get_picture_by_url, url_pool) #进程执行
p.close()
p.join()
url_pool = []
for item in res:
for key in item:
if item[key] == 'fail':
url_pool.append(key) #损坏文件重新加入URL池重新下载
end_time = time.time()
time_used = end_time - start_time#统计下载用时
print "全部下载完成,用时%s秒。" % time_used
if __name__ == '__main__':
main()
代码执行环境为python27,需要使用到的库都可以用pip install+库名安装,初学者,欢迎批评。
远山.jpg