A Python Web-Scraping Project (Buying a Second-Hand Home via Lianjia)
Before I knew it, I had been working for more than six years, and at last I was in a position to buy a home.
These are notes I took while collecting housing information.
Beike's search is powerful, but it cannot easily match on neighborhood, commute distance, schools, and floor area all at once, so using it directly still takes a lot of effort.
This project addresses that by collecting Lianjia's second-hand housing data. It first screens neighborhoods (xiaoqu) by price, narrows them down by school quality and distance to my workplace, and then uses floor area, total price, and layout to build a shortlist of candidate houses, so that house-hunting effort is concentrated on a focused set of neighborhoods and listings.
Of course, everyone can adjust the criteria to their own needs.
1. Environment
1.1 Base environment
1.1.1 Python
Official documentation: https://www.python.org/doc/
1.1.2 requests (fetching pages)
Official documentation: https://cn.python-requests.org/zh_CN/latest/
1.1.3 BeautifulSoup (extracting information)
Official documentation: https://www.crummy.com/software/BeautifulSoup/bs4/doc/index.zh.html
Common usage examples:
from bs4 import BeautifulSoup
# The snippets below assume the small sample page used in the tutorial linked at
# the end of this section; the quoted outputs are illustrative.
soup = BeautifulSoup(html_doc, "html.parser")  # html_doc holds the sample page's HTML
soup.title.text  # 'Title'
# 1. Extracting tags
# 1.1 Extracting a unique tag (the three lines below are equivalent)
soup.h1
soup.find('h1')
soup.find_all('h1')[0]
# 1.2 Extracting multiple tags
soup.find_all('h2')
# [<h2>Heading 2</h2>, <h2>Heading 3</h2>]
soup.find_all(['h1', 'h2'])
# [<h1>Heading 1</h1>, <h2>Heading 2</h2>, <h2>Heading 3</h2>]
# 1.3 Matching tag names with a regular expression
import re
soup.find_all(re.compile('^h'))
# [<h1>Heading 1</h1>, <h2>Heading 2</h2>, <h2>Heading 3</h2>]
# 2. Matching attributes
# 2.1 Pass the attribute name as a keyword argument; this fails for attribute
# names that are not valid Python identifiers, such as a-b
soup.find_all('p', id='p1')      # the common case
soup.find_all('p', class_='p3')  # class is a reserved word, so a trailing _ is required
# 2.2 The most general way: the attrs dict
soup.find_all('p', attrs={'class': 'p3'})              # matches if the attribute contains this value; other attributes may also be present
soup.find_all('p', attrs={'class': 'p3', 'id': 'pp'})  # match on several attributes at once
soup.find_all('p', attrs={'class': 'p3', 'id': False}) # the tag must NOT have the given attribute
soup.find_all('p', attrs={'id': ['p1', 'p2']})         # attribute value is either p1 or p2
soup.find_all('p', attrs={'class': True})              # any tag that has a class attribute
# 2.3 Matching attribute values with a regular expression
import re
soup.find_all('p', attrs={'id': re.compile('^p')})
# 3. Matching on tag text
# (newer BeautifulSoup versions name this argument string=; text= still works)
# 3.1 With a regular expression
import re
soup.find_all('p', text=re.compile('Paragraph'))
soup.find_all('p', text=True)  # any <p> that has string content
# 3.2 With a function
def nothing(c):
    return c not in ['Paragraph 1', 'Paragraph 2', 'Article']
soup.find_all('p', text=nothing)
# A filter function can also receive the whole tag:
def has_class_but_no_id(tag):
    return tag.has_attr('class') and not tag.has_attr('id')
soup.find_all(has_class_but_no_id)
# 4. Extracting content
# 4.1 Tag text
soup.h.text    # returns the text even through nested tags
soup.h.a.text  # or navigate down explicitly
soup.body.text # with several children: '\nTitle\nParagraph 1\nParagraph 2\n'
# 4.2 Attribute values of a tag
# Extract attribute values like a dict lookup; the two lines below are equivalent
soup.h.a['href']
soup.h.a.get('href')
# 5. Tag metadata (for a tag object i, e.g. one element of a find_all result)
print(i.name)              # the tag's name
print(i.attrs)             # all of the tag's attributes as a dict
print(i.has_attr('href'))  # whether the tag has a given attribute
# 6. Worked examples
soup.find('p', attrs={'class': 'first'}).text  # 'Text 1'
soup.find_all('p')  # [<p class="first">Text 1</p>, <p class="second">Text 2</p>], then extract the text from each
soup.find('ul', attrs={'class': 'list1'}).find_all('li')  # [<li>List 1, item 1</li>, <li>List 1, item 2</li>]
# Code reference: https://zhuanlan.zhihu.com/p/35354532
1.1.4 Geocoding (Baidu API)
Official documentation: http://lbsyun.baidu.com/index.php?title=webapi/guide/webservice-geocoding
Method 1:
import logging
import requests

logger = logging.getLogger(__name__)  # requests and logger are reused by the later snippets

def geocodeB(address):
    # v1 geocoding API; replace yourak with your own application key (AK)
    base = "http://api.map.baidu.com/geocoder?address=%s&output=json&key=yourak&city=上海" % address
    response = requests.get(base)
    if response.status_code == 200:
        answer = response.json()
        if "location" in answer['result'] and "level" in answer['result']:
            return (address,
                    # round(answer['result']['location']['lng'], 5),
                    answer['result']['location']['lng'],
                    # round(answer['result']['location']['lat'], 5),
                    answer['result']['location']['lat'],
                    answer['result']["level"])
        else:
            logger.error("geocodeB %s warning:%s" % (address, answer))
            return None
    else:
        logger.error("geocodeB %s Error" % address)
        return None
Method 2:
def geocodeB2(address):
    from urllib.request import urlopen
    from urllib.parse import quote, quote_plus
    import hashlib, json
    # Example GET request: http://api.map.baidu.com/geocoder/v2/?address=百度大厦&output=json&ak=yourak
    queryStr = '/geocoder/v2/?address=%s&city=上海&output=json&ak=$yourak$' % address
    # URL-encode queryStr; the reserved characters listed in safe are left as-is
    encodedStr = quote(queryStr, safe="/:=&?#+!$,;'@()*[]")
    # Append the security key (SK, the $yoursn$ placeholder) directly at the end
    rawStr = encodedStr + '$yoursn$'
    sn = hashlib.md5(quote_plus(rawStr).encode("utf8")).hexdigest()
    url = 'http://api.map.baidu.com%s&sn=%s' % (encodedStr, sn)
    req = urlopen(url)
    res = req.read().decode()  # decode the response bytes into a str
    answer = json.loads(res)   # parse the JSON payload
    if "location" in answer['result'] and "level" in answer['result']:
        return answer['result']['location']['lat'], answer['result']['location']['lng']
    else:
        logger.error("geocodeB2 %s warning:%s" % (address, answer))
        return None
Method 3:
def geocode_by_baidu(address):
    from geopy.geocoders import Baidu
    apikey = '$yourak$'  # apply for a key at http://lbsyun.baidu.com/apiconsole/key?application=key
    sn = '$yoursn$'      # the matching security key (SK)
    g = Baidu(api_key=apikey, security_key=sn, timeout=200)
    a = g.geocode(address)
    # return (round(a.latitude, 6), round(a.longitude, 6))
    return a.latitude, a.longitude
1.1.5 Distance computation (geopy)
from geopy.distance import geodesic

# x and y are (lat, lng) tuples
def get_distance(x, y):
    # geodesic distance in kilometres, rounded to 3 decimal places
    return round(geodesic(x, y).km, 3)
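A quick usage sketch combining the two helpers above (my own illustration; the address strings are placeholders and a valid AK/SK is assumed):
# Geocode the commute anchor once, then measure each neighborhood against it
work_geo = geocode_by_baidu('上海市世纪大道地铁站')  # illustrative address string
xiaoqu_geo = geocode_by_baidu('上海市虹口区XX小区')  # illustrative address string
print(get_distance(xiaoqu_geo, work_geo))            # straight-line distance in km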
1.1.6 Handling lazy loading and infinite scroll (Selenium)
Selenium is a tool for testing web applications. Selenium tests run directly in a browser, exactly as if a real user were operating it; supported browsers include IE (7-11), Firefox, Safari, Chrome, and Opera.
Here, the crawler drives Selenium to imitate a normal user visiting the site in a browser.
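A minimal sketch of that idea (my own example, assuming Chrome with a matching chromedriver on the PATH): scroll to the bottom in steps until the page height stops growing, then hand the rendered HTML to BeautifulSoup:
import time
from selenium import webdriver

def load_full_page(url, pause=1.0, max_rounds=20):
    driver = webdriver.Chrome()  # assumes chromedriver is on the PATH
    driver.get(url)
    last_height = driver.execute_script("return document.body.scrollHeight")
    for _ in range(max_rounds):
        # Scroll to the bottom to trigger lazily loaded content
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(pause)  # give the newly triggered content time to load
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:  # nothing new was appended; we are done
            break
        last_height = new_height
    html = driver.page_source
    driver.quit()
    return html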
1.2 Main issues
1.2.1 Lazy loading
Reference: http://www.selenium.org.cn/
1.2.2 Infinite scroll
1.2.3 IP rate limiting
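The scripts below cope with IP blocks by recording progress in CSV files, so they can simply be re-run after a block. Pacing the requests also helps; a small wrapper like this (my own addition, not part of the original flow) spaces them out with a random delay:
import random
import time

import requests

def polite_get(url, headers, min_delay=1.0, max_delay=3.0):
    # Sleep a random interval before each request to look less like a bot
    time.sleep(random.uniform(min_delay, max_delay))
    return requests.get(url, headers=headers, timeout=10)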
2. Preparation
2.1 Analyzing the requirements
My house-buying requirements:
Budget: 4,000,000 CNY, at most 4,500,000;
Schools: a tier-2 school district;
Layout: at least two bedrooms;
Building age: built in 1990 or later;
Area: at least 60 m²;
Commute: within one hour of 世纪大道 by public transport.
This translates into the following plan:
1. Schools: filter neighborhoods by the education score in their guide (gonglue) to fix the candidate set.
2. Then filter by (a) the prices of the houses within each neighborhood, against the budget; and (b) each neighborhood's location, dropping those whose commute distance disqualifies them.
3. For the neighborhoods that pass, fetch each one's house listings and decide which neighborhoods to track closely.
Notes:
1. Why not fetch houses directly? A house by itself cannot tell you whether the school requirement is met, and going house -> neighborhood -> school would waste far more time, since there are orders of magnitude more houses than neighborhoods.
2. The budget and the area requirement together pin down a unit price, which pre-filters neighborhoods and shrinks the candidate set: 450万 / 60 m² = 7.5万/m², and with 5万/m² as a lower bound this gives the bp5ep7.5 band used in the URLs below.
2.2 Analyzing the page URLs
2.2.1 Fetching the neighborhood list
1. The neighborhood-list URL
Lianjia only serves the first 100 result pages, and all of Shanghai clearly spans more than 100 pages, so neighborhoods are fetched district by district:
https://m.ke.com/sh/xiaoqu/hongkou/bp5ep7.5pg%s/
where
- bp5ep7.5 is the 5-7.5万 price band (bp = begin price, ep = end price);
- pg%s is the page number.
2. Checking whether a neighborhood has reviews
In the list fetched in step 1, the presence of a 小区攻略 (neighborhood guide) tag shows whether review information exists for that neighborhood.
Note: the education score is not available for every neighborhood.
Example link: https://m.ke.com/sh/xiaoqu/5011000016009/ shows the neighborhood's overall score.
2.2.2 Fetching a neighborhood's guide
The guide URL for a neighborhood is:
https://m.ke.com/sh/xiaoqu/5011000016009/gonglueV2.html?click_source=m_resblock_detail#review
Each neighborhood has an overall score plus per-dimension scores: build quality, floor-plan design, transport, education, commerce, landscaping, and property management.
Everyone can filter neighborhoods on whichever dimensions matter to them.
For example, education comes first for me, so my main filter requires an education score of at least 8, with at least 6.5 on every other dimension (see the filter sketch below).
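A hedged sketch of such a filter, written against the score columns produced in section 3.2; handle_houselist_by_gonglue in section 3.3 accepts it as filter_func (the threshold values are just my own criteria):
def my_filter(row):
    # row is one CSV record with the gonglue score columns from section 3.2
    try:
        jiaoyu = float(row['jiaoyu_score'])
        others = [float(row[k]) for k in ('jianzhu_score', 'huxing_score', 'jiaotong_score',
                                          'shangye_score', 'jingguan_score', 'wuye_score')]
    except (ValueError, KeyError):
        return False  # drop rows with missing or non-numeric scores
    return jiaoyu >= 8 and all(score >= 6.5 for score in others)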
2.2.3 Fetching a neighborhood's house listings
The house listings of a neighborhood:
Unfiltered: https://m.ke.com/sh/ershoufang/c5011000016009/
With filters: https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba67ea70c5011000016009
where bp350ep450 is the 350-450万 total-price range; l2l3 selects 2- and 3-bedroom layouts; ba67ea70 is the 67-70 m² area range; and c5011000016009 selects the neighborhood with id 5011000016009.
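These filter fragments simply concatenate, so a small helper can assemble the URL. This is a sketch based on the patterns observed above; the fragment grammar is inferred from the pages, not documented by Lianjia:
def build_ershoufang_url(xiaoqu_id, bp=350, ep=450, rooms=(2, 3), ba=60, ea=90):
    # bp/ep: total price in 万; l<n>: bedroom count; ba/ea: area in m²; c<id>: neighborhood id
    rooms_part = "".join("l%d" % r for r in rooms)
    return "https://m.ke.com/sh/ershoufang/bp%sep%s%sba%sea%sc%s" % (bp, ep, rooms_part, ba, ea, xiaoqu_id)

# build_ershoufang_url('5011000016009') ->
# 'https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba60ea90c5011000016009'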
3. Implementation
3.1 Fetching the neighborhood list
# assumes module-level: import os, csv, re, requests; from bs4 import BeautifulSoup
def get_xiaoqu_list(self, area, save_path):
    page_size = 100
    # Only Shanghai is collected, so multi-city handling is omitted
    fieldnames = ['area', 'page', 'xiaoqu_id', 'url', 'name', "brief", "loc", "build_type", "build_year", "price",
                  "have_gonglue"]
    # If the CSV does not exist yet, create it with just the header row.
    # If it does exist, record which (area, page) pairs were already handled
    # (the script may need several runs because of IP limits).
    handled_list = []
    if os.path.isfile(save_path):
        with open(save_path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                handled_list.append("%s_%s" % (row['area'], row['page']))
    else:
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
    handled_set = set(handled_list)
    logger.info("get_xiaoqu_list, have handled:%s " % (len(handled_set)))
    # Walk through this district's list pages (the caller loops over the districts)
    with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        for page_num in range(1, page_size + 1):  # Lianjia serves at most 100 pages
            # e.g. https://m.ke.com/sh/xiaoqu/pudong/pb4ep4.5pg10/
            url = "https://m.ke.com/sh/xiaoqu/%s/bp5ep7.5pg%s/" % (area, str(page_num))
            if "%s_%s" % (area, page_num) in handled_set:
                logger.info("%s has been handled." % url)
                continue
            else:
                logger.info(url)
            # Fetch the page
            r = requests.get(url=url, headers=self.page_headers)
            html = r.content
            lj = BeautifulSoup(html, 'html.parser')
            page_items = lj.find_all('li', attrs={'class': 'pictext'})
            # Parse the neighborhood entries on this page
            if len(page_items) > 0:
                for item in page_items:
                    xiaoqu_url = item.a.get('href')
                    xiaoqu_id = xiaoqu_url.split("/")[-2]
                    xiaoqu_gonglue = item.find_all("p", attrs={"class": "gonglue_title"})
                    if len(xiaoqu_gonglue) == 0:
                        is_gonglue = 0
                    else:
                        is_gonglue = 1
                    xiaoqu_info = item.find_all("div", attrs={"class": "item_list"})[0]
                    xiaoqu_name = xiaoqu_info.find_all("div", attrs={"class": "item_main"})[0].string
                    xiaoqu_brief = xiaoqu_info.find_all("div", attrs={"class": "item_other"})[0].string.strip(
                        "\n\r \"")
                    xiaoqu_brief = " ".join(xiaoqu_brief.split())
                    xiaoqu_loc = xiaoqu_brief.split()[0]
                    build_type = xiaoqu_brief.split()[1]
                    build_year = re.search(r' (?P<build_year>\d{1,})年建成', xiaoqu_brief, re.I)
                    if build_year:
                        xiaoqu_build = build_year.group("build_year")
                    else:
                        xiaoqu_build = ""
                    xiaoqu_price = xiaoqu_info.find_all("span", attrs={"class": "price_total"})[0].em.string
                    xiaoqu_dict = {
                        "area": area,
                        "page": page_num,
                        "xiaoqu_id": xiaoqu_id,
                        "url": xiaoqu_url,
                        "name": xiaoqu_name,
                        "brief": xiaoqu_brief,
                        "loc": xiaoqu_loc,
                        "build_type": build_type,
                        "build_year": xiaoqu_build,
                        "price": xiaoqu_price,
                        "have_gonglue": is_gonglue
                    }
                    writer.writerow(xiaoqu_dict)
            else:
                # An empty page means we have gone past the last page
                break
            handled_set.update({"%s_%s" % (area, page_num)})
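A hedged usage sketch (the LianjiaCrawler class name and the district list are my own illustration; the methods in this section are written as methods of some crawler class whose page_headers carries a normal browser User-Agent):
crawler = LianjiaCrawler()  # hypothetical class holding page_headers and the methods in this section
for area in ['pudong', 'hongkou', 'yangpu']:  # district slugs as they appear in the m.ke.com URLs
    crawler.get_xiaoqu_list(area, save_path='xiaoqu_list.csv')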
3.2 From the neighborhood list, fetch the neighborhoods that have a guide
3.2.1 Fetching one neighborhood's guide details
# Given a neighborhood id, fetch the neighborhood's guide (gonglue) information
def get_xiaoqu_gonglue_dict(self, xiaoqu_id):
    # e.g. https://m.ke.com/sh/xiaoqu/5011000007603/gonglueV2.html?click_source=m_resblock_detail#review
    url = "https://m.ke.com/sh/xiaoqu/%s/gonglueV2.html?click_source=m_resblock_detail#review" % xiaoqu_id
    logger.info(url)
    # Load the page at url
    html = requests.get(url=url, headers=self.page_headers).content
    lj = BeautifulSoup(html, 'html.parser')
    loc_node = lj.find('div', attrs={'class': 'head_location'})
    if loc_node is not None:
        loc_name = loc_node.string
    else:
        loc_name = ""
    cpt_content = lj.find_all('div', attrs={'id': 'review'})[0]
    total_score = cpt_content.find('div', attrs={'class': "review_score"}).get_text().replace("综合测评得分", "")
    review_txt = ""
    if cpt_content.find('div', attrs={'class': "review_txt_box"}) is not None:
        review_txt = cpt_content.find('div', attrs={'class': "review_txt_box"}).get_text().strip(" \n\r")
    review_list_txt = cpt_content.find('ul', attrs={'class': "review_list"})
    review_list = review_list_txt.find_all('li')
    other = ""
    jianzhu_score = huxing_score = jiaotong_score = shangye_score = jiaoyu_score = jingguan_score = wuye_score = ""
    # Map each per-dimension score onto its own field
    for item in review_list:
        key = item.span.string
        value = item.progress.get('value')
        if key == "建筑品质":    # build quality
            jianzhu_score = value
        elif key == "户型设计":  # floor-plan design
            huxing_score = value
        elif key == "交通条件":  # transport
            jiaotong_score = value
        elif key == "教育质量":  # education
            jiaoyu_score = value
        elif key == "商业环境":  # commerce
            shangye_score = value
        elif key == "花园景观":  # landscaping
            jingguan_score = value
        elif key == "物业管理":  # property management
            wuye_score = value
        else:
            other += " %s:%s " % (key, value)  # accumulate any unexpected dimensions
    # The static map image in the amenities (配套) box encodes the neighborhood's coordinates
    peitao_node = lj.find('div', attrs={"class": "box peitao card_box"})
    map_api_node = peitao_node.find('img') if peitao_node is not None else None
    if map_api_node is not None:
        map_api = map_api_node.get('src')
    else:
        map_api = ""

    def get_geo_from_mapapi(map_api):
        # the img src carries a center=<lng>,<lat> query parameter
        geo = re.search(r'center=(?P<lng>[\d.]+),(?P<lat>[\d.]+)', map_api, re.I)
        if geo:
            lat = geo.group("lat")
            lng = geo.group("lng")
        else:
            lat = lng = ""
        return lat, lng

    lat, lng = get_geo_from_mapapi(map_api)
    gonglue_dict = {
        "xiaoqu_id": xiaoqu_id,
        "loc_name": loc_name,
        "total_score": total_score,
        "review_txt": review_txt,
        "jianzhu_score": jianzhu_score,
        "huxing_score": huxing_score,
        "jiaotong_score": jiaotong_score,
        "jiaoyu_score": jiaoyu_score,
        "shangye_score": shangye_score,
        "jingguan_score": jingguan_score,
        "wuye_score": wuye_score,
        "map_api": map_api,
        "lng": lng,
        "lat": lat,
        "other": other
    }
    return gonglue_dict
3.2.2 Building the guide list for all neighborhoods
# For each neighborhood found in step 1, fetch its guide and append it to the list
def handle_gonglue_by_xiaoqu(self, file_path, save_path, if_distance=False, local_geo=None):
    # Validate the arguments
    if if_distance and local_geo is None:
        logger.error("in handle_gonglue_by_xiaoqu, if_distance is True, local_geo can't be None")
        exit(1)
    # Build the list of neighborhoods that have a guide
    url_list = []
    with open(file_path, encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if row['have_gonglue'] == "1":
                url_list.append(row['xiaoqu_id'])
    # If the guide CSV already exists, record which neighborhoods were processed
    handled_list = []
    fieldnames = ['xiaoqu_id', 'loc_name', 'total_score', "review_txt", "jianzhu_score", "huxing_score",
                  "jiaotong_score", "jiaoyu_score", "shangye_score", "jingguan_score", "wuye_score",
                  "map_api", "lat", "lng", "distance", "other"]
    if os.path.isfile(save_path):
        with open(save_path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                handled_list.append(row['xiaoqu_id'])
    else:
        # Otherwise create an empty CSV containing only the header
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
    handled_set = set(handled_list)
    logger.info("handle_gonglue_by_xiaoqu, the length of url_list: %s" % len(url_list))
    # Fetch the guide for each remaining neighborhood
    with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        for xiaoqu_id in url_list:
            if xiaoqu_id not in handled_set:
                gonglue_dict = self.get_xiaoqu_gonglue_dict(xiaoqu_id)
                if if_distance:
                    # Some guides expose no coordinates; skip the distance for those
                    if gonglue_dict["lat"] and gonglue_dict["lng"]:
                        gonglue_dict["distance"] = get_distance(
                            (float(gonglue_dict["lat"]), float(gonglue_dict["lng"])), local_geo)
                    else:
                        gonglue_dict["distance"] = ""
                writer.writerow(gonglue_dict)
                handled_set.update({xiaoqu_id})
            else:
                logger.info("xiaoqu %s is handled" % xiaoqu_id)
3.3 From the guide list, build the shortlist of houses
3.3.1 Fetching one neighborhood's house list
# Given a neighborhood id, fetch the houses in it that satisfy the search filter
def get_houselist_by_xiaoqu(self, xiaoqu_id):
    # e.g. https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba67ea70c5011000009590
    # bp350ep450      total price from 350 to 450 (万)
    # l2l3            2- or 3-bedroom layouts
    # ba67ea70        floor area from 67 to 70 m²
    # c5011000009590  the neighborhood id
    url = "https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba60ea90c%s" % xiaoqu_id
    html = requests.get(url=url, headers=self.page_headers).content
    house_list = []
    lj = BeautifulSoup(html, 'html.parser')
    # The page holds several lists: the current search results plus recommendations from other neighborhoods
    view_body = lj.find('div', attrs={'class': 'list-view-section-body'})
    item_list = view_body.find_all('div', attrs={'class': 'lj-track', 'data-click-event': 'SearchClick'})
    for item in item_list:
        house_body = item.find("div", attrs={'class': 'kem__house-tile-ershou'})
        house_id = house_body.get("data-id")
        logger.info("handle house_id:%s" % house_id)
        house_txt = house_body.find("div", attrs={'class': 'house-text'})
        house_title = house_txt.find("div", attrs={"class": 'house-title'}).text
        house_desc = house_txt.find("div", attrs={"class": 'house-desc'}).string
        house_price_total = house_txt.find("span", attrs={"class": "price-total"}).strong.string
        house_price_unit = house_txt.find("span", attrs={"class": "price-unit"}).string.strip("元/平")
        house_dict = {
            "xiaoqu_id": xiaoqu_id,
            "house_id": house_id,
            "title": house_title,
            "desc": house_desc,
            "price_total": house_price_total,
            "price_unit": house_price_unit
        }
        house_list.append(house_dict)
    return house_list
3.3.2 Building the house list from the guide list
# From the guide list, pick the neighborhoods of interest, then fetch each one's house list
def handle_houselist_by_gonglue(self, file_path, save_path, filter_func=None):
    xiaoqu_list = []
    with open(file_path, encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # Keep the neighborhood when no filter is given, or when the filter accepts it
            if filter_func is None or filter_func(row):
                xiaoqu_list.append((row["xiaoqu_id"], row["loc_name"], row["distance"]))
    handled_list = []
    fieldnames = ['xiaoqu_id', 'xiaoqu_name', 'distance', 'house_id', 'title', "desc", "price_total", "price_unit"]
    if os.path.isfile(save_path):
        with open(save_path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                handled_list.append(row['xiaoqu_id'])
    else:
        # If the file does not exist, create an empty CSV containing only the header
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
    handled_set = set(handled_list)
    logger.info(
        "handle_houselist_by_gonglue, to be handled: %s, have handled:%s " % (len(xiaoqu_list), len(handled_set)))
    with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        for xiaoqu_id, xiaoqu_loc_name, distance in xiaoqu_list:
            if xiaoqu_id not in handled_set:
                logger.info("handle xiaoqu:%s" % xiaoqu_id)
                house_list = self.get_houselist_by_xiaoqu(xiaoqu_id)
                if len(house_list) > 0:
                    for house_dict in house_list:
                        house_dict["xiaoqu_name"] = xiaoqu_loc_name
                        house_dict["distance"] = distance
                        writer.writerow(house_dict)
                else:
                    # Write a stub row so the neighborhood is not re-fetched on the next run
                    house_dict = {
                        "xiaoqu_id": xiaoqu_id,
                        "xiaoqu_name": xiaoqu_loc_name,
                        "distance": distance
                    }
                    writer.writerow(house_dict)
                    logger.info("xiaoqu %s %s has no matching house." % (xiaoqu_id, xiaoqu_loc_name))
                handled_set.update({xiaoqu_id})
            else:
                logger.info("%s is handled" % xiaoqu_id)