python3爬虫系列

聚沙成塔--爬虫系列(十五)(如何,成为设计师)

2017-11-07  本文已影响5人  爱做饭的老谢

版权声明:本文为作者原创文章,可以随意转载,但必须在明确位置标明出处!!!

tips:本基础系列旨在以爬虫带大家入门Python语言

上一篇文章讲了多线程的基本使用及需要注意的地方,在Python中如果你要使用多线程,那么你的程序一定要是I/O密集型的这样才能发挥多线程的优势,如果你的程序设计是CPU计算密集型的那么使用多进程的设计。多进程的使用方式将在下一篇文章讲到。本篇文章的重点是讲怎么去设计多线程,设计是一门技术活啊,编码只是程序员掌握的最最基本的必要条件,随着你编码的时间越来越长经验越来越丰富,你对编码的理解也越来越深了,你再也不想只当个搬砖的了,你想要成为一个NB的设计师,你想让那些搬砖的人根据你的设计去干活。呵呵,骚年想要成为一个NB的设计师你要的路还很长。作为初学者来说作者建议目前设计模式你还不需要去深入研究,要成为一个设计师不是一朝一夕的事,是设计模式的灵活运用、是常年项目经验的累积、是框架设计的反复推敲。当你有了一定的编码经验那么你可以看看《大话设计模式》,再编码的时候就可以尝试运用里面的设计模式来编写你的代码。

包工头

就像王健林说的我们先定个小目标“先挣它一个亿”呵呵,当然这对他来说确实是个小目标,对我们这些普通人来说这个目标有点大啊,我们只能先定一个小小目标,我们的目标就是成为一个包工头,让你手底下的几十号人按照你的设计去做。一定要有当将军的觉悟,这样我们才能慢慢的向高手进发。

设计思想

我们知道Pyhton里的多线程对于I/O密集型的任务有显著的提高,那么就我们的爬虫程序想想我们应该怎么去设计多线程呢?要成为一个合格的设计师首先是要去了解需求,需求了解了才能指定计划。我们之前写的爬虫逻辑是怎么样的呢?打开一个网络请求--》解析网络请求返回回来的网页元素--》写入文档或数据库。这就是之前爬虫程序的整个流程,我们现在考虑一下引入多线程后应该怎么去编写它,两种想法

  1. 每个线程都去执行所有的逻辑,从打开一个网络请求到写入文档或数据库。
  2. 把每个步骤的逻辑分开,让线程各自去赋值单一逻辑的处理,这里我们把它设计为一个线程去处理网络请求,多个线程去处理解析操作,一个线程去处理写入文档或数据库操作,这个模式可以概括为一个输入、一个输出、多个工作线程。这种模式显然比第一种好,为什么这样设计呢?输入线程只负责去请求网络数据,将请求到的数据交给工作线程去处理,工作线程处理完后的数据交接输出线程做最后的输入工作,这样每个逻辑上都是独立的,专业的人做专业的事,这里还需要重点说一下为什么设计成一个输出,如果是多个输出不加锁的情况下会照成很多问题,比如将数据写入文档,A线程还没有把数据写完CPU时间片道理,那么B现在就开始写,这就会造成最终的结构是乱序的,I/O操作本来就是比较费时的,多线程的设计对它并没有多大性能的提升,二期使用多线程设计输出那么就必须的加锁保证数据同步,这也会增大资源开销,所以设计为一个输出是合理的


    流程图

Queue(队列)

如果我们设计成一个输入、多个工作、一个输出工作模式,那么我们就会面临一个问题,我们的数据怎么传递呢?特别是对于工作线程,因为它是多个线程那么肯定会存在资源竞争,所以我们必须得保证每个工作线程拿到的数据都是唯一的。这里就需要引入一个队列的概念了,这个模块是Python自带的,你可以直接使用它,像下面这样

import queue

queue模块提供了两种队列,一种是先入先出队列,一种是后入先出队列它们的定义如下

# 先入先出队列
Queue(maxsize=0)
# 后入先出队列
LifoQueue(maxsize=0)

maxsize为队列大小,默认为0,表示无限制队列,就是队列可以无限大只要你的内存够用,如果maxsize大于零,则表示限制队列,当队列满了会阻塞直到有空余的位置为止,就像饭店排队吃饭一样,店满了那么你只能等吃饭的人离开空出位置后你才能去。队列的工作模式是取一个消息就少一个消息。Queue对象提供了一下方法

实战

okay,该讲的必要条件都已经讲过了,下面我们将爬虫程序改写成上面说的到设计模式,我们将输入线程命名为AccessTread、工作线程命名为WorkThreads、输出线程命名为OutThread,修改后结果如下:

#AccessThread.py
import threading
from urllib import request
from urllib import error

class AccessThread(threading.Thread):
    """docstring for AccessThread"""
    def __init__(self, work_queue, args):
        super(AccessThread, self).__init__()
        self.args = args
        self.work_queue = work_queue

    def run(self):
        self.request_url()

    def request_url(self):
        try:
            for page in range(1, 14):
                req = request.Request(self.args['url'] + str(page), headers=self.args['headers'])
                print(self.args['url'] + str(page))
                # 打开一个请求
                response = request.urlopen(req)
                # 读取服务器返回的页面数据内容
                content = response.read().decode('utf-8')
                self.work_queue.put(content)

        except error.URLError as e:
            print(e.reason)
# WorkThreads.py
import threading
import re, os
from urllib import request

class WorkThreads(threading.Thread):
    """docstring for WorkThreads"""
    def __init__(self, work_queue, out_queue):
        super(WorkThreads, self).__init__()
        self.work_queue = work_queue
        self.out_queue = out_queue

    def run(self):
        self.deal_work()

    def deal_work(self):
        while True:
            content = self.work_queue.get()
            if content:
               
                pattern = re.compile(r'<div class="article block untagged mb15[\s\S]*?class="stats-vote".*?</div>', re.S)
                userinfos = re.findall(pattern, content)
                
                if userinfos:
                    pattern = re.compile(r'<a href="(.*?)".*?<h2>(.*?)</h2>.*?<div class="content">(.*?)</div>.*?<i class="number">(.*?)</i>', re.S)

                    picture = re.compile(r'<div class="thumb">.*?src="(.*?)"', re.S)

                    for userinfo in userinfos:
                        item = re.findall(pattern, userinfo)
                        pictures = re.findall(picture, userinfo)
                        try:
                            if item:
                                infos = []
                                userid, name, content, num = item[0]
                                # 去掉换行符,<span></span>,<br/>符号
                                userid = re.sub(r'\n|<span>|</span>|<br/>', '', userid)
                                name = re.sub(r'\n|<span>|</span>|<br/>', '', name)
                                content = re.sub(r'\n|<span>|</span>|<br/>|\x01', '', content)
                                
                                if pictures:
                                    path = './users/'
                                    if not os.path.exists(path):
                                        os.makedirs(path)

                                    request.urlretrieve('http:' + pictures[0], path + os.path.basename(pictures[0]))
                                    infos.append((userid, name, int(num), content, pictures[0]))
                                    self.out_queue.put((userid, name, int(num), content, pictures[0]))
                                else:
                                    infos.append((userid, name, int(num), content, ' '))
                                    self.out_queue.put((userid, name, int(num), content, ' '))
                                   
                        except Exception as e:
                            print(e)
# OutThread.py
import threading
from .database.MySQLImpl import MySQLImpl
from .database.SqliteImpl import SqliteImpl
from .database.SqlalchemyImpl import SqlalchemyImpl

class OutThread(threading.Thread):
    """docstring for OutThread"""
    def __init__(self, out_queue):
        super(OutThread, self).__init__()

        self.out_queue = out_queue
        self.sqlite = SqliteImpl()

    def run(self):
        self.out_work()


    def out_work(self):
        while True:
            msg = self.out_queue.get()
            #print(msg)
            if msg:
                self.sqlite.insert_record(msg)
# Scheduler.py
from .AccessThread import AccessThread
from .WorkThreads import WorkThreads
from .OutThread import OutThread
import queue

class Scheduler(object):

    def __init__(self):
        self.work_queue = queue.Queue()
        self.out_queue = queue.Queue()
        
    def handle(self):
        threads = []
        dict_info = {}
        dict_info['url'] = 'https://www.qiushibaike.com/8hr/page/'
        dict_info['headers'] = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 MicroMessenger/6.5.2.501 NetType/WIFI WindowsWechat QBCore/3.43.691.400 QQBrowser/9.0.2524.400'}

        acc_thread = AccessThread(self.work_queue, dict_info)
        
        for _ in range(10):
            work_thread = WorkThreads(self.work_queue, self.out_queue)
            threads.append(work_thread)
        
        out_thread = OutThread(self.out_queue)

        threads.append(acc_thread)
        threads.append(out_thread)

        for t in threads:
            t.daemon = True
            t.start()

        while True:
            alive = False
            for t in threads:
                alive = alive or t.is_alive()

            if not alive:
                break

完整的代码放在我的git上,地址https://github.com/Gavinxyj/Python/tree/master/python_study/Scrapy/modules欢迎大家fork、star。

okay,本篇对多线程设计的介绍就到此结束了,读者一定要亲自上机验证才回有收获,一定不要有你觉得很简单它就真的简单了。实践是验证理论唯一的途径。


欢迎关注我:「爱做饭的老谢」,老谢一直在努力...

上一篇 下一篇

猜你喜欢

热点阅读