8.mysql之异步操作

2018-06-14  本文已影响0人  MononokeHime

mysql之异步操作

普通的mysql操作是同步操作,插入数据的速度(即I/O读写)远远低于spider中解析数据的速度。所以我们需要将mysql插入数据进行异步化,无需等待mysql的I/O读写,而将CPU的控制权交给其他操作。

我们可以将mysql的配置写在pipeline的__init__中,也可以写在settings.py里

#setting.py
MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'lagou'
MYSQL_USER = 'root'
MYSQ_PASSWARD = '123456'

我们借助twist异步框架来实现数据库的异步操作

runInteraction(function)会将function函数变为异步操作

from twisted.enterprise import adbapi
import pymysql
from pymysql import cursors
class MysqlTwistedPipeline(object):

    # 会自动调用这个函数
    @classmethod
    def from_settings(cls,settings):
        dbparams = dict(host = settings['MYSQL_HOST'],
        db = settings['MYSQL_DBNAME'],
        user = settings['MYSQL_USER'],
        passwd = settings['MYSQL_PASSWORD'],
        charset = 'utf8',
        cursorclass = pymysql.cursors.DictCursor,
        use_unicode = True)

        dbpool = adbapi.ConnectionPool('pymysql',**dbparams)

        return cls(dbpool)

    def __init__(self,dbpool):
        self.dbpool = dbpool

    def process_item(self,item,spider):
        #使用twisted将mysql插入变为异步执行
        query = self.dbpool.runInteraction(self.do_insert,item)
        query.addErrback(self.handle_error, item, spider)

    def handle_error(self,failurer, item, spider):
        # 处理异步插入的异常
        print(failurer)

    def do_insert(self, cursor, item):
        # 执行具体的插入
        # 根据不同的item 构建不同的sql语句并插入到mysql中
        insert_sql, params = item.get_insert_sql()
        print(insert_sql, params)
        cursor.execute(insert_sql, params)

在items.py中定义sql语句,例如:

class LagouItem(scrapy.Item):
    title = scrapy.Field()
    ......

    def get_insert_sql(self):
        insert_sql = """
                insert into lagou_job(title, url, salary, job_city, work_years, degree_need,
                job_type, publish_time, job_advantage, job_desc, job_addr, company_url, company_name, job_id)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE job_desc=VALUES(job_desc)
            """

        job_id = extract_num(self["url"])
        params = (self["title"], self["url"], self["salary"], self["job_city"], self["work_years"], self["degree_need"],
                  self["job_type"], self["publish_time"], self["job_advantage"], self["job_desc"], self["job_addr"],
                  self["company_url"],
                  self["company_name"], job_id)

        return insert_sql, params
上一篇 下一篇

猜你喜欢

热点阅读