Python之m3u8解析下载以及ffmpeg转码的实现(附De

2023-11-16  本文已影响0人  生命不止运动不息

m3u8文件是一堆小片段的集合,将所有的小片段都保存后,在合成就等于下载完成了,m3u8有固定的格式,根据格式解析出所有小片段的url链接,有可能存在加密的情况,大部分都是AES加密,这里的办法是将加密的key与ts片段都获取保存下来,本地生成一个m3u8文件。最后使用ffmpeg直接转码为mp4 ,ffmpeg转码时会自行解密。

1、解析m3u8

python不愧是超高级语言,好用的三方库特别多,有一个库就叫 m3u8,地址如下:m3u8 · PyPI

image.png

可以使用pip安装:pip install m3u8

下面贴出解析的代码:

以下代码是拿到m3u8解析后的对象实例,根据该对象实例去获取片段的链接等
注意:因考虑到m3u8的内容可能是二级的m3u8链接,因此使用递归解析

# 解析得到m3u8实例
def parseM3u8(self, m3u8Url):
    playlist = m3u8.load(m3u8Url)
    if playlist.is_variant:
        playlists = playlist.playlists
        if len(playlists) > 0 :
            subPlaylist = playlists[0]   # 默认取第一个,有多个的情况,其实可以优化(应该是选择分辨率高的)
            #subM3u8Url = subPlaylist.base_uri + subPlaylist.uri
            subM3u8Url = subPlaylist.absolute_uri
            return self.parseM3u8(subM3u8Url)
        else :
            return None
    else :
        return playlist

根据m3u8的实例,获取ts片段以及加密key的链接。同时取得文件的下载路径。方便下载时使用

片段的absolute_uri属性,就是该片段的完整的url链接了。

# 从m3u8中,得到ts和key的链接,以及文件的存储全路径
def getFileUrlsAndPaths(self, playlist): 
    allUrls = []
    allFilePaths = []
    tempPath = self.tempPath(self.downloadPath, self.fileName)
    for seg in playlist.segments:
        if seg == None:
            break
        allUrls.append(seg.absolute_uri)
        na = tempPath + '/' + getUrlLastPath(seg.absolute_uri)
        allFilePaths.append(na)

    for key in playlist.keys:
        if key == None:
            break
        allUrls.append(key.absolute_uri)
        na = tempPath + '/' + getUrlLastPath(key.absolute_uri)
        allFilePaths.append(na)

    return allUrls, allFilePaths

至此,拿到了片段的url,m3u8解析就完成了

2、生成本地的m3u8文件,用于ffmpeg进行转码

因为我们下载的ts片段文件与m3u8文件保存到同一目录下,因此就需要将m3u8中的原ts片段的url替换为本地的路径。因此这里通过修改uri值,来达到这一目地。

修改完成后,使用m3u8库的dump方法,将内容保存到本地文件中

# 创建本地m3u8文件,用于ffmpeg的ts合并以及转码mp4
def createNativeM3u8File(self, playlist, m3u8Path): 

    for seg in playlist.segments:
        if seg == None:
            break
        seg.uri = getUrlLastPath(seg.absolute_uri)

    for key in playlist.keys:
        if key == None:
            break
        key.uri = getUrlLastPath(key.absolute_uri)

    playlist.dump(m3u8Path)

3、下载所有片段以及key等文件

这里使用多线程下载,同时支持5个任务的下载,统计下载数量。当全部下载完成后开始转码

# 下载单个文件:url文件的地址,filepath文件的存储全路径
def download_file(self, url, filePath):  

    fn = getUrlLastPath(filePath)
    
    if os.path.exists(filePath):

        self.downloadedCount += 1  
        cnt = f"({self.downloadedCount}/{self.totalCount})"
        print(f"文件 {fn} 已存在{cnt}")

        return


    response = requests.get(url)  
    if response.status_code == 200:  
        with open(filePath, 'wb') as file:  
            file.write(response.content)

        self.downloadedCount += 1  
        cnt = f"({self.downloadedCount}/{self.totalCount})"
        print(f"文件 {fn} 下载完成{cnt}")  

    else:  
        
        # 若下载失败,则开始重试
        allKeys = self.repeatTimesDic.keys()
        retryTimes = 0
        if url in allKeys :
            retryTimes = self.repeatTimesDic[url]

        print(f"无法下载文件 {fn}: {response.status_code}")  
        if retryTimes >= self.maxRepeatTimes : 
            print(f'文件 {fn} 已达到最大下载次数 {self.maxRepeatTimes} 次')
            print(f'文件url:{url}')
            self.failedCount += 1

        else :
            retryTimes = retryTimes + 1
            self.repeatTimesDic[url] = retryTimes
            print(f'开始第{retryTimes}次重试')
            self.download_file(url, filePath)


# 多线程下载
def download_files_concurrently(self, url_list, filename_list):  
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:  
        # executor.map(lambda x: self.download_file(*x), zip(url_list, filename_list))   
        # 将 url_list 和 filename_list 中的对应元素作为参数传递给 download_file 方法  
        tasks = [executor.submit(self.download_file, url, filename) for url, filename in zip(url_list, filename_list)]  
          
        # 等待所有任务完成  
        for future in concurrent.futures.as_completed(tasks):  
            # Future.result() 会阻塞并等待任务完成,并返回任务结果(这里是 None)  
            result = future.result()  
            if result is not None:  # 如果结果不为 None,则说明任务完成了  
                print("Task completed:", result)  
          
        # 所有任务都已完成  
        printTime("All tasks are done.")
        self.downloadComplete()

下载效果图


image.png

4、转码为mp4

当ts片段、key(若存在)、m3u8文件都保存完成后。使用命令行调用ffmpeg进行转码。

转码函数如下:只有两个参数,m3u8的本地路径,输出的文件路径

def m3u8ToMp4(m3u8Path, mp4Path):

    ffmpeg_command = f"ffmpeg -allowed_extensions ALL -protocol_whitelist \"file,http,crypto,tcp\" -i {m3u8Path} -c copy {mp4Path}"  
    
    print('=== ffmpeg开始转码 ===')
    print('m3u8路径: '+m3u8Path)
    print('输出mp4路径: '+mp4Path)
    # 使用subprocess调用FFmpeg并捕获输出  
    process = subprocess.Popen(ffmpeg_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)  
    stdout, stderr = process.communicate()  
    
    # 检查命令是转码行成功  
    if process.returncode == 0:  
        print("=== ffmpeg转码完成 ====")    
        print(stdout.decode())  
        return True
    else:  
        print("=== 转换失败!=== ")  
        print(stderr.decode())
        return False

整体来说都很容易,唯一的麻烦就是多线程的下载,需要处理并发的问题。

5、Demo代码

完整代码请到以下链接:

https://download.csdn.net/download/wangkunggxx/88541574

一共有3个文件,m3u.py、m3u8dl.py、transcode.py

transcode.py 用来转码

m3u8dl.py 核心代码,封装了一个M3u8Downloader的类

m3u.py 供直接调用。

调用示例:
python3 m3u.py 'https://hls.cntv.cdn20.com/asp/hls/1200/0303000a/3/default/1049a60b8b914d4ab7066c568ca616fd/1200.m3u8' '小孩子大梦想'
原文链接:

https://blog.csdn.net/wangkunggxx/article/details/134444941

上一篇下一篇

猜你喜欢

热点阅读