scrapy 自动配置sql入库
2021-09-03 本文已影响0人
周周周__
scrapy 管道自动生成sql
需要在 settings.py(或 spider 的 custom_settings)中配置 Item 类名与表名的对应字典 TABLE_NAME_DICT,格式为 {Item 类名: 表名}
items.py 中定义的字段名需要和数据库表中的列名保持一致
test_piplines.py
from twisted.enterprise import adbapi
import re, pymysql, time
from pymysql.err import IntegrityError
class MysqlSpiderBasePipeline:
    """Base Scrapy item pipeline that writes items to MySQL via a Twisted adbapi pool.

    Subclasses override :meth:`insert_data_to_mysql` to run the actual
    statement; :meth:`sql_from_dict` builds a parameterized ``INSERT`` from an
    item whose field names are used verbatim as column names.
    """

    def __init__(self, dbpool, table_name_dict, db_parmars):
        """Store the collaborators created by :meth:`from_crawler`.

        :param dbpool: connection pool exposing ``runInteraction``.
        :param table_name_dict: mapping ``{item class name: table name}``.
        :param db_parmars: keyword arguments the pool was created with.
        """
        self.dbpool = dbpool
        self.table_name_dict = table_name_dict
        self.db_parmars = db_parmars

    @classmethod
    def from_crawler(cls, cralwer):
        """Build the pipeline from the crawler settings.

        Reads ``MYSQL_HOST``, ``MYSQL_USER``, ``MYSQL_PWD``, ``MYSQL_DB``,
        ``MYSQL_PORT``, ``MYSQL_CHARSET`` and ``TABLE_NAME_DICT``.

        TABLE_NAME_DICT : {item_name: table_name} where ``item_name`` is the
        item class name and ``table_name`` is the destination table.

        :raises ValueError: if ``TABLE_NAME_DICT`` is missing or empty.
        """
        # Validate before creating the pool so a misconfigured project does
        # not leave a half-initialised pool behind. An explicit raise (rather
        # than ``assert``) keeps the check active under ``python -O``.
        table_name_dict = cralwer.settings['TABLE_NAME_DICT']
        if not table_name_dict:
            raise ValueError(
                'if you want use the MyTestPipeline, please add TABLE_NAME_DICT in custom_settings'
            )

        db_parmars = {  # basic connection settings plus pool options
            'host': cralwer.settings['MYSQL_HOST'],
            'user': cralwer.settings['MYSQL_USER'],
            'passwd': cralwer.settings['MYSQL_PWD'],
            'db': cralwer.settings['MYSQL_DB'],
            'port': cralwer.settings['MYSQL_PORT'],
            'charset': cralwer.settings['MYSQL_CHARSET'],
            "cp_min": 10,
            "cp_max": 15,
            "cp_reconnect": True,
        }
        dbpool = adbapi.ConnectionPool('pymysql', **db_parmars)
        return cls(dbpool, table_name_dict, db_parmars)

    def process_item(self, item, spider):
        """Schedule the insert on the pool and hand the item on immediately.

        The item is returned without waiting for the database write; failures
        are reported through :meth:`insert_err`.
        """
        query = self.dbpool.runInteraction(
            self.insert_data_to_mysql,
            item, spider
        )
        query.addErrback(
            self.insert_err,
            item, spider
        )
        return item

    def close_spider(self, spider):
        """Release the connection pool when the spider finishes."""
        self.dbpool.close()

    def insert_err(self, failure, item, spider):
        """Report a failed insert through the spider's logger."""
        # Logged (not printed) so failures end up in the crawl log.
        spider.logger.error('%s 失败', failure)

    def insert_data_to_mysql(self, cursor, item, spider):
        """Hook run by ``runInteraction``; the base implementation does nothing."""
        pass

    @staticmethod
    def _blank_if_empty(value):
        """Map ``None`` and empty sized values (``''``, ``[]``, ...) to ``''``.

        Falsy values without a length, such as ``0`` or ``False``, are real
        data and are passed through unchanged.
        """
        if value is None or (hasattr(value, '__len__') and not value):
            return ''
        return value

    def sql_from_dict(self, ema_item, spider):
        """Build a parameterized ``INSERT`` statement for ``ema_item``.

        :param ema_item: mapping-like item; its class name selects the table
            through ``self.table_name_dict`` and its keys become the columns.
        :param spider: object whose ``logger`` receives a warning on failure.
        :return: ``(sql, values)`` on success, or ``None`` after logging a
            warning (unknown item class, item without fields, ...).
        """
        try:
            # The class name is the lookup key into TABLE_NAME_DICT.
            item_name = type(ema_item).__name__
            table_name = self.table_name_dict[item_name]
            table_keys = list(ema_item.keys())
            if not table_keys:
                raise ValueError(f'item {item_name} has no populated fields')
            # Join the names explicitly: formatting a 1-tuple would leave a
            # trailing comma and produce invalid SQL for single-field items.
            columns = ', '.join(str(key) for key in table_keys)
            placeholders = ','.join(['%s'] * len(table_keys))
            sql = f'insert into {table_name} ({columns}) values ({placeholders})'
            vs = tuple(self._blank_if_empty(ema_item[key]) for key in table_keys)
            return sql, vs
        except Exception as e:
            # Deliberately broad: this method reports problems via the log and
            # a ``None`` result instead of raising.
            spider.logger.warning(f'sql_from_dict error: {e}')
            return None
class MyTestPipeline(MysqlSpiderBasePipeline):
    """Pipeline that inserts every item using the statement from ``sql_from_dict``."""

    def insert_data_to_mysql(self, cursor, ema_item, spider):
        """Build the INSERT for ``ema_item`` and execute it on ``cursor``.

        :param cursor: database cursor supplied by ``runInteraction``.
        :param ema_item: item to store.
        :param spider: spider whose ``logger`` receives debug/error output.

        Execution errors are logged and swallowed so one bad row does not
        raise out of the interaction.
        """
        built = self.sql_from_dict(ema_item, spider)
        if built is None:
            # No statement could be built (sql_from_dict returns None on
            # failure); unpacking None here would raise and hide the cause.
            return
        sql, vs = built
        spider.logger.debug(f"sql: {sql}, {vs}")
        try:
            cursor.execute(sql, vs)
        except Exception as e:
            # To ignore duplicate-key rows specifically, catch
            # pymysql.err.IntegrityError ahead of this handler.
            spider.logger.info(f'MyTestPipeline error: {e} , sql: {sql},data: {vs}')
spider.py
# Per-spider settings: enable the pipeline and map item classes to tables.
custom_settings = {
    # Pipeline import path -> order value.
    'ITEM_PIPELINES': {'spider_pipelines.test_piplines.MyTestPipeline': 300},
    # Item class name -> destination table name.
    'TABLE_NAME_DICT': {'TestItem': 'test_table_name'},
}
Item.py
class TestItem(scrapy.Item):  # the class name must consist of English letters only
    """Example item; the attributes below are named the same as the fields."""

    test1 = scrapy.Field()
    test2 = scrapy.Field()