python爬虫爬取某免费小说网站的小说(使用ip代理池和use
- 看看运行结果:
image.png
首先爬虫会在一个先前制定好的免费ip网站开始爬ip,同时验证ip的可用性,如果此ip是可用的,则将其添加到我们的代理池中。(总代理ip数量可以自己设置,我这里设置的是50个)
然后当我输入关键字后: image.png
(这里提一下小说不全的问题,这是网站问题,根据关键字它们网站一共只搜出这么多书。。。)------>>> 提一下: 如果想把小说爬的很全, 可以找别的网站, 我推荐一个:https://m.lread.net/
或者可以去正规网站花钱买会员啥的,然后用你的账号抓cookie模拟登录后再爬。
我上面发的那个网站小说挺全的,还可以根据作者名来搜索,下次可以试试这个网站,原理都差不多,也没啥难度,因为网站的小说应该也是从别的地方爬过来的,‘程序员才体谅程序员’,它们也不会设置什么反爬的手段,可能就算你user-agent不改都可以成功爬取到数据。
但是这里要注意下爬虫礼仪,我们用爬虫只是为了让计算机帮助我们获取数据,而不是像黑客那样要把别人服务器搞崩溃,我们可以在爬数据的过程中适当设置程序休眠,不要爬的太快增加它们服务器的压力。(别人可能会因为这个用封你ip等手段来搞你。马上我会讲下这个免费ip代理网站就是这样的,你爬的过快,它认为你是爬虫就直接给你ip封了。)
- 下面讲讲程序的细节:
(一): 抓取免费ip代理网站你需要数量的ip。
目标url: http://www.ip3366.net/
它这个网站有很多页,我们需要跳转不同的页面来抓取足够多的数据: image.png
用这个params参数就可以了,初始设置为第一页。
这里要注意的一点 是爬到ip后,要立刻判断这个ip究竟是不是可用的ip,如果是不可用的ip就要跳过它,判断的时候有两种方法,第一种就是爬到后直接去判断,这种方法会慢一些(刚开始我还没发现),第二种就是开一个线程,让它在线程里面去判断,这种方法还挺快的。我在程序里面两种方法都写了。
两种方法主要的函数都是: image.png
(二):设置user-agent代理池。
至于打多少个看个人兴趣,这里注意下可能网上有的人发的是不对的,或者说现在不能用了,要自己判断。
(三):用户输入要下载的小说名称 根据1获得的url跳转。
image.png
像这样,用户输入后就改变get url的params参数。
(四):抓取网站搜索后得到的网站匹配出的所有书籍信息。
这里我首先讲下免费代理ip的用法:
image.png
大致格式是这样。其中注意这里面两个https,(特别坑),用到了我前面发的ssl的概念(详情可以去看看)。
大意就是有的网站没有ca证书(比如我爬的这个小说网站),所以它的访问域名最前面时http而不是https,如果你用你自己电脑的ip去访问它,可能一点事都没有,但是如果你用代理ip去访问它,而且你代理ip里面用的也是http,那么就访问不了,具体原因我也不清楚,我想可能要求必须安全访问?可以试试将verify设置为False就能访问了?很搞的一点是对于https网站来说,代理ip就不能用https,也会出错。实在搞不清楚,反正规律就是原网站有s你就不加,原网站没有你就加上。
我用的xpath,到底用re,xpath,bs4就看个人习惯了。
(五):通过关键词匹配书。
image.png
首先如果当前目录不存在小说文件夹,就创建它。
然后匹配出所有的书,书的信息是字典格式,name参数是书名,url参数是小说首页的url。
(六):按顺序将整本小说缓存到本地。
这里面你可以多搞几个逻辑,到底是全下还是下一本等等。主要代码:
image.png
也没有啥难度,找准待匹配关键字就好了。
大概整体逻辑就是这样。
3 .完整代码如下
# coding='utf-8'
import requests
import threading
from lxml import etree
import time
from random import choice
import os
# 开多线程判断ip是否有效, 可以加快爬取的效率
class My_thread(threading.Thread):
def __init__(self, ip, port):
super().__init__()
self.ip = ip
self.port = port
def test_ip_valid(self):
test_url = 'https://image.baidu.com'
try:
headers['User-Agent'] = choice(user_agent)
requests.get(test_url, headers=headers, timeout=10,
proxies={'http': 'http://{}:{}'.format(
self.ip, self.port
)})
ip_list.append((self.ip, self.port))
except Exception as e:
print('找到一个异常ip/端口...')
def run(self):
self.test_ip_valid()
# 不用线程找到后直接判断有用性
def valify_ip(ip, port):
test_url = 'https://image.baidu.com'
try:
headers['User-Agent'] = choice(user_agent)
requests.get(test_url, headers=headers, timeout=10,
proxies={'http': 'http://{}:{}'.format(
ip, port
)})
ip_list.append((ip, port))
except Exception as e:
print('找到一个异常ip/端口...')
# 根据所给书的url和存储路径path将书保存到本地
def download_book(name, url, path):
global ip_index_counter
headers['User-Agent'] = choice(user_agent)
ip_index_counter += 1
if ip_index_counter >= total_ip_number:
ip_index_counter = 0
proxy_ip = {
'https': 'https://{}:{}'.format(ip_list[ip_index_counter][0],
ip_list[ip_index_counter][1])
}
response = requests.get(url, headers=headers, proxies=proxy_ip)
target = etree.HTML(response.content.decode('utf-8'))
result_list = target.xpath('//a[@class="compulsory-row-one none"]')
auth_name = target.xpath(r'//meta[@property="og:novel:author"]')[0].get('content')
last_title = target.xpath(r'//meta[@property="og:novel:latest_chapter_name"]')[0].get('content')
str_novel = '''
##############################################
######为您找到这小说, 它的信息如下:
######书名:{}
######作者:{}
######最后一章:{}
######请您选择是否下载?(y/n)
##############################################
:'''.format(name, auth_name, last_title)
flag = False
while True:
if flag:
_user_choice = input(str_novel+'您的输入不合法, 请重新输入!\n:')
flag = False
else:
_user_choice = input(str_novel)
if _user_choice in 'yY':
break
elif _user_choice in 'nN':
os._exit()
else:
flag = True
continue
# 保存小说每一章的标题和url
novel_result_list = []
judge_dup = set()
for each in result_list:
temp_dict = {}
temp_dict['title'] = each.get('title')
temp_dict['url'] = 'http://www.xundu.net' + each.get('href')
if temp_dict['url'] not in judge_dup:
novel_result_list.append(temp_dict)
judge_dup.add(temp_dict['url'])
end_one_capt = '\n\n\n\n\n\n'
# novel_str = '\n\n\n\n\n{} ----{}'.format(search_key, auth_name)+end_one_capt
start_of_novel_each_title = '〓〓〓〓〓〓〓〓 '
# 正式下载小说到本地
print('\n请耐心等待, {} 正在导入本地文件...\n'.format(search_key))
with open(path, 'w', encoding='utf-8') as f:
f.write('小说名称: {}\n\n作者: {}{}'.format(search_key, auth_name, end_one_capt))
for index, each in enumerate(novel_result_list):
headers['User-Agent'] = choice(user_agent)
ip_index_counter += 1
if ip_index_counter >= total_ip_number:
ip_index_counter = 0
proxy_ip = {
'https': 'https://{}:{}'.format(ip_list[ip_index_counter][0],
ip_list[ip_index_counter][1])
}
response = requests.get(each['url'], headers=headers, proxies=proxy_ip)
target = etree.HTML(response.content.decode('utf-8'))
result_list = target.xpath('//div[@class="size16 color5 pt-read-text"]')[0]
f.write(start_of_novel_each_title+each['title']+'\n\n\n')
for _each in result_list:
if _each.text:
f.write(' '+_each.text+'\n\n')
else:
f.write(' '+'\n\n')
f.write(end_one_capt)
print('成功导入 : {}'.format(each['title']))
# with open(path, 'w', encoding='utf-8') as f:
# f.write(novel_str)
print('\n成功!\n##$$$$#{}已经成功导入本地文件!\n'.format(search_key))
user_agent = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11',
'User-Agent: Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
]
headers = {}
# ip_list 从免费ip网站爬取多个高匿ip, 同时多线程验证ip可用性
total_ip_number = 50
free_ip_web = 'http://www.ip3366.net/?'
page = 1
params = {
'page': str(page),
}
ip_list = []
flag = True
headers['User-Agent'] = choice(user_agent)
print('生成代理ip列表中, 请稍等片刻.........')
while flag:
response = requests.get(free_ip_web, headers=headers, params=params)
response_text = response.content.decode('gbk')
target = etree.HTML(response_text)
temp_list = target.xpath(r'//tbody/tr')
for each in temp_list:
ip, port = each.getchildren()[0:2]
# 看网上说的用线程, 我现在感觉线程没啥用... 我太弱了.
My_thread(ip.text, port.text).start()
# 好像用线程更快.....
# valify_ip(ip.text, port.text)
if len(ip_list) >= total_ip_number:
flag = False
break
print('第{}页找完...'.format(page))
time.sleep(2)
page += 1
params['page'] = str(page)
print('{}个代理ip生成完毕 !'.format(total_ip_number))
ip_index_counter = 0
# 免费代理ip的用法
proxy_ip = {
'https': 'https://{}:{}'.format(ip_list[ip_index_counter][0],
ip_list[ip_index_counter][1])
}
# 1 .抓取网站所有小说名称和url组成的字典键值对
search_url = 'http://www.xundu.net/search/result.html?'
# 2.用户输入要下载的小说名称 根据1获得的url跳转
search_key = input('请输入您要下载的小说的关键字 : ')
params = {
'searchkey': search_key,
}
'''
# 简直是天坑
# 什么鬼??你加s我就不能加,你不加我就必须加???暂时不懂
http和https的区别?
'''
# 3.抓取跳转后所有网页的url
headers['User-Agent'] = choice(user_agent)
response = requests.get(search_url, headers=headers, params=params,
proxies=proxy_ip)
ip_index_counter += 1
target = etree.HTML(response.content.decode('utf-8'))
search_result_list = target.xpath(r'//a[@class="size18 color2"]')
all_similar_books = []
flag = True
# 通过关键词找书
# 存在和关键词相同的书则直接下载
if not os.path.exists('./小说'):
os.mkdir('./小说')
for each in search_result_list:
temp_dict = {}
temp_dict['name'] = each.get('title')
temp_dict['url'] = 'http://www.xundu.net'+each.get('href')
if temp_dict['name'] == search_key:
flag = False
download_book(search_key, temp_dict['url'], './小说/{}.txt'.format(search_key))
break
all_similar_books.append(temp_dict)
# 4.按顺序将整本小说缓存到本地
# 任何书都没找到
if not search_result_list:
print('很抱歉,没有找到关键词为{}的书,也没有找到近似书..退出'.format(search_key))
os.system('pause')
os._exit(0)
# 如果找到了很多书并且里面没有和输入关键词相同的书
if flag:
print('\n\n没有找到与和{}同名的书,但是找到了它的一些近似书:'.format(search_key))
for index, each in enumerate(all_similar_books):
print(each['name'], ':', each['url'])
# 输入的提示信息
choice_str = '''
##################################################
请做出以下任一选择:
1: 输入书籍前面的编号来下载这本近似书.
2: 直接下载所有近似书.
0: 这些书一本都不下载,直接退出.
##################################################
:'''
flag = False
while True:
if flag:
user_choice = input(choice_str+'您的输入不规范, 请重新输入: \n:')
flag = False
else:
user_choice = input(choice_str)
if user_choice == '1':
books_to_show = '\n\n########################################\n'
print(books_to_show)
print('编号: 0 退出程序~~~~~~')
for index, _each in enumerate(all_similar_books):
headers['User-Agent'] = choice(user_agent)
ip_index_counter += 1
if ip_index_counter >= total_ip_number:
ip_index_counter = 0
proxy_ip = {
'https': 'https://{}:{}'.format(ip_list[ip_index_counter][0],
ip_list[ip_index_counter][1])
}
response = requests.get(_each['url'], headers=headers, proxies=proxy_ip)
target = etree.HTML(response.content.decode('utf-8'))
auth_name = target.xpath(r'//meta[@property="og:novel:author"]')[0].get('content')
last_title = target.xpath(r'//meta[@property="og:novel:latest_chapter_name"]')[0].get('content')
print('编号: {} 小说名: {} 作者 : {} 最后章节 : {}'.format(
index+1, _each['name'], auth_name, last_title
))
print(books_to_show)
while True:
w_book_to_download = input('\n请输入对应书籍前面的编号:')
try:
_name = all_similar_books[int(w_book_to_download)-1]['name']
if 1 <= int(w_book_to_download) <= len(all_similar_books):
download_book(_name, all_similar_books[int(w_book_to_download)-1]['url'],
'./小说/{}.txt'.format(_name))
break
elif w_book_to_download == '0':
print('程序退出!')
os._exit(0)
else:
print('此编号不是正确范围内的编号!')
print('请重新输入!')
except Exception as e:
print(e)
print('编号格式不符合要求!')
print('请重新输入!')
elif user_choice == '2':
for each in all_similar_books:
print('\n下载{}>>>>>>>\n\n'.format(each['name']))
download_book(each['name'], each['url'], './小说/{}.txt'.format(each['name']))
elif user_choice == '0':
os._exit(0)
else:
flag = True