scrapy异步使用Django 模型存储

2021-11-22  本文已影响0人  mutang
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
import asyncio

from product.items import CategoryItem, SubCategoryItem, SKUItem
from control.models import Category, SubCategory, SKU
from concurrent.futures import ThreadPoolExecutor


class ProductPipeline:
    # 使用异步存储的时候,使用sqlite3会报错,因为sqlite3是单线程的,我们是一个线程池对象,并发存储会被sqlite3拒绝(database was locked)
    # 创建事件循环对象
    loop = asyncio.get_event_loop()
    # 创建线程池
    executor = ThreadPoolExecutor()
    # 任务队列
    tasks = []
    counter = {'cate': 0, 'sub_cate': 0, 'sku': 0}

    async def process_item(self, item, spider):
        # 存在则更新 get_update
        print(item)
        if isinstance(item, CategoryItem):
            return self.process_category_item(item, spider)
        elif isinstance(item, SubCategoryItem):
            return self.process_sub_category_item(item, spider)
        else:
            return self.process_sku_item(item, spider)

    def process_category_item(self, item, spider):
        '''将保存数据的处理方法加入到任务队列'''
        self.counter['cate'] += 1
        task = self.loop.run_in_executor(self.executor, self.executor_func(Category, item), )
        self.tasks.append(task)
        return item

    def process_sub_category_item(self, item, spider):
        '''将保存数据的处理方法加入到任务队列'''
        self.counter['sub_cate'] += 1
        task = self.loop.run_in_executor(self.executor, self.executor_func(SubCategory, item), )
        self.tasks.append(task)
        return item

    def process_sku_item(self, item, spider):
        '''将保存数据的处理方法加入到任务队列'''
        self.counter['sku'] += 1
        task = self.loop.run_in_executor(self.executor, self.executor_func(SKU, item), )
        self.tasks.append(task)
        return item

    @staticmethod
    def executor_func(model, item):
        '''主要作用是将有参数的函数转换为无参数的函数返回,方便run_in_executor方法调用,这个方法它只接受位置传参,不接受关键字传参'''

        def func():
            if isinstance(item, CategoryItem):
                return model.objects.get_or_create(defaults=item['familyID'], **item)  # 一般是create
            elif isinstance(item, SubCategoryItem):
                d = dict(item)
                try:
                    category = Category.objects.get(familyID=d.pop('category'))
                except Category.DoesNotExist:
                    print('未添加进去 -----', d)
                else:
                    return model.objects.get_or_create(defaults=d['sub_cate_id'], category=category, **d)
            else:
                d = dict(item)
                try:
                    sub_cate = SubCategory.objects.get(sub_cate=d.pop('sub_cate'))
                except SubCategory.DoesNotExist:
                    print('未添加进去 -----', d)
                else:
                    return model.objects.get_or_create(defaults=d['sku'], sub_cate=sub_cate, **d)

        return func

    def close_spider(self, spider):
        '''当爬虫关闭的时候调用这个方法保存数据'''
        print(self.counter)
        self.loop.run_until_complete(asyncio.wait(self.tasks))

以上代码有问题,下级模型往往找不到上级,暂时贴出来

正确的,虽然不够优雅,但能实现,能保存。不需要scrapy中item,在spider.py中yield 字典

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from asgiref.sync import sync_to_async #同步变异步

from control.models import Category, SubCategory, SKU


class ProductPipeline:
    counter = {
        'cate': 0,
        'sub_cate': 0,
        'sku': 0
    }
    @sync_to_async
    def process_item(self, item, spider):
        print(item)
        try:
            cate = Category.objects.get(familyID=item['familyID'])
        except Category.DoesNotExist:
            cate = Category()
            cate.familyID = item['familyID']
            cate.familyName = item['familyName']
            cate.metricsFamilyName = item['metricsFamilyName']
            cate.url = item['url']
            cate.level = item['level']
            cate.isLeaf = item['isLeaf']
            cate.count = item['count']
            cate.save()
            self.counter['cate'] += 1
        try:
            sub_category = SubCategory.objects.get(sub_cate_id=item['sub_cate_id'])
        except SubCategory.DoesNotExist:
            sub_category = SubCategory()
            sub_category.category = cate
            sub_category.sub_cate_id = item['sub_cate_id']
            sub_category.sub_cate_name = item['sub_cate_name']
            sub_category.save()
            self.counter['sub_cate'] += 1
        # 保存
        sku = SKU(sub_cate=sub_category, sku=item['sku'])
        sku.save()
        self.counter['sku'] += 1
        print(self.counter)
        return item

上一篇下一篇

猜你喜欢

热点阅读