scrapy 自动配置sql入库

2021-09-03  本文已影响0人  周周周__

scrapy 管道自动生成sql

需要在setting中配置item 和 表名称的对应字典 TABLE_NAME_DICT

item.py中配置的字段需要和数据库中的一致

test_piplines.py
from twisted.enterprise import adbapi
import re, pymysql, time
from pymysql.err import IntegrityError


class MysqlSpiderBasePipeline:
    def __init__(self, dbpool, table_name_dict, db_parmars):
        self.dbpool = dbpool
        self.table_name_dict = table_name_dict
        self.db_parmars = db_parmars

    @classmethod
    def from_crawler(cls, cralwer):
        '''
        TABLE_NAME_DICT : {item_name : table_name} item_name: item设置的类名称,  table_name: 表名称
        '''
        db_parmars = {  # 数据库链接基本设置
            'host': cralwer.settings['MYSQL_HOST'],
            'user': cralwer.settings['MYSQL_USER'],
            'passwd': cralwer.settings['MYSQL_PWD'],
            'db': cralwer.settings['MYSQL_DB'],
            'port': cralwer.settings['MYSQL_PORT'],
            'charset': cralwer.settings['MYSQL_CHARSET'],
            "cp_min": 10,
            "cp_max": 15,
            "cp_reconnect": True,
        }
        dbpool = adbapi.ConnectionPool('pymysql', **db_parmars)
        table_name_dict = cralwer.settings['TABLE_NAME_DICT']  # 表对应的字典
        assert table_name_dict, 'if you want use the MyTestPipeline, please add TABLE_NAME_DICT in custom_settings'
        return cls(dbpool, table_name_dict, db_parmars)

    def process_item(self, item, spider):
        # 入库
        query = self.dbpool.runInteraction(
            self.insert_data_to_mysql,
            item, spider
        )
        query.addErrback(
            self.insert_err,
            item, spider
        )
        return item

    def insert_err(self, failure, item, spider):
        print(failure, '失败')  # , item)

    def insert_data_to_mysql(self, cursor, item, spider):
        pass

    def sql_from_dict(self, ema_item, spider):
        '''
        :param ema_item:  item ,  table_name_dict: setting中设置的 item名和表名对应字典
        :return:
        '''
        try:
            item_name = re.findall('\.([A-Z]\w*)', str(type(ema_item)))  # 表名key
            table_name = self.table_name_dict[item_name[0]]  # 从配置的dict 对应的item 取出表名
            table_keys = list(ema_item.keys())
            sql = f'insert into {table_name} {tuple(table_keys)} values ({"%s," * (len(table_keys) - 1) + "%s"})'.replace(
                "'", "")
            vs = [ema_item[i] for i in table_keys]  # 根据key值列表生成值的元组
            vs_list = []
            for i in vs:
                i = '' if not i else i
                vs_list.append(i)
            vs = tuple(vs_list)
            return sql, vs
        except Exception as e:
            spider.logger.warning(f'sql_from_dict error: {e}')


class MyTestPipeline(MysqlSpiderBasePipeline):
    def insert_data_to_mysql(self, cursor, ema_item, spider):
        try:
            sql, vs = self.sql_from_dict(ema_item, spider)  # 生成sql,和key值列表
            spider.logger.debug(f"sql: {sql}, {vs}")
            cursor.execute(sql, vs)
        # except IntegrityError as e:  # 判断如果是唯一异常:pass
        #     if 'Duplicate entry' in e.args[1]:
        #         pass
        except Exception as e:
            spider.logger.info(f'MyTestPipeline error: {e} , sql: {sql},data: {vs}')

spider.py
custom_settings = {
        "ITEM_PIPELINES": {
            "spider_pipelines.test_piplines.MyTestPipeline": 300,
        },
        "TABLE_NAME_DICT": {
            "TestItem": "test_table_name",
        }
}
Item.py
class TestItem(scrapy.Item): # 类命名必须是全英文
    ''' 下边和字段名一致 '''
    test1 = scrapy.Field()
    test2 = scrapy.Field()

上一篇下一篇

猜你喜欢

热点阅读