爬虫博客干货小白编程之美-Python

微信公众号爬虫

2018-08-06  本文已影响718人  纳米君

微信公众号爬虫,目前主要有两种方式:

  1. 搜狗搜索有专门的公众号搜索功能,不过只能获取公众号最近10条群发信息,如下图:


    image.png
  2. 登录微信公众平台,点击左侧素材管理 ---- 新建图文素材 ---- 点击超链接按钮,弹出框如下:

    image.png
    输入想查询的公众号,点击进去,如下图,可以获取所有的记录。
    image.png
第一种爬虫方式:

跟上一篇的爬虫比较,依旧使用了协程的方式。
使用了类封装,便于调用。
增加了装饰器:打印爬虫时间
解析HTML的库:PyQuery,用过jquery的很容易上手。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import functools

import aiohttp
import asyncio
import re
from urllib.parse import quote

import xlwt
from pyquery import PyQuery
from selenium import webdriver

import time

from selenium.webdriver.chrome.options import Options


def cal_time(func):
    """Decorate a method so that its wall-clock run time is printed.

    The wrapped method is called with every positional and keyword argument
    it was given, and its return value is handed back to the caller.

    :param func: the method to time; its first parameter is ``self``.
    :return: the wrapping function, carrying ``func``'s metadata.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        start = time.time()
        # Forward all arguments; passing only ``self`` would break any
        # decorated method that takes parameters.
        result = func(self, *args, **kwargs)
        end = time.time()
        print('{fun} time is {time}'.format(fun=func.__name__, time=end - start))
        return result

    return wrapper


class WechatSpider:
    """Crawl the articles Sogou exposes for WeChat official accounts.

    For every account name passed to the constructor the spider queries the
    Sogou WeChat search, follows the first hit to the account's article list,
    renders that page in headless Chrome, parses the article entries and
    writes them to ``<name>_<YYYY-MM-DD>.xls`` in the working directory.
    """

    # Path of the chromedriver binary handed to Selenium; adjust it for the
    # machine the spider runs on.
    chrome_driver_path = 'D:\\Software\\chromedriver.exe'

    def __init__(self, *args):
        """Store the account names to crawl.

        :param args: one or more official-account names.
        """
        self.account_name = args
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0'}
        self.excel_head = ['时间', '文章标题', '文章地址', '文章简介']
        # At most 10 accounts are processed at the same time.
        self.semaphore = asyncio.Semaphore(10)

    @staticmethod
    def get_target_url(sogou_html):
        """Return the link of the first account in a Sogou result page.

        :param sogou_html: HTML text of the Sogou search result.
        :return: the ``href`` of the first matching anchor, or ``None`` when
            the page contains no such anchor.
        """
        doc = PyQuery(sogou_html)
        return doc('div[class="txt-box"] p[class="tit"] a').attr('href')

    @staticmethod
    def parse_wechat_html(wechat_html):
        """Extract the article entries from a rendered article-list page.

        :param wechat_html: HTML text of the account's article list.
        :return: a list of dicts with the keys ``title``, ``url``,
            ``summary``, ``date`` and ``pic_url``; empty when the page holds
            no article nodes.
        """
        doc = PyQuery(wechat_html)
        result_list = []
        for article in doc('div[class="weui_media_box appmsg"]').items():
            title_node = article('h4[class="weui_media_title"]')
            # Article title.
            title = title_node.text()
            # The relative article link is read from the "hrefs" attribute of
            # the title node; leave the url empty when it is absent instead
            # of failing on ``str + None``.
            href = title_node.attr('hrefs')
            url = ('http://mp.weixin.qq.com' + href) if href else ''
            # Summary text.
            summary = article('.weui_media_desc').text()
            # Publication date.
            date = article('.weui_media_extra_info').text()
            # Cover picture: taken from the url(...) part of the inline
            # style; a missing style attribute is treated as "no picture".
            style = article('.weui_media_hd').attr('style') or ''
            pic_url = re.findall(r'.*?url\((.*?)\)', style)

            result_list.append({
                'title': title.replace('原创', '').strip(),
                'url': url,
                'summary': summary,
                'date': date.replace('原创', '').strip(),
                'pic_url': pic_url[0] if pic_url else 'parse pic_url failed'
            })

        return result_list

    @staticmethod
    def get_chrome_result(target_url):
        """Render ``target_url`` in headless Chrome and return the final HTML.

        :param target_url: address of the page to render.
        :return: ``document.documentElement.outerHTML`` of the loaded page.
        """
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        driver = webdriver.Chrome(chrome_options=chrome_options,
                                  executable_path=WechatSpider.chrome_driver_path)
        try:
            driver.get(target_url)
            return driver.execute_script("return document.documentElement.outerHTML")
        finally:
            # Always shut the browser down, otherwise every call leaves a
            # Chrome/chromedriver process behind.
            driver.quit()

    @staticmethod
    def excel_style():
        """Return an xlwt cell style centred horizontally and vertically."""
        style = xlwt.XFStyle()
        alignment = xlwt.Alignment()
        alignment.horz = xlwt.Alignment.HORZ_CENTER  # centre horizontally
        alignment.vert = xlwt.Alignment.VERT_CENTER  # centre vertically
        style.alignment = alignment

        return style

    def write_to_excel(self, result, name):
        """Write the parsed articles to ``<name>_<YYYY-MM-DD>.xls``.

        :param result: list of article dicts as produced by
            :meth:`parse_wechat_html`.
        :param name: account name, used as the file-name prefix.
        """
        wbk = xlwt.Workbook()
        sheet = wbk.add_sheet('Sheet1')
        # Column widths for date, title, url and summary.
        for col_index, chars in enumerate((18, 45, 30, 150)):
            sheet.col(col_index).width = 256 * chars

        # Build the centred style once and reuse it for every styled cell.
        centered = WechatSpider.excel_style()

        for index, head_name in enumerate(self.excel_head):
            sheet.write(0, index, head_name, centered)

        for row, item in enumerate(result, start=1):
            sheet.write(row, 0, item['date'], centered)
            sheet.write(row, 1, item['title'])
            sheet.write(row, 2, item['url'])
            sheet.write(row, 3, item['summary'])

        wbk.save('{name}_{date}.xls'.format(name=name, date=time.strftime('%Y-%m-%d')))

    async def get_gzh_info(self, url, name):
        """Crawl one account and export its articles.

        :param url: Sogou search URL for the account.
        :param name: account name, used for messages and the output file.
        """
        async with self.semaphore:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=self.headers) as res:
                    if res.status != 200:
                        print('{name}: search request failed with HTTP {status}'.format(
                            name=name, status=res.status))
                        return
                    text = await res.text()

            target_url = WechatSpider.get_target_url(text)
            if not target_url:
                # No account link in the result page; there is nothing to
                # open in the browser.
                print('{name}: no account link found in the search result'.format(name=name))
                return

            # Selenium is blocking: run it in the default executor so the
            # other accounts keep making progress on the event loop.
            loop = asyncio.get_event_loop()
            wechat_html = await loop.run_in_executor(
                None, WechatSpider.get_chrome_result, target_url)
            result = WechatSpider.parse_wechat_html(wechat_html)
            self.write_to_excel(result, name)

    @cal_time
    def run(self):
        """Crawl every configured account and wait until all are finished."""
        assert self.account_name, 'WechatSpider初始化至少提供一个公众号名字'
        loop = asyncio.get_event_loop()
        tasks = []
        for name in self.account_name:
            url = 'http://weixin.sogou.com/weixin?type=1&s_from=input&query=%s&ie=utf8&_sug_=n&_sug_type_=' % quote(
                name)
            tasks.append(asyncio.ensure_future(self.get_gzh_info(url, name)))

        try:
            loop.run_until_complete(asyncio.wait(tasks))
        finally:
            # Close the loop even when waiting is interrupted.
            loop.close()


if __name__ == '__main__':
    # Script entry point: crawl the two sample official accounts.
    spider = WechatSpider('不知了了', '上海发布')
    spider.run()

第二种爬虫方式:

需要自己注册一个公众号,获取自己的token、目标公众号的fakeid,以及登录后的Cookie。而且每发送一次get请求都要休息一会,否则会被提示操作频繁。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from datetime import datetime
import functools
import requests
import xlwt
import time

import math


def cal_time(func):
    """Decorate a method so that its wall-clock run time is printed.

    The wrapped method is called with every positional and keyword argument
    it was given, and its return value is handed back to the caller.

    :param func: the method to time; its first parameter is ``self``.
    :return: the wrapping function, carrying ``func``'s metadata.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        start = time.time()
        # Forward all arguments; passing only ``self`` would break any
        # decorated method that takes parameters.
        result = func(self, *args, **kwargs)
        end = time.time()
        print('{fun} time is {time}'.format(fun=func.__name__, time=end - start))
        return result

    return wrapper


class WechatSpider:
    """Crawl an official account's article list through the mp.weixin.qq.com backend.

    The spider pages through the ``appmsg`` endpoint with a logged-in
    session and writes date, title and link of every article to
    ``<name>_<YYYY-MM-DD>.xls``.
    """

    def __init__(self, *args):
        """Store the account names and prepare the request parameters.

        :param args: one or more account names; they are only used as the
            output file-name prefix.
        """
        self.result = []
        self.account_name = args
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0',
            # NOTE(review): your_cookie is not defined in this script;
            # presumably it must be set to the Cookie of a logged-in session.
            'Cookie': your_cookie
        }
        self.excel_head = ['时间', '文章标题', '文章地址']
        self.url = 'https://mp.weixin.qq.com/cgi-bin/appmsg'
        self.begin = 0
        self.data = {
            # Token of the logged-in user.
            # NOTE(review): your_token is not defined in this script.
            "token": your_token,
            "lang": "zh_CN",
            "f": "json",
            "ajax": "1",
            "action": "list_ex",
            "begin": self.begin,
            "count": "5",
            "query": "",
            # Every official account has its own fakeid.
            # NOTE(review): gzh_fakeid is not defined in this script.
            "fakeid": gzh_fakeid,
            "type": "9",
        }

    @staticmethod
    def excel_style():
        """Return an xlwt cell style centred horizontally and vertically."""
        style = xlwt.XFStyle()
        alignment = xlwt.Alignment()
        alignment.horz = xlwt.Alignment.HORZ_CENTER  # centre horizontally
        alignment.vert = xlwt.Alignment.VERT_CENTER  # centre vertically
        style.alignment = alignment

        return style

    def write_to_excel(self, result, name):
        """Write the collected articles to ``<name>_<YYYY-MM-DD>.xls``.

        :param result: list of dicts with the keys ``date``, ``title`` and
            ``url``.
        :param name: account name, used as the file-name prefix.
        """
        wbk = xlwt.Workbook()
        sheet = wbk.add_sheet('Sheet1')
        # Column widths for date, title and url.
        for col_index, chars in enumerate((18, 45, 30)):
            sheet.col(col_index).width = 256 * chars

        # Build the centred style once and reuse it for every styled cell.
        centered = WechatSpider.excel_style()

        for index, head_name in enumerate(self.excel_head):
            sheet.write(0, index, head_name, centered)

        for row, item in enumerate(result, start=1):
            sheet.write(row, 0, item['date'], centered)
            sheet.write(row, 1, item['title'])
            sheet.write(row, 2, item['url'])

        wbk.save('{name}_{date}.xls'.format(name=name, date=time.strftime('%Y-%m-%d')))

    def get_gzh_info(self, url, interval=5):
        """Fetch every page of the article list and append it to ``self.result``.

        Paging is done iteratively: each request advances ``begin`` by the
        page size until the total reported by the server is reached, the
        server returns an empty page, or a request fails.

        :param url: address of the ``appmsg`` endpoint.
        :param interval: seconds to sleep between two requests.
        """
        page_size = int(self.data["count"])
        self.begin = 0
        while True:
            # The offset has to be written into the request parameters;
            # updating ``self.begin`` alone would request page 0 forever.
            self.data["begin"] = self.begin
            res = requests.get(url, headers=self.headers, params=self.data)
            if res.status_code != 200:
                print('request failed with HTTP {status}'.format(status=res.status_code))
                break

            payload = res.json()
            articles = payload.get("app_msg_list")
            if articles is None:
                # No article list in the payload (e.g. the server rejected
                # the request); stop instead of failing on a missing key.
                print('unexpected response: {resp}'.format(resp=payload))
                break

            for item in articles:
                # Keep title, link and the update date of each article.
                date = datetime.fromtimestamp(item["update_time"])
                self.result.append({
                    "title": item["title"],
                    "url": item["link"],
                    'date': date.strftime('%Y-%m-%d'),
                })

            self.begin += page_size
            if not articles or self.begin >= payload.get("app_msg_cnt", 0):
                break
            time.sleep(interval)

    @cal_time
    def run(self):
        """Crawl the configured account and export one workbook per name."""
        assert self.account_name, 'WechatSpider初始化至少提供一个公众号名字'
        for name in self.account_name:
            # Start from an empty list so a workbook never contains the rows
            # collected for a previous name.
            self.result = []
            self.get_gzh_info(self.url)
            self.write_to_excel(self.result, name)


if __name__ == '__main__':
    # Script entry point: crawl the sample official account.
    spider = WechatSpider('不知了了')
    spider.run()

即使用 time.sleep(5) 每 5s 请求一次,仍然会提示“操作频繁,稍后再试”。emmmm,具体的间隔时间需要自己慢慢调试。


结语:所有的爬虫流程都大同小异,都是先找到目标url,然后获取响应进行解析。难点主要还是网站针对爬虫做出的各种限制。

上一篇下一篇

猜你喜欢

热点阅读