14.scrapy实战之招聘网站进行整站爬取

2018-06-14  本文已影响0人  MononokeHime

通过CrawlSpider对招聘网站进行整站爬取

1.数据库的设计


image.png

2.生成Crawl模板的spider
scrapy为我们提供了生成spider的不同模板

(Spider-0m_XmmLx) D:\Spider>scrapy genspider --list
Available templates:
  basic  #默认
  crawl
  csvfeed
  xmlfeed

创建项目以及新建crawl spider

scrapy genspider -t crawl lagouspider www.lagou.com

3.spider创建完成后，spiders文件夹下会多出一个lagouspider.py文件，在其中编写自己的爬虫。

# lagouspider.py
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from lagou.items import LagouItem,LagouItemLoader
from lagou.util.common import get_md5
from datetime import datetime

class LagouspiderSpider(CrawlSpider):
    """Crawl spider for lagou.com job-detail pages.

    Starts from a single job page, follows every link matching the
    job-detail URL pattern, and turns each matching page into a
    ``LagouItem`` via ``parse_job``.
    """

    name = 'lagouspider'
    allowed_domains = ['www.lagou.com']
    start_urls = ['https://www.lagou.com/jobs/1198581.html']

    rules = (
        # # Rule for following general job-listing page URLs
        # Rule(LinkExtractor(allow=("zhaopin/.*",)),follow=True),
        # # Rule for following company page URLs
        # Rule(LinkExtractor(allow=("gongsi/j\d+.html",)), follow=True),
        # # Rule for job-detail page URLs. The dot is escaped so that only a
        # # literal ".html" suffix matches (an unescaped "." matches any char).
        Rule(LinkExtractor(allow=r'jobs/\d+\.html'), callback='parse_job', follow=True),
    )

    def parse_job(self, response):
        """Extract one job posting from a job-detail page.

        Args:
            response: the downloaded job-detail page.

        Returns:
            The item produced by ``LagouItemLoader.load_item()``.
        """
        item_loader = LagouItemLoader(item=LagouItem(), response=response)

        # Identity of the posting.
        item_loader.add_css('title', ".job-name::attr(title)")
        item_loader.add_value('url', response.url)
        item_loader.add_value('url_object_id', get_md5(response.url))

        # Headline requirements: salary plus the 2nd-5th <span> under
        # the "job_request" paragraph.
        item_loader.add_css('salary', ".job_request .salary::text")
        item_loader.add_xpath('job_city', "//*[@class='job_request']/p/span[2]/text()")
        item_loader.add_xpath('work_years', "//*[@class='job_request']/p/span[3]/text()")
        item_loader.add_xpath('degree_need', "//*[@class='job_request']/p/span[4]/text()")
        item_loader.add_xpath('job_type', "//*[@class='job_request']/p/span[5]/text()")

        # Free-form details.
        item_loader.add_css("tag", '.position-label li::text')
        item_loader.add_css('publish_time', '.publish_time::text')
        item_loader.add_css('job_advantage', '.job-advantage p::text')
        # These two selectors have no ::text, so the raw element markup is
        # collected; any cleanup is left to the item's field processors.
        item_loader.add_css('job_desc', '.job_bt div')
        item_loader.add_css('job_addr', '.work_addr')

        # Company information.
        item_loader.add_css('company_name', "#job_company dt a img::attr(alt)")
        item_loader.add_css('company_url', "#job_company dt a::attr(href)")

        item_loader.add_value('crawl_time', datetime.now())

        job_item = item_loader.load_item()
        print(job_item)
        return job_item

4.定义需要保存的数据items.py

import scrapy
from scrapy.contrib.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst, Join
from lagou.util.common import extract_num
from w3lib.html import remove_tags

class LagouItemLoader(ItemLoader):
    """Item loader used for lagou job items."""

    # Default output processor: take the first value of the collected list.
    default_output_processor = TakeFirst()

def replace_splash(value):
    """Return ``value`` with every "/" character removed.

    Surrounding whitespace is left untouched.
    """
    segments = value.split("/")
    return "".join(segments)

def handle_strip(value):
    """Return ``value`` without leading and trailing whitespace."""
    trimmed = value.strip()
    return trimmed

def handle_jobaddr(value):
    """Collapse a multi-line address into a single string.

    Each line is stripped of surrounding whitespace, lines that are exactly
    the "查看地图" link text are dropped, and the remaining pieces are
    concatenated without a separator.
    """
    cleaned_lines = (line.strip() for line in value.split("\n"))
    return "".join(piece for piece in cleaned_lines if piece != "查看地图")

class LagouItem(scrapy.Item):
    """A lagou.com job posting.

    Fields that need cleanup declare an ``input_processor``; the others
    store whatever the loader collected.
    """

    title = scrapy.Field()
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    salary = scrapy.Field()
    # The next three values have their "/" characters removed.
    job_city = scrapy.Field(
        input_processor=MapCompose(replace_splash),
    )
    work_years = scrapy.Field(
        input_processor=MapCompose(replace_splash),
    )
    degree_need = scrapy.Field(
        input_processor=MapCompose(replace_splash),
    )
    job_type = scrapy.Field()
    publish_time = scrapy.Field()
    tag = scrapy.Field()
    job_advantage = scrapy.Field()
    job_desc = scrapy.Field(
        input_processor=MapCompose(handle_strip),
    )
    # Tags are removed first, then the address lines are merged.
    job_addr = scrapy.Field(
        input_processor=MapCompose(remove_tags, handle_jobaddr),
    )
    company_name = scrapy.Field(
        input_processor=MapCompose(handle_strip),
    )
    company_url = scrapy.Field()
    crawl_time = scrapy.Field()
    # Declared but not written by get_insert_sql().
    crawl_update_time = scrapy.Field()

    def get_insert_sql(self):
        """Build the parameterized upsert statement for this item.

        Returns:
            A ``(insert_sql, params)`` tuple; ``params`` is ordered to match
            the column list of the statement.

        Note:
            Fields are read with ``self[...]``, so an unpopulated field is
            expected to raise ``KeyError`` (usual mapping behaviour; confirm
            against the scrapy version in use).
        """
        insert_sql = """
                insert into lagou(title, url, url_object_id, salary, job_city, work_years, degree_need, tag,
                job_type, publish_time, job_advantage, job_desc, job_addr, company_url, company_name,crawl_time)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE job_desc=VALUES(job_desc)
            """

        # Keep this order in sync with the column list above.
        params = (
            self["title"], self["url"], self["url_object_id"], self["salary"],
            self["job_city"], self["work_years"], self["degree_need"], self["tag"],
            self["job_type"], self["publish_time"], self["job_advantage"],
            self["job_desc"], self["job_addr"],
            self["company_url"], self["company_name"], self["crawl_time"],
        )

        return insert_sql, params
5.定义pipeline，将数据保存到数据库中。别忘了在settings.py中配置数据库信息并开启该pipeline。
# pipeline.py
from twisted.enterprise import adbapi
import pymysql
from pymysql import cursors
class MysqlTwistedPipeline(object):
    """Item pipeline that writes items to MySQL through a Twisted adbapi pool.

    Inserts are handed to the connection pool so that they run
    asynchronously instead of blocking the crawl.
    """

    @classmethod
    def from_settings(cls, settings):
        """Create the pipeline from project settings.

        Reads ``MYSQL_HOST``, ``MYSQL_DBNAME``, ``MYSQL_USER`` and
        ``MYSQL_PASSWORD`` from ``settings`` and builds a pymysql-backed
        connection pool. Called automatically by the framework.
        """
        dbparams = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWORD'],
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor,
            use_unicode=True,
        )

        dbpool = adbapi.ConnectionPool('pymysql', **dbparams)

        return cls(dbpool)

    def __init__(self, dbpool):
        """Store the connection pool used for all inserts."""
        self.dbpool = dbpool

    def process_item(self, item, spider):
        """Schedule an asynchronous insert for ``item`` and pass it on.

        Returns:
            The same ``item``, so that later pipeline stages still receive
            it (returning nothing would hand them ``None``).
        """
        # Use twisted to run the MySQL insert asynchronously.
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)
        return item

    def handle_error(self, failurer, item, spider):
        """Report a failure raised by the asynchronous insert."""
        print(failurer)

    def do_insert(self, cursor, item):
        """Run the actual insert.

        The SQL statement and its parameters come from the item itself via
        ``item.get_insert_sql()``, so each item type can define its own.
        """
        insert_sql, params = item.get_insert_sql()
        print('xxx', insert_sql, params)
        cursor.execute(insert_sql, params)
上一篇下一篇

猜你喜欢

热点阅读