A Python Crawler Project (Buying a Second-Hand Home on Lianjia)

2020-05-02  thirsd

Before I knew it, I had been working for more than six years, and I am finally in a position to buy a home.
These are some notes I kept while gathering housing information.

Beike's search is powerful, but it is hard to match on neighborhood, commute distance, schools, floor area, and other criteria all at once, so using it directly is still costly.

To address this, I wrote this project to collect Lianjia's second-hand housing data. It first filters neighborhoods (小区, "xiaoqu") by price, then narrows them down by school quality and distance to my workplace, and finally builds a list of candidate houses by floor area, total price, and layout, so that house-hunting effort is concentrated on a shortlist of neighborhoods and homes.

Of course, anyone can adjust this to their own needs.

1. Environment Overview

1.1 Basic Environment

1.1.1 python

Official site: https://www.python.org/

Official docs: https://www.python.org/doc/

1.1.2 requests (page loading)

Official docs: https://cn.python-requests.org/zh_CN/latest/

1.1.3 BeautifulSoup (information extraction)

Official docs: https://www.crummy.com/software/BeautifulSoup/bs4/doc/index.zh.html

Common usage examples:

from bs4 import BeautifulSoup

# `a` is an HTML string containing the sample document (not shown here)
soup = BeautifulSoup(a, "html.parser")
soup.title.text # '标题'

# 1. Extracting tags
# 1.1 A unique tag
soup.h1
soup.find('h1')
soup.find_all('h1')[0]
# 1.2 Multiple tags
soup.find_all('h2')
# [<h2>标题2</h2>, <h2>标题3</h2>]
soup.find_all(['h1','h2'])
# [<h1>标题1</h1>, <h2>标题2</h2>, <h2>标题3</h2>]
# 1.3 With a regular expression
import re
soup.find_all(re.compile('^h'))
# [<h1>标题1</h1>, <h2>标题2</h2>, <h2>标题3</h2>]

# 2. Matching attributes
# 2.1 Pass the attribute name as a keyword argument; this fails for names like a-b
soup.find_all('p', id = 'p1') # the common case
soup.find_all('p', class_='p3') # class is a Python keyword, so a trailing _ is needed
# 2.2 The most general form: attrs
soup.find_all('p', attrs={'class':'p3'}) # matches if the attribute includes this value, not only if it is the sole value
soup.find_all('p', attrs={'class':'p3','id':'pp'}) # match on several attributes
soup.find_all('p', attrs={'class':'p3','id':False}) # require an attribute to be absent
soup.find_all('p', attrs={'id':['p1','p2']}) # attribute value is p1 or p2
soup.find_all('p', attrs={'class':True}) # any tag that has a class attribute
# 2.3 Matching attribute values with a regular expression
import re
soup.find_all('p', attrs={'id':re.compile('^p')}) # regular expression on the value


# 3. Matching on tag text
# 3.1 With a regular expression
import re
soup.find_all('p', text=re.compile('段落'))
soup.find_all('p',text=True)
# 3.2 With a function
def nothing(c):
    return c not in ['段落1','段落2','文章']
soup.find_all('p',text=nothing)

# a tag-level predicate works the same way
def has_class_but_no_id(tag):
    return tag.has_attr('class') and not tag.has_attr('id')

# 4. Extracting content
# 4.1 Tag text
soup.h.text # returns text even through nested tags
soup.h.a.text # also works
soup.body.text # with several children: '\n标题\n段落1\n段落2\n'
# 4.2 Attribute values
# Extract them like dictionary lookups; the two forms are equivalent
soup.h.a['href']
soup.h.a.get('href')

# 5. Tag metadata (for a Tag object `i`, e.g. from a find_all loop)
print(i.name) # the tag name
print(i.attrs) # all attributes as a dict
print(i.has_attr('href')) # whether the tag has a given attribute

# 6. Examples
soup.find('p', attrs={'class':'first'}).text # '文字1'
soup.find_all('p') # [<p class="first">文字1</p>, <p class="second">文字2</p>], then extract text from each
soup.find('ul', attrs={'class':'list1'}).find_all('li') # [<li>列表1第1项</li>, <li>列表1第2项</li>]


# Code reference: https://zhuanlan.zhihu.com/p/35354532

1.1.4 Geocoding (Baidu API)

Official docs: http://lbsyun.baidu.com/index.php?title=webapi/guide/webservice-geocoding

Approach 1:

import requests
import logging

logger = logging.getLogger(__name__)

def geocodeB(address):
    base = "http://api.map.baidu.com/geocoder?address=%s&output=json&key=yourak&city=上海" % address
    response = requests.get(base)
    if response.status_code == 200:
        answer = response.json()
        if "location" in answer['result'] and "level" in answer['result']:
            return (address,
                    # round(answer['result']['location']['lng'], 5),
                    answer['result']['location']['lng'],
                    # round(answer['result']['location']['lat'], 5),
                    answer['result']['location']['lat'],
                    answer['result']["level"])
        else:
            logger.error("geocodeB %s warning:%s" % (address, answer))
            return None
    else:
        logger.error("geocodeB %s Error" % address)
        return None

Approach 2:

def geocodeB2(address):
    from urllib.request import urlopen
    from urllib.parse import quote, quote_plus
    import hashlib, json
    # Example GET request: http://api.map.baidu.com/geocoder/v2/?address=百度大厦&output=json&ak=yourak
    queryStr = '/geocoder/v2/?address=%s&city=上海&output=json&ak=$yourak$' % address

    # URL-encode queryStr; reserved characters listed in `safe` are left as-is
    encodedStr = quote(queryStr, safe="/:=&?#+!$,;'@()*[]")

    # Append the secret key (sn/sk) directly at the end
    rawStr = encodedStr + '$yoursn$'
    sn = hashlib.md5(quote_plus(rawStr).encode("utf8")).hexdigest()

    url = 'http://api.map.baidu.com%s&sn=%s' % (encodedStr, sn)
    req = urlopen(url)
    res = req.read().decode()  # decode the response bytes to str
    answer = json.loads(res)  # parse the JSON payload
    if "location" in answer['result'] and "level" in answer['result']:
        return answer['result']['location']['lat'], answer['result']['location']['lng']
    else:
        logger.error("geocodeB2 %s warning:%s" % (address, answer))  # logger as configured in the surrounding module
        return None

Approach 3:

def geocode_by_baidu(address):
    from geopy.geocoders import Baidu
    apikey = '$yourak$'  # apply for a key at http://lbsyun.baidu.com/apiconsole/key?application=key
    sn = '$yoursn$'
    g = Baidu(api_key=apikey, security_key=sn, timeout=200)
    a = g.geocode(address)
    # return (round(a.latitude, 6), round(a.longitude, 6))
    return a.latitude, a.longitude

1.1.5 Distance calculation (geopy)

# x and y are (lat, lng) tuples
def get_distance(x, y):
    from geopy.distance import geodesic
    return round(geodesic(x, y).km, 3)
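
geodesic() gives precise ellipsoidal distances, but it requires geopy. When geopy is unavailable, a standard-library haversine (my own fallback helper, not part of the original project) is accurate to well under 1% at city scale, which is plenty for a commute filter:

```python
import math

def haversine_km(x, y):
    """Great-circle distance in km between two (lat, lng) tuples."""
    lat1, lng1, lat2, lng2 = map(math.radians, (x[0], x[1], y[0], y[1]))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lng2 - lng1) / 2) ** 2)
    # 6371 km is the mean Earth radius
    return 2 * 6371.0 * math.asin(math.sqrt(a))
```

As a sanity check, one degree of longitude at the equator comes out to about 111.2 km, within a few hundred metres of the geodesic result.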

1.1.6 Handling lazy loading and scroll loading (Selenium)

Selenium is a tool for testing web applications.

Selenium tests run directly in a browser, exactly as a real user would. Supported browsers include IE (7, 8, 9, 10, 11), Firefox, Safari, Chrome, Opera, and others.

This crawler drives Selenium from Python to mimic a normal user visiting the pages.

1.2 Main Issues

1.2.1 Lazy loading

Reference: http://www.selenium.org.cn/

1.2.2 Scroll loading
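
A common way to handle scroll-triggered loading with Selenium is to scroll to the bottom repeatedly until the page height stops growing. The helper below is my own sketch: the two callables wrap the standard `execute_script` calls (shown in the docstring) so the stopping logic is separate from the driver.

```python
def scroll_to_bottom(get_height, do_scroll, max_rounds=30):
    """Scroll until the page height stops growing, then return that height.

    In real use the two callables wrap a Selenium driver, e.g.:
      get_height = lambda: driver.execute_script("return document.body.scrollHeight")
      do_scroll  = lambda: driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
    with a short time.sleep() inside do_scroll so lazy-loaded items can render.
    """
    last = get_height()
    for _ in range(max_rounds):
        do_scroll()
        new = get_height()
        if new == last:  # nothing new was loaded; we reached the real bottom
            break
        last = new
    return last
```

`max_rounds` caps the loop so a page that keeps appending content forever cannot hang the crawler.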

1.2.3 IP rate limiting

Reference: "Python爬虫 | 代理IP的获取和使用" (obtaining and using proxy IPs for Python crawlers)
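
The linked article covers sourcing proxies; on the usage side, requests accepts a `proxies` dict per call. A minimal rotation sketch of my own (the pool entries are placeholders, not working proxies):

```python
import random

# Placeholder pool; real entries would come from a proxy provider or a scraped list.
PROXY_POOL = [
    "http://127.0.0.1:8001",
    "http://127.0.0.1:8002",
]

def pick_proxies(pool):
    """Choose one proxy at random, in the dict shape requests expects."""
    p = random.choice(pool)
    return {"http": p, "https": p}

# In the crawler, the dict is passed straight to requests, e.g.:
#   r = requests.get(url, headers=self.page_headers,
#                    proxies=pick_proxies(PROXY_POOL), timeout=10)
```

Picking a fresh proxy per request, plus the resume-from-CSV logic the project already has, lets a blocked run simply be restarted.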

2. Preparation

2.1 Analyzing the Requirements

My requirements for a home:

Budget: 4 million RMB, no more than 4.5 million at the limit;

Schools: a tier-2 school district or better;

Layout: two bedrooms or more;

Building age: built after 1990;

Area: at least 60 m²;

Commute: at most 1 hour by public transit from Century Avenue (世纪大道).

These translate into the following plan:

1. Schools: filter neighborhoods by the education score in the neighborhood guide, fixing the candidate set;

2. Then filter by (a) the prices of the houses in each neighborhood, using the budget, and (b) each neighborhood's location, dropping those whose commute distance is too long;

3. For each neighborhood that survives, fetch its house listings and decide which neighborhoods to track closely.

Notes:

1. Why not fetch houses directly? A house page alone cannot tell whether the schools qualify; and going house → neighborhood → schools would waste far more time, since there are several orders of magnitude more houses than neighborhoods.

2. The budget and the area requirement together bound the unit price, and filtering neighborhoods by unit price shrinks the search space.
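
Concretely, the hard cap of 4.5 million over at least 60 m² bounds the unit price at 75,000 RMB/m², which is exactly the ep7.5 in the bp5ep7.5 band used in the list URL of the next section (the bp5 floor of 50,000 RMB/m² is a chosen cutoff to skip implausibly cheap listings):

```python
budget_max = 450   # hard budget cap, in 万元 (10,000 RMB)
area_min = 60      # minimum floor area, in m²

max_unit_price = budget_max / area_min  # 万元 per m²
print(max_unit_price)  # 7.5, i.e. the "ep7.5" upper bound
```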

2.2 Analyzing the Page URLs

2.2.1 Fetching the neighborhood list

1. Analyzing the neighborhood list URL

Lianjia only serves the first 100 pages of results, and all of Shanghai clearly has more than 100 pages of neighborhoods, so the list is fetched district by district.

https://m.ke.com/sh/xiaoqu/hongkou/bp5ep7.5pg%s/

where

  1. bp5ep7.5 restricts the unit price to the 50,000-75,000 RMB/m² band (bp = begin price, ep = end price);
  2. pg%s is the page number.
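
To make the scheme explicit, the list URL can be assembled like this (the helper name and defaults are my own, not from the project):

```python
def xiaoqu_list_url(area, page, bp=5, ep=7.5, city="sh"):
    """Build the mobile Ke.com neighborhood-list URL for one page of a district."""
    return "https://m.ke.com/%s/xiaoqu/%s/bp%sep%spg%s/" % (city, area, bp, ep, page)

print(xiaoqu_list_url("hongkou", 3))
# https://m.ke.com/sh/xiaoqu/hongkou/bp5ep7.5pg3/
```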

2. Telling whether a neighborhood has reviews

In the list fetched in step 1, the presence of a 小区攻略 (neighborhood guide) tag on an entry tells whether that neighborhood has review information.

Note: not every neighborhood exposes an education score.

Example link: https://m.ke.com/sh/xiaoqu/5011000016009/, which shows the neighborhood's overall score.

2.2.2 Fetching a neighborhood's guide

The guide URL for a neighborhood is:

https://m.ke.com/sh/xiaoqu/5011000016009/gonglueV2.html?click_source=m_resblock_detail#review

Each neighborhood has an overall score plus per-category scores: building quality (建筑品质), unit layout (户型设计), transportation (交通条件), education (教育质量), commerce (商业环境), landscaping (花园景观), and property management (物业管理).

You can filter neighborhoods on whichever score categories matter to you.

For example, since education is my priority, I use it as the main filter: the education score must be at least 8, and every other category at least 6.5.
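
When the guide CSV is filtered later (via the filter_func hook in section 3.3.2), these thresholds can be written as a row predicate. This sketch is my own; it uses the CSV field names defined in the code of section 3.2.2, and treats an empty or missing score as failing:

```python
def score_filter(row, edu_min=8.0, other_min=6.5):
    """Return True if a guide CSV row meets the education-first thresholds.

    `row` is a dict as written by the gonglue CSV; scores are strings
    and may be empty when the page omits a category.
    """
    def score(key):
        try:
            return float(row.get(key) or 0)
        except ValueError:
            return 0.0

    others = ["jianzhu_score", "huxing_score", "jiaotong_score",
              "shangye_score", "jingguan_score", "wuye_score"]
    return score("jiaoyu_score") >= edu_min and all(score(k) >= other_min for k in others)
```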

2.2.3 Fetching a neighborhood's house listings

The house listings for a neighborhood:

Base URL: https://m.ke.com/sh/ershoufang/c5011000016009/

With filters: https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba67ea70c5011000016009
where bp350ep450 is a total price of 3.5-4.5 million; l2l3 means a 2- or 3-bedroom layout; ba67ea70 is an area of 67-70 m²; and c5011000016009 selects neighborhood 5011000016009.

3. Implementation

3.1 Fetching the neighborhoods

def get_xiaoqu_list(self, area, save_path):
    page_size = 100
    # Only Shanghai is collected, so there is no multi-city handling
    fieldnames = ['area', 'page', 'xiaoqu_id', 'url', 'name', "brief", "loc", "build_type", "build_year", "price",
                  "have_gonglue"]

    # If the CSV does not exist, create it with just a header row.
    # If it does, record which (area, page) pairs are already done
    # (IP limits mean the script must be run several times).
    handled_list = []
    if os.path.isfile(save_path):
        with open(save_path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                handled_list.append("%s_%s" % (row['area'], row['page']))
    else:
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

    handled_set = set(handled_list)
    logger.info(
        "get_xiaoqu_list, have handled:%s " % (len(handled_set)))

    # Walk the listing pages for this district
    with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        for page_num in range(1, page_size + 1):  # pages 1..100 inclusive
            # https://m.ke.com/sh/xiaoqu/pudong/pb4ep4.5pg10/
            url = "https://m.ke.com/sh/xiaoqu/%s/bp5ep7.5pg%s/" % (area, str(page_num))

            if "%s_%s" % (area, page_num) in handled_set:
                logger.info("%s has been handled." % url)
                continue
            else:
                logger.info(url)

            # Fetch the page
            r = requests.get(url=url, headers=self.page_headers)
            html = r.content
            lj = BeautifulSoup(html, 'html.parser')
            page_items = lj.find_all('li', attrs={'class': 'pictext'})

            # Parse the neighborhood entries on this page
            if len(page_items) > 0:
                for item in page_items:
                    xiaoqu_url = item.a.get('href')
                    xiaoqu_id = xiaoqu_url.split("/")[-2]
                    xiaoqu_gonglue = item.find_all("p", attrs={"class": "gonglue_title"})
                    if len(xiaoqu_gonglue) == 0:
                        is_gonglue = 0
                    else:
                        is_gonglue = 1
                    xiaoqu_info = item.find_all("div", attrs={"class": "item_list"})[0]
                    xiaoqu_name = xiaoqu_info.find_all("div", attrs={"class": "item_main"})[0].string
                    xiaoqu_brief = xiaoqu_info.find_all("div", attrs={"class": "item_other"})[0].string.strip(
                        "\n\r \"")
                    xiaoqu_brief = " ".join(xiaoqu_brief.split())
                    xiaoqu_loc = xiaoqu_brief.split()[0]
                    build_type = xiaoqu_brief.split()[1]
                    build_year = re.search(r' (?P<build_year>\d{1,})年建成', xiaoqu_brief, re.I)
                    if build_year:
                        xiaoqu_build = build_year.group("build_year")
                    else:
                        xiaoqu_build = ""
                    xiaoqu_price = xiaoqu_info.find_all("span", attrs={"class": "price_total"})[0].em.string

                    xiaoqu_dict = {
                        "area": area,
                        "page": page_num,
                        "xiaoqu_id": xiaoqu_id,
                        "url": xiaoqu_url,
                        "name": xiaoqu_name,
                        "brief": xiaoqu_brief,
                        "loc": xiaoqu_loc,
                        "build_type": build_type,
                        "build_year": xiaoqu_build,
                        "price": xiaoqu_price,
                        "have_gonglue": is_gonglue
                    }
                    writer.writerow(xiaoqu_dict)

            else:
                # An empty page means the last page has been passed
                break
            handled_set.update({"%s_%s" % (area, page_num)})

3.2 From the neighborhood list, fetching the neighborhoods that have a guide

3.2.1 Fetching one neighborhood's guide details

# Fetch the guide info for the neighborhood with the given id
def get_xiaoqu_gonglue_dict(self, id):
    url = "https://m.ke.com/sh/xiaoqu/%s/gonglueV2.html?click_source=m_resblock_detail#review" % id
    logger.info(url)

    # Load the page for this url
    # https://m.ke.com/sh/xiaoqu/5011000007603/gonglueV2.html?click_source=m_resblock_detail#review
    html = requests.get(url=url, headers=self.page_headers).content
    lj = BeautifulSoup(html, 'html.parser')
    loc_node = lj.find('div', attrs={'class': 'head_location'})
    if loc_node is not None:
        loc_name = loc_node.string
    else:
        loc_name = ""
    cpt_content = lj.find_all('div', attrs={'id': 'review'})[0]

    totoal_score = cpt_content.find('div', attrs={'class': "review_score"}).get_text().replace("综合测评得分", "")
    review_txt = ""
    if cpt_content.find('div', attrs={'class': "review_txt_box"}) is not None:
        review_txt = cpt_content.find('div', attrs={'class': "review_txt_box"}).get_text().strip(" \n\r")

    review_list_txt = cpt_content.find('ul', attrs={'class': "review_list"})
    review_list = review_list_txt.find_all('li')
    other = ""
    jianzhu_score = huxing_score = jiaotong_score = shangye_score = jiaoyu_score = jingguan_score = wuye_score = ""
    for item in review_list:
        key = item.span.string
        value = item.progress.get('value')
        if key == "建筑品质":
            jianzhu_score = value
        elif key == "户型设计":
            huxing_score = value
        elif key == "交通条件":
            jiaotong_score = value
        elif key == "教育质量":
            jiaoyu_score = value
        elif key == "商业环境":
            shangye_score = value
        elif key == "花园景观":
            jingguan_score = value
        elif key == "物业管理":
            wuye_score = value
        else:
            other = " %s:%s " % (key, value)

    peitao_node = lj.find('div', attrs={"class": "box peitao card_box"})
    map_api_node = peitao_node.find('img') if peitao_node is not None else None
    if map_api_node is not None:
        map_api = map_api_node.get('src')
    else:
        map_api = ""

    def get_geo_from_mapapi(map_api):
        geo = re.search(r'center=(?P<lng>[\d.]+),(?P<lat>[\d.]+)', map_api, re.I)
        if geo:
            lat = geo.group("lat")
            lng = geo.group("lng")
        else:
            lat = lng = None
        return lat, lng

    lat, lng = get_geo_from_mapapi(map_api)

    gonglue_dict = {
        "xiaoqu_id": id,
        "loc_name": loc_name,
        "total_score": totoal_score,
        "review_txt": review_txt if review_txt is not None else "",
        "jianzhu_score": jianzhu_score if jianzhu_score is not None else "",
        "huxing_score": huxing_score if huxing_score is not None else "",
        "jiaotong_score": jiaotong_score if jiaotong_score is not None else "",
        "jiaoyu_score": jiaoyu_score if jiaoyu_score is not None else "",
        "shangye_score": shangye_score if shangye_score is not None else "",
        "jingguan_score": jingguan_score if jingguan_score is not None else "",
        "wuye_score": wuye_score if wuye_score is not None else "",
        "map_api": map_api,
        "lng": lng if lng is not None else "",
        "lat": lat if lat is not None else "",
        "other": other
    }
    return gonglue_dict

3.2.2 Generating the guide list for all neighborhoods

# From the neighborhood list built in step 1, generate the guide list entry by entry
def handle_gonglue_by_xiaoqu(self, file_path, save_path, if_distance=False, local_geo=None):
    # Validate the arguments
    if if_distance and local_geo is None:
        logger.error("in handle_gonglue_by_xiaoqu, when if_distance is True, local_geo can't be None")
        exit(1)

    # Build the list of neighborhoods to process
    url_list = []
    with open(file_path, encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if row['have_gonglue'] == "1":
                url_list.append(row['xiaoqu_id'])

    # If the guide CSV already exists, note the records already processed
    handled_list = []
    fieldnames = ['xiaoqu_id', 'loc_name', 'total_score', "review_txt", "jianzhu_score", "huxing_score",
                  "jiaotong_score", "jiaoyu_score", "shangye_score", "jingguan_score", "wuye_score",
                  "map_api", "lat", "lng", "distance", "other"]
    if os.path.isfile(save_path):
        with open(save_path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                handled_list.append(row['xiaoqu_id'])
    else:
        # Otherwise create an empty CSV containing only the header row
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

    handled_set = set(handled_list)
    logger.info("handle_gonglue_by_xiaoqu, the length of url_list: %s" % len(url_list))

    # Fetch the guide info for every neighborhood in the list
    with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        for xiaoqu_id in url_list:
            if xiaoqu_id not in handled_set:
                gonglue_dict = self.get_xiaoqu_gonglue_dict(id=xiaoqu_id)
                if if_distance:
                    distance = get_distance((gonglue_dict["lat"], gonglue_dict["lng"]), local_geo)
                    gonglue_dict["distance"] = distance
                writer.writerow(gonglue_dict)
                handled_set.update({xiaoqu_id})
            else:
                logger.info("xiaoqu %s is handled" % xiaoqu_id)

3.3 From the guide list, generating the list of houses to watch

3.3.1 Fetching one neighborhood's house list

# Fetch the houses in a neighborhood that match the filters
def get_houselist_by_xiaoqu(self, xiaoqu_id):
    # https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba67ea70c5011000009590
    # bp350ep450: total price range, 3.5-4.5 million
    # l2l3: 2- or 3-bedroom layouts
    # ba67ea70: area 67-70 m²
    # c5011000009590: neighborhood id
    url = "https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba60ea90c%s" % xiaoqu_id
    html = requests.get(url=url, headers=self.page_headers).content

    house_list = []
    lj = BeautifulSoup(html, 'html.parser')
    # The page holds several lists: the search results plus recommendations from other neighborhoods
    view_body = lj.find('div', attrs={'class': 'list-view-section-body'})
    item_list = view_body.find_all('div', attrs={'class': 'lj-track', 'data-click-event': 'SearchClick'})
    for item in item_list:
        house_body = item.find("div", attrs={'class': 'kem__house-tile-ershou'})
        house_id = house_body.get("data-id")
        logger.info("handle house_id:%s" % house_id)

        house_txt = house_body.find("div", attrs={'class': 'house-text'})
        house_title = house_txt.find("div", attrs={"class": 'house-title'}).text
        house_desc = house_txt.find("div", attrs={"class": 'house-desc'}).string
        house_price_total = house_txt.find("span", attrs={"class": "price-total"}).strong.string
        house_price_unit = house_txt.find("span", attrs={"class": "price-unit"}).string.strip("元/平")

        house_dict = {
            "xiaoqu_id": xiaoqu_id,
            "house_id": house_id,
            "title": house_title,
            "desc": house_desc,
            "price_total": house_price_total,
            "price_unit": house_price_unit
        }
        house_list.append(house_dict)
    return house_list

3.3.2 Generating the house list from the guide list

# From the guide list, pick the neighborhoods of interest, then fetch each one's house list
def handle_hoselist_by_gonglue(self, file_path, save_path, filter_func=None):
    xiaoqu_list = []
    with open(file_path, encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if filter_func is not None:
                if filter_func(row):
                    # Add this neighborhood to the processing list
                    xiaoqu_list.append((row["xiaoqu_id"], row["loc_name"], row["distance"]))
            else:
                xiaoqu_list.append((row["xiaoqu_id"], row["loc_name"], row["distance"]))

    handled_list = []
    fieldnames = ['xiaoqu_id', 'xiaoqu_name', 'distance', 'house_id', 'title', "desc", "price_total", "price_unit"]
    if os.path.isfile(save_path):
        with open(save_path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                handled_list.append(row['xiaoqu_id'])
    else:
        # Otherwise create an empty CSV containing only the header row
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

    handled_set = set(handled_list)
    logger.info(
        "handle_hoselist_by_gonglue, to be handled: %s, have handled:%s " % (len(xiaoqu_list), len(handled_set)))

    with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        for xiaoqu_id, xiaoqu_loc_name, distance in xiaoqu_list:
            if xiaoqu_id not in handled_set:
                logger.info("handle xiaoqu:%s" % xiaoqu_id)
                house_list = self.get_houselist_by_xiaoqu(xiaoqu_id)
                if len(house_list) > 0:
                    for house_dict in house_list:
                        house_dict["xiaoqu_name"] = xiaoqu_loc_name
                        house_dict["distance"] = distance
                        writer.writerow(house_dict)
                else:
                    house_dict = {
                        "xiaoqu_id": xiaoqu_id,
                        "xiaoqu_name": xiaoqu_loc_name,
                        "distance": distance
                    }
                    writer.writerow(house_dict)
                    logger.info("xiaoqu:%s %s has no matching house." % (xiaoqu_id, xiaoqu_loc_name))

                handled_set.update({xiaoqu_id})
            else:
                logger.info("%s is handled" % xiaoqu_id)