使用multiprocessing与requests进行爬虫制作

2017-12-28  本文已影响83人  奕剑听雨

导读疑问:1、为什么用multiprocessing? 2、Multiprocessing中有哪些方法? 3、本次爬虫中如何使用多进程提升爬虫效率?


Ironman.jpg

1.为什么要使用multiprocessing:简单的爬虫一般都是单线程的,因为处理的数据量比较少,例如一些公司用的爬取数据的爬虫,只是定时执行爬取某日单个数据,哪这样的话多进程没有必要。但是当我们要爬取的数据有很大数目时,多线程或者多进程的必要性就显现了。在python处理这类问题时,一般有Threading和multiprocessing两个库可以进行,而Threadings实质上市进行多线程活动,在爬取性能上提升较少,而multiprocessing根据电脑CPU核数可以进行多进程并发任务,在性能上提升非常明显。

2.在以下例子中并没有将multiprocessing的方法全部展示,我只用到了Pool,map以及join和close;原因是在我的爬虫中需要处理的数据量有上十万的时候,单单使用apply等方法不能满足要求,而使用进程池pool则可以让处理更便捷。使用map是因为在爬虫中间过程中我也使用了多进程,需要取得进程执行结果,而使用apply_async则只能获取进程结果在内存中保存位置。

3.在本次爬虫中我有三处使用多进程,包括通过page_number获取pic_id和catalogid、通过pic_id与catalogid获得指向下载前连接url、通过下载前连接指向下载中间html中的目标url,在处理过程中每个数据集合都是十万级的,因此多进程非常重要。

以下只针对中间部分方法进行讲解,最后贴上全部代码。

    def get_single_result(self, page_num):#单个页面处理
        dict_middle_result = {}
        true_result = {}
        keyword = self.search_keyword
        url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=%s&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%(keyword, page_num)
        try:
            response = requests.get(url)#接口请求
            content = response.content#请求结果获得
            result_2 = re.findall(r'\"imglist\"\:(.*)', content, re.S)[0]
            result_3 = re.findall(r'\{\"pic.*\"\}', result_2, re.S)[0]#对信息使用正则表达式进行筛选
            result_4 = result_3.split(',{')
            for j in range(0, len(result_4)):
                if j == 0:
                    dict_ = json.loads(result_4[j])
                    dict_middle_result[dict_['pic_id']] = dict_['catalogid']
                else:
                    strlog = '{'+result_4[j]
                    dict_ = json.loads(strlog)
                    dict_middle_result[dict_['pic_id']] = dict_['catalogid']#通过关键字获得pic_id和catalogid等信息
            true_result[page_num] = dict_middle_result
            return true_result#正常返回{5:{pic_id:catalogid,.....}}
        except:
            false_result = {}
            false_result[page_num] = 'fail'
            return false_result#异常处理,返回字典{5:'fail'}

以上是一个通过上一接口请求获得总搜索获得页面数,通过访问页面数取得pic_id和catalogid的方法。
其中进行接口请求,对取得结果使用正则表达式进行切割取得最终需要的结果存入字典,因为需要使用到map,所以我们使用map方法时要提供一个可迭代对象,必然需要创建一个单个处理方法,用于处理迭代。在处理中使用带了try....except来处理异常,因为爬虫在短时间内向服务器发起的请求很多,非常容易被服务器拒绝,所以对异常的page_num也要进行保存,以待后续处理。

    def get_middle_result_by_page_numbers(self):#多进程获取中间结果
        final_result = []
        list_for_pagenum = self.get_url_page_number_by_keyword()#此处为获得所有page_num的方法
        flag = 0
        while flag < 4:#循环执行
            if len(list_for_pagenum) == 0:
                break
            elif flag == 3:
                break
            else:
                p = multiprocessing.Pool(processes=5)#建立4进程
                res = p.map(self.get_single_result, list_for_pagenum)  #进程执行
                p.close()#锁定进程
                p.join()#进程空闲则加入请求
                list_for_pagenum = []
                for item in res:#{5:{pic_id:catalogid,.....}
                    for key in item:
                        if item[key] == 'fail':
                            list_for_pagenum.append(key)#异常page_num返回继续处理
                        else:
                            for i in item[key]:
                                dict_ = {}
                                dict_[i] = item[key][i]
                                final_result.append(dict_)
                flag += 1
        return final_result#返回列表结果,其中元素为多个字典

以上代码中,使用while循环处理3次(因为效率关系,没有进行while True死循环),每次处理完将可迭代对象list_for_page_num清空并将请求失败的page_num追加至列表继续处理。map方法需要的主要参数为单个处理方法get_single_result和一个用来提供对象给方法处理的可迭代对象。map处理完后将结果保存至res中,这是一个列表。方法返回为一个含有多个字典的列表用于后续多进程处理。

全部代码如下:

import requests
import json
import re
import sys
import random
import os
import time
import multiprocessing
from multiprocessing import Pool
from time import sleep
import copy_reg
import types


def _pickle_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)

copy_reg.pickle(types.MethodType, _pickle_method)#使用copy_reg将MethodType注册为可序列化的方法

reload(sys)
sys.setdefaultencoding('utf-8')#初始化编码防止乱码


class PictureSpider(object):

    def __init__(self, search_keyword):
        self. search_keyword = search_keyword

    def make_saving_path(self):#创建保存文件
        key = self.search_keyword.encode('gbk')
        directory_path = 'F:\\test_auto\\spider\\pictures\\%s' % key
        os.mkdir(directory_path)
        return directory_path

    def get_url_page_number_by_keyword(self):#通过关键字取得下载中间结果字典(包含pic_id和catalogid的字典)
        list_for_pagenum = []
        keyword = self.search_keyword
        dict_middle_result = {}
        url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=1&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%keyword
        response = requests.get(url)
        content = response.content
        result_1 = re.findall(r'\"pagecount\".*?\d+', content, re.S)
        page_number = str(result_1[0].split(":")[-1])#总页数对应pageNum
        for i in range(1, int(page_number)+1):
            list_for_pagenum.append(i)
        return list_for_pagenum

    def get_single_result(self, page_num):#单个页面处理
        dict_middle_result = {}
        true_result = {}
        keyword = self.search_keyword
        url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=%s&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%(keyword, page_num)
        try:
            response = requests.get(url)
            content = response.content
            result_2 = re.findall(r'\"imglist\"\:(.*)', content, re.S)[0]
            result_3 = re.findall(r'\{\"pic.*\"\}', result_2, re.S)[0]#对信息使用正则表达式进行筛选
            result_4 = result_3.split(',{')
            for j in range(0, len(result_4)):
                if j == 0:
                    dict_ = json.loads(result_4[j])
                    dict_middle_result[dict_['pic_id']] = dict_['catalogid']
                else:
                    strlog = '{'+result_4[j]
                    dict_ = json.loads(strlog)
                    dict_middle_result[dict_['pic_id']] = dict_['catalogid']#通过关键字获得pic_id和catalogid等信息
            true_result[page_num] = dict_middle_result
            return true_result#正常返回{5:{pic_id:catalogid,.....}}
        except:
            false_result = {}
            false_result[page_num] = 'fail'
            return false_result#异常处理,返回字典{5:'fail'}

    def get_middle_result_by_page_numbers(self):
        final_result = []
        list_for_pagenum = self.get_url_page_number_by_keyword()
        flag = 0
        while flag < 3:
            if len(list_for_pagenum) == 0:
                break
            elif flag == 3:
                break
            else:
                p = multiprocessing.Pool(processes=5)#建立4进程
                res = p.map(self.get_single_result, list_for_pagenum)  #进程执行
                p.close()
                p.join()
                list_for_pagenum = []
                for item in res:#{5:{pic_id:catalogid,.....}
                    for key in item:
                        if item[key] == 'fail':
                            list_for_pagenum.append(key)
                        else:
                            for i in item[key]:
                                dict_ = {}
                                dict_[i] = item[key][i]
                                final_result.append(dict_)
                flag += 1
        return final_result#返回列表结果,其中元素为多个字典

    def get_url_by_picid(self, pic_id_dict):#单个获得最终下载链接过程
        pic_id_list = []
        for i in pic_id_dict:
            pic_id_list.append(i)
        pic_id = pic_id_list[0]
        catalogid = pic_id_dict[pic_id]
        url = "http://www.quanjing.com/downcomp.aspx?pic_id=%s&Catalogid=%s" % (pic_id, catalogid)
        response = requests.get(url)
        content = response.content
        final_url_1 = re.findall(r'<script.*</script>', content, re.S)[0]
        final_url_2 = re.findall(r'document.location.href =(.*)\;<{1}', final_url_1, re.S)
        final_url_3 = final_url_2[0]#正则表达式处理获得信息
        final_url = re.findall(r'\'(.*)\'', final_url_3, re.S)[0]#取得最终下载URL
        return final_url

    def get_urls_by_middle_result(self):#多进行进行下载链接获取
        final_result = self.get_middle_result_by_page_numbers()
        p = multiprocessing.Pool(processes=4)#建立4进程
        res = p.map(self.get_url_by_picid, final_result)  #进程执行
        p.close()
        p.join()
        return res

    def get_picture_by_url(self, url):#单进程下载图片
        key = self.search_keyword.encode('gbk')
        try:
            directory = self.make_saving_path()
        except:
            fold_path = 'F:\\test_auto\\spider\\pictures\\%s' % key
        else:
            fold_path = directory
        picture_name = key+str(random.randint(10000, 99999))+'.jpg'#定义保存文件名
        picture_path = '%s\\%s' % (fold_path, picture_name)
        if os.path.exists(picture_path):
            picture_name = key+str(random.randint(1000, 9999))+'.jpg'
            picture_path = '%s\\%s' % (fold_path, picture_name)
        try:
            response = requests.get(url)
            content = response.content
            with open(picture_path, 'wb') as fi:
                fi.write(content)
                fi.close()
            picture_size = os.path.getsize(picture_path)
            if picture_size < 7000:#下载文件出错异常处理
                os.remove(picture_path)
                fail_url = url
                dict_ = {}
                dict_[fail_url] = 'fail'
                return dict_
            else:
                print "%s下载完成..." % picture_name
                success_url = url
                dict_ = {}
                dict_[success_url] = 'success'
                return dict_#下载成功返回数据
        except:#连接异常处理
            print "%s连接失败..." % url
            dict_ = {}
            dict_[url] = 'fail'
            return dict_#下载失败返回数据


def main():
    start_time = time.time()
    keyword = raw_input('请输入需要搜索下载的图片关键字:')
    spider = PictureSpider(keyword)
    url_pool = spider.get_urls_by_middle_result()
    picture_num = len(url_pool)
    print "根据关键字一共搜索到%s,现在开始下载..." % picture_num
    while True:  #因为多进程下载存在文件破损或者服务器因为访问过于频繁而出错,所以将破损文件URL加入URL池无限循环直到全部下载完成
        if len(url_pool) == 0:
            break
        else:
            p = multiprocessing.Pool(processes=4)#建立4进程
            res = p.map(spider.get_picture_by_url, url_pool)  #进程执行
            p.close()
            p.join()
            url_pool = []
            for item in res:
                for key in item:
                    if item[key] == 'fail':
                        url_pool.append(key)  #损坏文件重新加入URL池重新下载
    end_time = time.time()
    time_used = end_time - start_time#统计下载用时
    print "全部下载完成,用时%s秒。" % time_used


if __name__ == '__main__':
    main()

代码执行环境为python27,需要使用到的库都可以用pip install+库名安装,初学者,欢迎批评。


远山.jpg
上一篇下一篇

猜你喜欢

热点阅读