Writing a Downloader Truly From Scratch (Part 2): Adding Multithreading

2020-11-04  TK丰

Preface

Last time we wrote a very simple downloader; this time let's bring in multithreading. Straight to the code.

If you're not familiar with threads, take a look at my earlier post, A Brief Chat on Processes, Threads, and Coroutines.

Single-threaded mode
import requests, time
import os

def download(urlList, worker = 0):
    n = 0
    for url in urlList:
        n += 1
        r = requests.get(url)
        # name each file by worker id and sequence number to avoid collisions
        imageName = str(worker) + '_' + str(n) + '.jpeg'
        filePath = 'downloads' + '/' + imageName
        with open(filePath, 'wb') as f:
            f.write(r.content)

if __name__ == '__main__':
    url = 'https://mmbiz.qpic.cn/mmbiz_jpg/KrUbaYdkiaBSKIhqlfLOia8deLy59zCg9CBv7hjIDt69nVbeypEhUYLPknziaicCFYlaWMm5S5eC4aOyx70wWRyrcQ/0?wx_fmt=jpeg'
    # build a list of 100 links to simulate downloading 100 images
    urlList = [url]*100
    # make sure the downloads folder exists
    os.makedirs('downloads', exist_ok=True)
    startTime = time.time()
    # download 100 images on a single thread
    download(urlList)
    endTime = time.time()
    useTime = endTime-startTime
    print (useTime)

Multithreaded mode: creating 5 threads
import requests, time
import threading
import os

def download(urlList, worker = 0):
    n = 0
    for url in urlList:
        n += 1
        r = requests.get(url)
        # name each file by worker id and sequence number to avoid collisions
        imageName = str(worker) + '_' + str(n) + '.jpeg'
        filePath = 'downloads' + '/' + imageName
        with open(filePath, 'wb') as f:
            f.write(r.content)

if __name__ == '__main__':
    url = 'https://mmbiz.qpic.cn/mmbiz_jpg/KrUbaYdkiaBSKIhqlfLOia8deLy59zCg9CBv7hjIDt69nVbeypEhUYLPknziaicCFYlaWMm5S5eC4aOyx70wWRyrcQ/0?wx_fmt=jpeg'
    # build a list of 100 links to simulate downloading 100 images
    urlList = [url]*100
    # make sure the downloads folder exists
    os.makedirs('downloads', exist_ok=True)
    startTime = time.time()
    # download(urlList)  # the single-threaded version, kept for comparison
    # dynamically create 5 slices of the url list and 5 threads, one per slice
    for i in range(0, 5):
        exec('urlList{} = urlList[{}:{}]'.format(i, i*20, (i+1)*20))
        exec('t{} = threading.Thread(target=download, args=(urlList{}, {}))'.format(i,i,i))
        exec('t{}.start()'.format(i))
    # block the main thread until all 5 worker threads have finished
    for i in range(0, 5):
        exec('t{}.join()'.format(i))
    endTime = time.time()
    useTime = endTime-startTime
    print (useTime)

A small tip: exec lets you execute dynamically built code. The usage above dynamically generates the five urlList slices and the five threads, completely replacing the tedious manual version where you would write out urlList1 through urlList5 and t1 through t5 line by line.
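
That said, exec is rarely the idiomatic choice in day-to-day code, and it relies on exec being able to write to local variables, which is fragile. Here is a minimal sketch of the same five-thread download that simply keeps the Thread objects in a list; it uses only the download function and urlList from the listing above:

threads = []
for i in range(5):
    chunk = urlList[i*20:(i+1)*20]
    t = threading.Thread(target=download, args=(chunk, i))
    t.start()
    threads.append(t)
# wait for every worker to finish before measuring the elapsed time
for t in threads:
    t.join()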

Compare how long the two approaches take and you'll see a big difference: on my machine, the single-threaded version takes about 8s and the multithreaded one about 4s.

Why?

Because, to the CPU, the time spent waiting on network requests and file writes is enormous.
In single-threaded mode, the thread is suspended whenever it waits on a network request or a file write, and the CPU turns to other processes (since there is only one thread, suspending the thread means suspending the whole process). Only when the I/O completes does the CPU switch back to run the next step.
In multithreaded mode, after the first thread is suspended the CPU moves on to the second, and only once all five threads are suspended does it switch to another process. But if the first thread is ready again by the time the fifth thread is suspended, the CPU comes straight back to the first thread instead of switching processes, and that is where the speedup comes from.
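
Incidentally, the standard library ships a ready-made thread pool for exactly this I/O-bound pattern. A minimal sketch using concurrent.futures.ThreadPoolExecutor, reusing the same download function and the same 5x20 split as above:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=5) as pool:
    for i in range(5):
        # one task per slice of 20 urls
        pool.submit(download, urlList[i*20:(i+1)*20], i)
# leaving the with-block waits for all submitted tasks to finish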

Continuing from the previous post

The previous post also raised the question of whether a large file can be downloaded in chunks. The answer is yes, and it calls for a short detour into a few fields of the HTTP/1.1 protocol.

If you want the full details of HTTP/1.1 you can look them up elsewhere; here I'll only cover the fields the downloader actually uses.

The key field is Range. Suppose a file is 2048 bytes in size: one thread can send the header Range: bytes=0-1023 and download the first half, while another sends Range: bytes=1024-2047 and downloads the second half, and we have chunked downloading.
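
Before relying on this, it's worth probing whether the server actually honors Range requests. A small illustrative check (url stands for whichever file you are probing; the 206 status and Content-Range header are standard HTTP/1.1 behavior):

import requests

r = requests.get(url, headers={'Range': 'bytes=0-1023'}, stream=True)
print(r.status_code)                   # 206 Partial Content if ranges are honored; 200 means the header was ignored
print(r.headers.get('Content-Range'))  # e.g. 'bytes 0-1023/2048'
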
Enough talk, on to the code, again using the Baidu Netdisk installer as the example:

import threading
import requests
import os,time


class mutiDownload:
    '''
    Flow:
        1. Get the total file size
        2. Split the file into chunks of the configured size
        3. Start one thread per chunk to download it
        4. Merge all the chunks into one file and delete the used chunks
        5. Done
    '''

    def __init__(self, url):
        # number of chunks to download (filled in later)
        self.count = 0
        # total file size in bytes (filled in later)
        self.total = 0
        # cap on each chunk's size, 10 MB here
        self.downloadSize = 1024*1024*10
        # the url to request
        self.url = url
        # headers to mimic a browser request
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
        }
        self.fileName = None
        self.downloadFilePath = self.downloadPath()

    # create a temporary folder to hold the chunks
    def downloadPath(self):
        basepath = os.path.abspath(os.path.dirname(__file__)) 
        down_path = os.path.join(basepath, 'downloads')
        if not os.path.isdir(down_path):
            os.mkdir(down_path)
            print ('Create download path {}'.format(down_path))
        return down_path

    def cutFileSize(self):
        r = requests.get(url = self.url, stream=True, headers = self.headers)
        # file name suggested by the server, e.g. Content-Disposition: attachment;filename="xxx.dmg"
        self.fileName = r.headers['Content-Disposition'].split('=')[1].replace('\"','')
        self.total = int(r.headers['content-length'])
        # work out how many chunks are needed (round up)
        self.count = (self.total + self.downloadSize - 1) // self.downloadSize
        # inclusive start-end byte positions for each thread's chunk
        # (HTTP ranges are inclusive, so the last byte of the file is total-1)
        self.downloadRangeList = [
            f'{i * self.downloadSize}-{min((i + 1) * self.downloadSize, self.total) - 1}'
            for i in range(self.count)
        ]
        print (self.count)

    # create the threads that download the chunks
    def mutiDownload(self):
        # one thread per chunk
        for i in range(0, self.count):
            exec('t{} = threading.Thread(target=self.downloadFile, args=({},{}))'.format(i, i, i))
            exec('t{}.start()'.format(i))
        # block the main thread until every worker thread has finished
        for i in range(0, self.count):
            exec('t{}.join()'.format(i))
        self.merge()

    # download one chunk
    def downloadFile(self, fileMeta, worker):
        # how many bytes to pull off the wire per write
        chunk_size = 2048
        # copy the headers: the threads run concurrently, so mutating the
        # shared self.headers would let one thread clobber another's Range
        headers = dict(self.headers)
        headers['Range'] = 'bytes=' + self.downloadRangeList[fileMeta]
        r = requests.get(self.url, headers = headers, stream=True)
        filePath = self.downloadFilePath + '/' + str(worker) + '_' + self.fileName
        with open(filePath, 'wb') as f:
            # write to disk every 2048 bytes instead of holding the whole chunk in memory
            for chunk in r.iter_content(chunk_size):
                f.write(chunk)

    # merge all the chunks
    def merge(self):
        # 'wb' so a leftover file from a previous run doesn't get appended to
        fw = open(self.fileName, 'wb')
        for i in range(0, self.count):
            filePath = self.downloadFilePath + '/' + str(i) + '_' + self.fileName
            with open(filePath, 'rb') as fr:
                buff = fr.read()
                fw.write(buff)
            # delete the chunk once it has been merged
            os.remove(filePath)
        fw.close()
        check = os.path.getsize(self.fileName)
        # check that the merged file's size matches what the server reported
        if check == self.total:
            print ('Download File Correct')
        else:
            print ('Something is wrong with your downloaded file')
        print ('Download finished')

if __name__ == '__main__':
    url = 'http://wppkg.baidupcs.com/issue/netdisk/MACguanjia/BaiduNetdisk_mac_3.4.1.dmg'
    d = mutiDownload(url)
    # work out the file size and the chunk ranges
    d.cutFileSize()
    startTime = time.time()
    # start the multithreaded download
    d.mutiDownload()
    endTime = time.time()
    # print how long the download took
    print (endTime-startTime)
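
One fragile spot worth flagging: cutFileSize assumes the response always carries a Content-Disposition header, which many servers omit. A hedged sketch of a fallback (guessFileName is a hypothetical helper, not part of the original class) that falls back to the last segment of the url path:

import os
from urllib.parse import urlparse

def guessFileName(response, url):
    # prefer the server-suggested name, fall back to the url's last path segment
    cd = response.headers.get('Content-Disposition')
    if cd and '=' in cd:
        return cd.split('=')[1].replace('\"', '')
    return os.path.basename(urlparse(url).path)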

I haven't rewritten the single-threaded version here; the corresponding code is in my previous post, Writing a Downloader Truly From Scratch (Part 1): The Simplest Download Feature.

While we're at it, here's a coroutine-based version of the download too:

import aiohttp,asyncio
import aiofiles
import os
import requests


def downloadPath():
    basepath = os.path.abspath(os.path.dirname(__file__)) 
    down_path = os.path.join(basepath, 'downloads')
    if not os.path.isdir(down_path):
        os.mkdir(down_path)
        print ('Create download path {}'.format(down_path))
    return down_path

def cutFileSize():
    r = requests.get(url = url, stream=True, headers = headers)
    fileName = r.headers['Content-Disposition'].split('=')[1].replace('\"','')
    total = int(r.headers['content-length'])
    # number of chunks, rounded up
    count = (total + downloadSize - 1) // downloadSize
    # one dict per chunk: the chunk's file name plus its inclusive byte range
    downloadHeaderList = [
        {
            'fileName': str(i) + fileName,
            'Range': f'bytes={i * downloadSize}-{min((i + 1) * downloadSize, total) - 1}'
        }
        for i in range(count)
    ]
    return downloadHeaderList

async def download(urlMeta):
    downPath = downloadPath()
    # copy the shared headers so concurrent tasks don't overwrite each other's Range
    reqHeaders = dict(headers)
    reqHeaders['Range'] = urlMeta['Range']
    filePath = downPath + '/' + urlMeta['fileName']
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=reqHeaders) as resp:
            async with aiofiles.open(filePath, 'wb') as f:
                # read the response 4096 bytes at a time and write as we go;
                # the async with block closes the file for us
                while True:
                    chunk = await resp.content.read(4096)
                    if not chunk:
                        break
                    await f.write(chunk)

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
    }
    url = 'http://wppkg.baidupcs.com/issue/netdisk/MACguanjia/BaiduNetdisk_mac_3.4.1.dmg'
    # 20 MB per chunk this time
    downloadSize = 1024 * 1024 * 20
    urlList = cutFileSize()
    loop = asyncio.get_event_loop()
    # one task per chunk
    tasks = [asyncio.ensure_future(download(meta)) for meta in urlList]
    loop.run_until_complete(asyncio.wait(tasks))

Incidentally, if you dig into the debug output you'll notice something interesting: threads still show up even in the coroutine version. The coroutines themselves all run on a single-threaded event loop, but asyncio hands some blocking work, such as DNS resolution (getaddrinfo), off to a thread-pool executor behind the scenes.
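
If you want to observe this yourself, a small illustrative probe (not part of the downloader) is to list the live threads after at least one request has completed; recent CPython versions name the event loop's default executor workers asyncio_0, asyncio_1, and so on:

import threading

# call this from inside a coroutine, after the first request has gone through
for t in threading.enumerate():
    print(t.name)  # MainThread plus any asyncio_N executor workers (used e.g. for DNS lookups)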

TO DO

Next up, let's add a progress bar~

Thank you for your attention

To Be Continued

If you enjoyed this, give it a like~