A Downloader, Truly from Scratch (Part 2): Adding Multithreading
Preface
Last time we wrote a very simple downloader; this time let's bring in multithreading. Straight to the code.
If threads are unfamiliar, have a look at my earlier post, A Brief Talk on Processes, Threads, and Coroutines (简单讲讲进程、线程以及协程).
Single-threaded mode
```python
import os
import time
import threading

import requests

def download(urlList, worker=0):
    n = 0
    for url in urlList:
        n += 1
        r = requests.get(url)
        imageName = str(worker) + '_' + str(n) + '.jpeg'
        filePath = 'downloads' + '/' + imageName
        with open(filePath, 'wb') as f:
            f.write(r.content)

if __name__ == '__main__':
    url = 'https://mmbiz.qpic.cn/mmbiz_jpg/KrUbaYdkiaBSKIhqlfLOia8deLy59zCg9CBv7hjIDt69nVbeypEhUYLPknziaicCFYlaWMm5S5eC4aOyx70wWRyrcQ/0?wx_fmt=jpeg'
    # Build a list of 100 links to simulate downloading 100 images
    urlList = [url] * 100
    # Make sure the output directory exists
    os.makedirs('downloads', exist_ok=True)
    startTime = time.time()
    # Download the 100 images on a single thread
    download(urlList)
    # for i in range(0, 5):
    #     exec('urlList{} = urlList[{}:{}]'.format(i, i*20, (i+1)*20))
    #     exec('t{} = threading.Thread(target=download, args=(urlList{}, {}))'.format(i, i, i))
    #     exec('t{}.start()'.format(i))
    endTime = time.time()
    useTime = endTime - startTime
    print(useTime)
    # The verbose equivalent of the exec loop above:
    # urlList1 = urlList[0:20]
    # urlList2 = urlList[20:40]
    # urlList3 = urlList[40:60]
    # urlList4 = urlList[60:80]
    # urlList5 = urlList[80:100]
    # t1 = threading.Thread(target=download, args=(urlList1, '1'))
    # t2 = threading.Thread(target=download, args=(urlList2, '2'))
    # t3 = threading.Thread(target=download, args=(urlList3, '3'))
    # t4 = threading.Thread(target=download, args=(urlList4, '4'))
    # t5 = threading.Thread(target=download, args=(urlList5, '5'))
    # t1.start()
    # t2.start()
    # t3.start()
    # t4.start()
    # t5.start()
```
Multithreaded mode: 5 threads
```python
import os
import time
import threading

import requests

def download(urlList, worker=0):
    n = 0
    for url in urlList:
        n += 1
        r = requests.get(url)
        imageName = str(worker) + '_' + str(n) + '.jpeg'
        filePath = 'downloads' + '/' + imageName
        with open(filePath, 'wb') as f:
            f.write(r.content)

if __name__ == '__main__':
    url = 'https://mmbiz.qpic.cn/mmbiz_jpg/KrUbaYdkiaBSKIhqlfLOia8deLy59zCg9CBv7hjIDt69nVbeypEhUYLPknziaicCFYlaWMm5S5eC4aOyx70wWRyrcQ/0?wx_fmt=jpeg'
    # Build a list of 100 links to simulate downloading 100 images
    urlList = [url] * 100
    # Make sure the output directory exists
    os.makedirs('downloads', exist_ok=True)
    startTime = time.time()
    # download(urlList)
    # Split the list into 5 slices of 20 URLs, one thread per slice
    for i in range(0, 5):
        exec('urlList{} = urlList[{}:{}]'.format(i, i*20, (i+1)*20))
        exec('t{} = threading.Thread(target=download, args=(urlList{}, {}))'.format(i, i, i))
        exec('t{}.start()'.format(i))
    # Join all 5 workers so the end timestamp isn't taken too early
    for i in range(0, 5):
        exec('t{}.join()'.format(i))
    endTime = time.time()
    useTime = endTime - startTime
    print(useTime)
```
A little tidbit: exec can execute dynamically built code. Above it is used to generate the 5 urlList slices and the 5 threads on the fly, fully replacing the commented-out block.
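That said, exec-created variables are fragile (inside a function they may not be visible to ordinary code afterwards). If you want the same fan-out without exec, the standard library's concurrent.futures thread pool does the job; here's a minimal sketch that reuses the download function and urlList from the example above:

```python
from concurrent.futures import ThreadPoolExecutor

# Same 5-way split as above, but letting the executor manage the
# threads instead of exec-generated variables
with ThreadPoolExecutor(max_workers=5) as pool:
    for i in range(5):
        pool.submit(download, urlList[i * 20:(i + 1) * 20], i)
# Leaving the with-block implicitly joins all the workers
```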
Compare how long the single-threaded and multithreaded versions take and you'll see a huge difference. On my machine the single-threaded version costs about 8s, the multithreaded one roughly 4s.
Why?
Because, from the CPU's point of view, waiting on network requests and file writes takes far too long.
In single-threaded mode, while a network request or file write is in flight, the thread is suspended and the CPU goes off to serve other processes (with only one thread, suspending the thread means suspending the whole process). Only once the I/O finishes does the CPU switch back to run the next step.
In multithreaded mode, once the first thread is suspended the CPU moves on to the second, and so on, until all 5 threads are suspended, at which point it switches to another process. But if the first thread's I/O has completed by the time the fifth thread suspends, the CPU comes straight back to the first thread rather than switching processes, and that is where the speedup comes from.
Picking up where we left off
The previous article also asked: when you hit a large file, can the download be split into chunks? The answer is yes, and that calls for a quick look at a few fields in the HTTP/1.1 protocol.
If you want the full details of HTTP/1.1 you can look them up elsewhere; here we only cover the fields the downloader uses:
- Content-Length: 1024
  This field is returned by the server and tells you the size of the file to be downloaded, in bytes.
- Content-Disposition: attachment;filename=1.jpg
  This field is also returned by the server: attachment tells the browser to pop up a dialog and download the file as an attachment, and filename tells you the name of the downloaded file.
- Range: bytes=0-1024
  This field is sent by the client to tell the server which part of the file you want, here bytes 0 through 1024 (both ends inclusive).
[Diagram: a 2048-byte file cut into two 1024-byte halves, each half downloaded by its own thread]
As the diagram shows: given a 2048-byte file, we cut it in half, have one thread download the first half and another thread download the second half, and chunked downloading is done.
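One caveat before splitting anything: not every server honors Range requests. A quick way to check (a hedged sketch, using the same test file as below) is to send a small Range request and see whether the server answers with 206 Partial Content:

```python
import requests

url = 'http://wppkg.baidupcs.com/issue/netdisk/MACguanjia/BaiduNetdisk_mac_3.4.1.dmg'
r = requests.get(url, headers={'Range': 'bytes=0-1023'}, stream=True)
# 206 means the server returned just the requested slice;
# 200 means it ignored Range and sent the whole file
print(r.status_code, r.headers.get('Content-Range'))
```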
With that check out of the way, on to the real code, again using the Baidu Netdisk installer as the example:
```python
import os
import time
import threading

import requests

class mutiDownload:
    '''
    Workflow:
    1. Get the file size
    2. Split the file into chunks of the configured size
    3. Start one thread per chunk to download it
    4. Merge all chunks into one file and delete the used chunks
    5. Done
    '''
    def __init__(self, url):
        # Number of chunks, computed later
        self.count = 0
        # Total file size, filled in later
        self.total = 0
        # Cap on the size of each chunk; 10 MB here
        self.downloadSize = 1024 * 1024 * 10
        # The url to request
        self.url = url
        # Headers for the simulated browser request
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
        }
        self.fileName = None
        self.downloadFilePath = self.downloadPath()

    # Create a temporary folder to hold the chunks
    def downloadPath(self):
        basepath = os.path.abspath(os.path.dirname(__file__))
        down_path = os.path.join(basepath, 'downloads')
        if not os.path.isdir(down_path):
            os.mkdir(down_path)
            print('Create download path {}'.format(down_path))
        return down_path

    def cutFileSize(self):
        r = requests.get(url=self.url, stream=True, headers=self.headers)
        self.fileName = r.headers['Content-Disposition'].split('=')[1].replace('"', '')
        self.total = int(r.headers['content-length'])
        # How many chunks we need: ceiling division, so a file that is an
        # exact multiple of downloadSize doesn't get an empty extra chunk
        self.count = -(-self.total // self.downloadSize)
        # Start and end offsets of each thread's chunk; HTTP Range ends
        # are inclusive, hence the -1
        self.downloadRangeList = [
            f'{i * self.downloadSize}-{min((i + 1) * self.downloadSize, self.total) - 1}'
            for i in range(self.count)
        ]
        print(self.count)

    # Spawn the download threads
    def mutiDownload(self):
        # One thread per chunk (a plain list instead of exec here,
        # since exec-assigned names are unreliable inside a method)
        threads = []
        for i in range(0, self.count):
            t = threading.Thread(target=self.downloadFile, args=(i, i))
            t.start()
            threads.append(t)
        # Block the main thread until every worker has finished
        for t in threads:
            t.join()
        self.merge()

    # Download one chunk
    def downloadFile(self, fileMeta, worker):
        # Size of each streamed piece
        chunk_size = 2048
        # Copy the headers per request: the threads run concurrently
        # and must not share one mutable headers dict
        headers = dict(self.headers)
        headers['Range'] = 'bytes=' + self.downloadRangeList[fileMeta]
        r = requests.get(self.url, headers=headers, stream=True)
        filePath = self.downloadFilePath + '/' + str(worker) + '_' + self.fileName
        with open(filePath, 'wb') as f:
            # Write every 2048 bytes as they arrive instead of holding
            # the whole response in memory first
            for chunk in r.iter_content(chunk_size):
                f.write(chunk)

    # Merge all chunks
    def merge(self):
        # 'wb' so a rerun overwrites a stale file instead of appending to it
        fw = open(self.fileName, 'wb')
        for i in range(0, self.count):
            filePath = self.downloadFilePath + '/' + str(i) + '_' + self.fileName
            with open(filePath, 'rb') as fr:
                buff = fr.read()
                fw.write(buff)
            # Delete the chunk we just merged
            os.remove(filePath)
        fw.close()
        check = os.path.getsize(self.fileName)
        # Check that the merged size matches what the server reported
        if check == self.total:
            print('Download File Correct')
        else:
            print('Something is wrong with your downloaded file')
        print('Download has finished')

if __name__ == '__main__':
    url = 'http://wppkg.baidupcs.com/issue/netdisk/MACguanjia/BaiduNetdisk_mac_3.4.1.dmg'
    d = mutiDownload(url)
    # Work out the chunk ranges
    d.cutFileSize()
    startTime = time.time()
    # Kick off the multithreaded download
    d.mutiDownload()
    endtime = time.time()
    # Print how long the download took
    print(endtime - startTime)
```
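If the range arithmetic in cutFileSize looks dense, here is the same formula run on made-up numbers (a pretend 25 MB file with 10 MB chunks), just to sanity-check it:

```python
# Hypothetical sizes, only to verify the chunking math above
downloadSize = 1024 * 1024 * 10        # 10 MB per chunk
total = 1024 * 1024 * 25               # pretend the file is 25 MB
count = -(-total // downloadSize)      # ceiling division -> 3 chunks
ranges = [f'{i * downloadSize}-{min((i + 1) * downloadSize, total) - 1}'
          for i in range(count)]
print(ranges)
# ['0-10485759', '10485760-20971519', '20971520-26214399']
```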
I didn't rewrite the single-threaded download here; the corresponding code is in my previous article, A Downloader, Truly from Scratch (Part 1): The Simplest Download Function (真·从零开始写个下载器(一)-最简单的下载功能).
While we're at it, here's the coroutine-based way to download too:
```python
import os
import asyncio

import aiohttp
import aiofiles
import requests

def downloadPath():
    basepath = os.path.abspath(os.path.dirname(__file__))
    down_path = os.path.join(basepath, 'downloads')
    if not os.path.isdir(down_path):
        os.mkdir(down_path)
        print('Create download path {}'.format(down_path))
    return down_path

def cutFileSize():
    r = requests.get(url=url, stream=True, headers=headers)
    fileName = r.headers['Content-Disposition'].split('=')[1].replace('"', '')
    total = int(r.headers['content-length'])
    # Number of chunks (ceiling division, same reasoning as before)
    count = -(-total // downloadSize)
    # One dict per chunk: its file name plus its inclusive byte range
    downloadHeaderList = [
        {
            'fileName': str(i) + fileName,
            'Range': f'bytes={i * downloadSize}-{min((i + 1) * downloadSize, total) - 1}',
        }
        for i in range(count)
    ]
    return downloadHeaderList

async def download(urlMeta):
    downPath = downloadPath()
    # Copy the shared headers per request: coroutines interleave, so
    # they must not all mutate one global dict
    req_headers = dict(headers)
    req_headers['Range'] = urlMeta['Range']
    filePath = downPath + '/' + urlMeta['fileName']
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=req_headers) as resp:
            async with aiofiles.open(filePath, 'wb') as f:
                while 1:
                    chunk = await resp.content.read(4096)
                    if not chunk:
                        # The async with block closes the file for us
                        break
                    await f.write(chunk)

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
    }
    url = 'http://wppkg.baidupcs.com/issue/netdisk/MACguanjia/BaiduNetdisk_mac_3.4.1.dmg'
    downloadSize = 1024 * 1024 * 20
    urlList = cutFileSize()
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(download(meta)) for meta in urlList]
    loop.run_until_complete(asyncio.wait(tasks))
```
A fun detail if you dig through the debug output: worker threads show up even in this coroutine version. That is not because aiohttp implements coroutines with threads (the coroutines all run on a single-threaded event loop), but because blocking work such as DNS resolution gets handed off to the event loop's thread pool executor by default.
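If you want to see those pool threads yourself, here is a tiny self-contained sketch (no aiohttp involved) showing asyncio pushing blocking work onto a worker thread; the printed name comes from CPython's default executor:

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the loop's default ThreadPoolExecutor,
    # the same mechanism aiohttp's default resolver relies on for DNS lookups
    name = await loop.run_in_executor(None, lambda: threading.current_thread().name)
    print('blocking work ran on:', name)  # e.g. "asyncio_0", not "MainThread"

asyncio.run(main())
```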
TO DO
Next up: add a progress bar~