用ChatGpt 实现一次简单的 python 新手爬虫

2023-11-07  本文已影响0人  cgrass

前言

1.适合有一定程序思维的速成者。
2.适合有其他语言基础的速成者。

爬虫流程

1.明确需求
就是你要从哪个网址拿到什么信息,本次我们需要yelp 的二级网址拿到商户的商户名,电话,地址,访问链接。
2.分析网址
查找你所需要的内容的相关熟悉。
3.开发调试
vscode 或者 pycharm
3.输出文档
输出csv文档

开始搞活

1.先看下需求
需要在yelp网址上查找固定的几个美国州 ['FL','GA','IL','IN','MD','MA','MI', 'NJ','NY','NC','OH','PA','SC','TN','TX','VA']内 "All You Can Eat" l类型的店铺信息.
如下图所示,具体链接为https://www.yelp.com/search?find_desc=All+You+Can+Eat&find_loc=FL

image.png

可以看到总21页,每页10条,除了正常要获取的10条商户信息外还有,数量不固定的广告位。为了好爬取信息,我这边直接忽略广告位的信息。直接爬取每页最多10条的商户信息。

2.点击第二页查看下不同页数的 访问规律:
https://www.yelp.com/search?find_desc=All+You+Can+Eat&find_loc=FL&start=10
每页start+10

3.接着查看 我们需要的数据在哪个位置
打开浏览器的F12 进入调试模式,建议google浏览器


image.png

通过初步分析发现我们要的数据在这边,看过去class name都是固定的,也有我们要的列表点击进去的二级url。


image.png
或者通过这种方式查找
image.png
通过观察发现数据在<Script>标签下面
image.png

下面这个是列表的名称和地址


image.png

本文通过的是爬取Script标签数据,来获取二级网址。

4.获取到二级网址后,我们继续分析二级网址内容
如下图所示,电话,地址都有了。


image.png

通过f12 看看这些东西在哪里


image.png

看上图就可以直接看出name,telephone,address

5.东西都查找完了,这个时候,如何提取,如何编写代码呢,对于不熟悉python的人来说,chatGpt就可以派上用场了,哪里报错了,就问chatGpt,边问边搞。
比如:
如何获取scrpt 标签中的内容


image.png

如何爬取script标签中的指定内容(正则)


image.png
只要你懂的问,就有结果。

接下来我们用chatGpt来边问边写代码.
先看看代码主体流程

# 程序结构
class xxxSpider(object):
    def __init__(self):
        # 定义常用变量,比如url或计数变量等
       
    def get_html(self):
        # 获取响应内容函数,使用随机User-Agent
   
    def parse_html(self):
        # 使用正则表达式来解析页面,提取数据
   
    def write_html(self):
        # 将提取的数据按要求保存,csv、MySQL数据库等
       
    def run(self):
        # 主函数,用来控制整体逻辑
       
if __name__ == '__main__':
    # 程序开始运行时间
    spider = xxxSpider()
    spider.run()

基本上的爬虫流程就是这样的,定义url,获取url的内容,解析url,把获取到的东西写到文件中存储。

具体的代码如下:
csv 文件工具 csvUtil.py

import csv
def writeCsv(name,urls):
    with open(name+'.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        #构建字段名称,也就是key
        # 写入表头
        writer.writerow(['url'])
        # 写入每个 URL
        for url in urls:
            writer.writerow([url])
def initShopCsv(name):
    with open(name+'.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        #构建字段名称,也就是key
        # 写入表头
        writer.writerow(['ShopName',"Phone","Address","Url"])
        
def writeShopCsv(data,name):
    with open(name+'.csv', 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        # 写入每个
        writer.writerow(data)
        print("写入数据:"+str(data))

常量类address_info.py

states=['FL','GA','IL','IN','MD','MA','MI',
                  'NJ','NY','NC','OH','PA','SC','TN','TX','VA']

主体代码 yelp.py

from urllib import parse
import time
import random
from fake_useragent import UserAgent
import requests
from lxml import etree
from bs4 import BeautifulSoup
import re
import json
import csvUtil
from static.address_info import states
#忽略ssl报错
requests.packages.urllib3.disable_warnings()
import concurrent.futures

class openTableSpider(object):
    #1.初始化,设置基地址
    def __init__(self):
        self.url='https://www.yelp.com/search?{}'
     
    #2.搜索店面,加入头部信息,查询时间理应大于当前时间,否则容易被发现不是人工点击
    #查找到收寻的店明的列表信息
    def searchShopHtml(self,url,state,size):
        #参数信息
        params={
            'find_desc':"All You Can Eat",
            'find_loc':state,
            'start':size
        }
        #格式化信息
        full_url=url.format(parse.urlencode(params))
        print(""+full_url)
        ua=UserAgent()
        # req=requests.Request(url=full_url,)
        respone=requests.get(url=full_url,headers={'User-Agent':'AdsBot-Google'},verify=False)
       # 使用 BeautifulSoup 解析 HTML
        soup = BeautifulSoup(respone.text, 'html.parser')
        # 查找所有的 <script> 标签
        script_tags = soup.find_all('script')
        # 输出每个 <script> 标签的内容
        for script in script_tags:
            script_str=script.string
            # 关键字   businessUrl /biz 开头
            if script_str and 'businessUrl' in script_str:
                # print("开始:"+script_str)
                #查找到相应的访问url
                link_match=re.findall(r'"businessUrl":"([^"]+)"',script_str)
                if link_match:
                    format_urls=[x for x in link_match if "ad_business_id" not in x]
                    # 去重
                    urls = list(set(format_urls))
                    # print(len(urls))
                    print(str(urls))
                    # csvUtil.writeCsv(shopName,link_match)
                    return urls
                   
       
     # 3.解析查找到的每家店的url,并且获取到店面和电话号码,以及地址。
    def parse_Url(self,url,state):
        url="https://www.yelp.com/"+url
     
        print("Start doing "+url)
        ua=UserAgent()
        respone=requests.get(url=url,headers={'User-Agent':ua.random},verify=False)
        # 使用 BeautifulSoup 解析 HTML
        soup = BeautifulSoup(respone.text, 'html.parser')
        # 查找所有的 <script> 标签
        script_tags = soup.find_all('script')
        # 输出每个 <script> 标签的内容
        # 找到name,phone,address 数据的位置,发现在 script标签底下
        # name window.__INITIAL_STATE__   restaurantProfile  restaurant  name   address  
        #  phone=  contactInformation   formattedPhoneNumber
        #  address address":{Phone number
        for script in script_tags:
            script_str=script.string
            address=name=phone=""
            if script_str and 'telephone' in script_str:
                # print(script_str)
                m_address = re.search( r'"address":{(.*?)},', script_str)
                if m_address:
                    # {"line1":"5115 Wilshire Blvd","line2":"","state":"CA","city":"Los Angeles","postCode":"90036","country":"United States","__typename":"Address"},
                    temp=m_address.group(1)
                    # print(temp)
                    try:
                        streetAddress = re.search(r'"streetAddress":"([^"]*)"', temp).group(1)
                        addressLocality = re.search( r'"addressLocality":"([^"]*)"', temp).group(1)
                        addressCountry = re.search( r'"addressCountry":"([^"]*)"', temp).group(1)
                        addressRegion = re.search( r'"addressRegion":"([^"]*)"', temp).group(1)
                        postalCode = re.search(r'"postalCode":"([^"]*)"', temp).group(1)
                        address=streetAddress.replace("\\n","")+","+addressLocality+","+addressRegion+","+postalCode
                        print(address)
                        # flag=any(item.upper() in state.upper() for item in openTableAddress)
                        # # 如果不在地址池中的店,则不记录下来
                        # if flag is False:
                        #     return
                    except  Exception as e:
                        address=streetAddress+","+addressLocality+","+addressRegion+","+postalCode
                        print("地址解析异常:", e)
                    
                m_name = re.search( r'"name":"(.*?)",', script_str)
                if m_name:
                    name=m_name.group(1)
                    # print(name)
                m_phone = re.search( r'telephone":"(.*?)",', script_str)
                if m_phone:
                    phone=m_phone.group(1)
                    # print(phone)
                data=[name,phone,address,url] 
                csvUtil.writeShopCsv(data=data,name=state)                   
        
    # 4.一个一个流程
    def doTransaction(self,state,page):
        urls=self.searchShopHtml(self.url,state,page)
        if not urls:
            return
        print(len(urls))
        while len(urls)%10==0:
            print(len(urls))
            new_urls=[]
            reptry=0
            page+=10
            reptry=0
            #重试10次
            while(reptry<10):
                try:
                    new_urls = self.searchShopHtml(spider.url,state, page)
                    break
                except  Exception as e:
                        print("发生了其他异常:", e)
                        # 重试一次
                        reptry+=1
                        time.sleep(random.randint(5,10))
             # 如果没有新的URLs,即查询结果为空,终止循环
            if not new_urls:
                break
            urls += new_urls
            time.sleep(random.randint(5,10))                
            
        count=0     
        #重试10次
        for url in urls:
            reptry=0
            while(reptry<10):
                try:
                    self.parse_Url(url,state) 
                    count+=1
                    print(state+" deal count "+str(count))
                    break
                except  Exception as e:
                    print("发生了其他异常:", e)
                    reptry+=1
                finally:
                    time.sleep(random.randint(3,10))     
    
    #5.入口函数
    def run(self):
        
        for state in states:
              csvUtil.initShopCsv(state)
        # 创建线程池并并发执行任务
        with concurrent.futures.ThreadPoolExecutor() as executor:
        # 使用executor.map并发执行任务,传递参数列表
            executor.map(self.doTransaction, states, [0] * len(states))
        print("All tasks have been submitted.")
        # # 单线程处理
        # threads = []
        # for state in states:
        #      print(state)
        #      csvUtil.initShopCsv(state)
        #      thread = threading.Thread(target=)
        #      thread.start()
        #      threads.append(thread)
       
        # # 等待所有线程完成
        # for thread in threads:
        #     thread.join()
      
if __name__=='__main__':
    start=time.time()
    spider=openTableSpider() #实例化一个对象spider
    # spider.searchShopHtml(spider.url,"New York",0) #调用入口函数
    # spider.parse_Url("biz/zen-korean-bbq-duluth?osq=All+You+Can+Eat+Buffet","GEORGIA") 
    spider.run()
    end=time.time()
    #查看程序执行时间
    print('执行时间:%.2f'%(end-start)) 

具体内容查看注释,有不懂操作的直接问gpt

上一篇下一篇

猜你喜欢

热点阅读