python爬虫 m3u8的下载及AES解密

python爬虫 m3u8的下载及AES解密,第1张

python爬虫 m3u8的下载及AES加密的解密
    • 前言
    • 线程池版
    • 完整代码
    • 异步协程版

前言

这里与各位分享一种爬取 m3u8 视频流、下载其中的 ts 分片并合并成 mp4 视频的方法,同时支持对 AES 加密的 ts 文件进行解密。懂的都懂,也不用感谢了,哈哈哈!!!至于 m3u8_url 链接,就请自己去找了哈。
好了,废话不多说,直接上代码!
(注意:本篇文章只做学习思路交流,不做除此之外的任何用途!!!)

目前已测试过的、经 AES 加密的 m3u8 文件有:


线程池版


完整代码
import os,shutil,time,requests
from Crypto.Cipher import AES
from fake_useragent import UserAgent
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
# Output layout: the merged video is written to save_mp4_path, while the
# downloaded segments are staged in save_temporary_ts_path until merged.
video_download_path = './m3u8Download'
save_mp4_path = './m3u8Download/testVideo'
save_temporary_ts_path = './m3u8Download/temporary_ts'
# Create each directory on its own. merge_all_ts_file() removes the temporary
# directory after every run, so only checking whether the root directory
# exists would leave the staging directory missing on the next run.
os.makedirs(save_mp4_path, exist_ok=True)
os.makedirs(save_temporary_ts_path, exist_ok=True)
# Shared HTTP GET helper so that the playlist, key and segment downloads all
# use the same headers and error handling.
def send_request(url, timeout=30):
    """Send a GET request with browser-like headers and return the response.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            connection would block its caller indefinitely.

    Returns:
        The ``requests`` response object; its status code is always 200.

    Raises:
        Exception: Whatever ``requests.get`` raised. The error is printed and
            re-raised so callers never receive ``None`` (which previously
            surfaced later as an unrelated ``AttributeError``).
        SystemExit: The server answered with a status code other than 200.
    """
    headers = {
        'User-Agent': UserAgent().Chrome,
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'Accept-Language': 'zh-CN,zh;q=0.9',
    }
    try:
        response = requests.get(url=url, headers=headers, timeout=timeout)
    except Exception as e:
        print('m3u8链接请求异常!!!')
        print(e)
        raise
    if response.status_code != 200:
        print(response, '响应异常!')
        # ``exit()`` is only injected by the ``site`` module and exits with
        # status 0; raising SystemExit directly always works and reports the
        # failure through a non-zero exit status.
        raise SystemExit(1)
    return response
# Fetch the playlist addressed by the module-level m3u8_url.
def get_m3u8_response_data():
    """Request the m3u8 playlist and return the response body as text."""
    return send_request(m3u8_url).text
# Parse the playlist: collect every segment URL, sum up the durations and, when
# the playlist declares a key, build the decryptor for the segments.
def parse_m3u8_data():
    """Parse the m3u8 playlist referenced by the module-level ``m3u8_url``.

    Returns:
        A tuple ``(all_ts_list, AES_decode_data)``: the absolute URL of every
        segment in playlist order, and the object returned by ``AES_decode``
        for the last ``#EXT-X-KEY`` line (``None`` when there is no key).
    """
    m3u8_data = get_m3u8_response_data()
    # splitlines() + strip() copes with CRLF line endings; a trailing '\r'
    # would otherwise make the '#EXTM3U' membership test below fail.
    each_line_list = [line.strip() for line in m3u8_data.splitlines()]
    all_ts_list = []
    video_time = []
    AES_decode_data = None
    if '#EXTM3U' in each_line_list:
        for i in each_line_list:
            if not i:
                # A blank line is not a segment; joining it with m3u8_url
                # would yield the playlist URL itself.
                continue
            if i.startswith('#EXT-X-KEY'):
                if 'METHOD=NONE' in i:
                    # Declares "not encrypted": there is no key to fetch.
                    continue
                encryption_method, key_url, iv = parse_AES_encryption(i)
                print('加密方法:', encryption_method)
                key_url = urljoin(m3u8_url, key_url)
                AES_decode_data = AES_decode(key_url, iv)
            elif i.startswith('#EXTINF'):
                # '#EXTINF:<duration>,[<title>]' - keep only the duration so
                # that a non-empty title does not break float().
                duration_text = i.split(':', 1)[1].split(',', 1)[0]
                video_time.append(float(duration_text))
            elif not i.startswith('#'):
                # Every remaining non-tag line is a (possibly relative) URI.
                all_ts_list.append(urljoin(m3u8_url, i))
    print('视频时长约为:{:.2f}分钟'.format(sum(video_time) / 60))
    return all_ts_list, AES_decode_data
# Download every segment through a thread pool.
def get_each_ts_response_data():
    """Download all segments of the playlist concurrently.

    Returns:
        ``True`` when every segment was downloaded, ``False`` when the
        playlist contained no segment or at least one download failed, so the
        caller does not merge an incomplete set of files.
    """
    print('开始下载视频……')
    all_ts_list, AES_decode_data = parse_m3u8_data()
    if not all_ts_list:
        print('未解析到任何ts链接,无法下载!')
        return False
    # At most 30 worker threads; leaving the ``with`` block waits for all of
    # them. For a quick trial run, slice all_ts_list (e.g. ``[:3]``) here.
    with ThreadPoolExecutor(max_workers=30) as executor:
        futures = [
            executor.submit(download_ts, i, ts_url, AES_decode_data)
            for i, ts_url in enumerate(all_ts_list)
        ]
    # An exception raised inside a worker is only stored on its future, so it
    # has to be looked up explicitly. exception() returns it without raising.
    failed = [i for i, future in enumerate(futures) if future.exception() is not None]
    if failed:
        print(f'以下ts分片下载失败,已取消合并:{failed}')
        return False
    print('视频下载结束!')
    return True
# Fetch one segment and store it as "<index>.ts".
def download_ts(i,ts_url,AES_decode_data):
    """Download segment ``i`` from ``ts_url`` into the temporary directory.

    ``AES_decode_data`` is the decryptor applied to the downloaded bytes; a
    falsy value means the bytes are written unchanged.
    """
    payload = send_request(ts_url).content
    if AES_decode_data:
        payload = AES_decode_data.decrypt(payload)
    target = f'{save_temporary_ts_path}/{i}.ts'
    with open(target, mode='wb+') as ts_file:
        ts_file.write(payload)
        print(f'{i}.ts下载完成!')
# Parse the attributes of an #EXT-X-KEY line.
def parse_AES_encryption(key_content):
    """Extract method, key URI and IV from an ``#EXT-X-KEY`` playlist line.

    Args:
        key_content: The complete tag line, e.g.
            ``#EXT-X-KEY:METHOD=AES-128,URI="key.key",IV=0x0123...``.

    Returns:
        A tuple ``(encryption_method, key_url, iv)``. ``iv`` is the 16-byte
        value decoded from the hexadecimal ``IV`` attribute, or ``None`` when
        the line has no ``IV``. ``encryption_method`` / ``key_url`` are
        ``None`` when the corresponding attribute is absent.

    Raises:
        ValueError: The ``IV`` attribute is not valid hexadecimal.
    """
    import re  # local import: the function stays usable on its own

    attribute_text = key_content.strip().split(':', 1)[-1]
    # Values are either quoted strings (which may themselves contain ',' or
    # '=', e.g. a URI with a query string) or bare tokens, so splitting the
    # line on '=' is not reliable.
    attributes = {
        name.upper(): value.strip().strip('"')
        for name, value in re.findall(r'([A-Za-z0-9-]+)=("[^"]*"|[^",]*)', attribute_text)
    }
    encryption_method = attributes.get('METHOD')
    key_url = attributes.get('URI')
    iv = None
    iv_text = attributes.get('IV')
    if iv_text:
        if iv_text[:2].lower() == '0x':
            iv_text = iv_text[2:]
        # The IV is a 128-bit number written in hex: decode the digits into
        # 16 raw bytes (left-padding short values) rather than encoding the
        # text itself.
        iv = bytes.fromhex(iv_text.zfill(32))
    return encryption_method, key_url, iv
# AES decryption of the segments.
class _SegmentDecryptor:
    """Decrypt AES-128-CBC segments, each one independently of the others."""

    _BLOCK_SIZE = 16

    def __init__(self, key, iv):
        # Build one cipher up front so a bad key/IV length is reported
        # immediately instead of inside a download worker.
        AES.new(key, AES.MODE_CBC, iv)
        self._key = key
        self._iv = iv

    def decrypt(self, data):
        """Return the plaintext of one complete encrypted segment."""
        # A CBC cipher object keeps its chaining state between decrypt()
        # calls, so one shared object would carry state from segment to
        # segment (and between threads). Every segment gets a fresh cipher.
        plaintext = AES.new(self._key, AES.MODE_CBC, self._iv).decrypt(data)
        # Remove PKCS7 padding when it is well-formed; otherwise leave the
        # data untouched rather than fail the download.
        pad_len = plaintext[-1] if plaintext else 0
        if 1 <= pad_len <= self._BLOCK_SIZE and plaintext.endswith(bytes([pad_len]) * pad_len):
            plaintext = plaintext[:-pad_len]
        return plaintext


def AES_decode(key_url,iv):
    """Download the key and return a decryptor for the segments.

    Args:
        key_url: Absolute URL of the key file.
        iv: 16-byte initialization vector, or a falsy value when the playlist
            did not specify one.

    Returns:
        An object whose ``decrypt(data)`` method decrypts one whole segment.
    """
    key = send_request(key_url).content
    if not iv:
        # An empty IV is rejected by AES.new. NOTE(review): HLS defines the
        # default IV as the segment's media sequence number; that number is
        # not available here, so an all-zero IV is used, which can garble the
        # first 16 bytes of a segment - confirm whether that is acceptable.
        iv = bytes(16)
    return _SegmentDecryptor(key, iv)
# Finally concatenate all segment files.
def merge_all_ts_file():
    """Concatenate the downloaded ``<index>.ts`` files into ``video.mp4``.

    The segments are appended in numeric order and the temporary directory is
    removed afterwards.
    """
    print('开始合并视频……')
    # Only "<number>.ts" files take part: any other entry in the directory
    # would make the numeric sort key raise ValueError.
    ts_file_list = [
        name for name in os.listdir(save_temporary_ts_path)
        if name.endswith('.ts') and name[:-3].isdecimal()
    ]
    ts_file_list.sort(key=lambda name: int(name[:-3]))
    with open(save_mp4_path+'/video.mp4', 'wb') as fw:
        for name in ts_file_list:
            # ``with`` closes each segment file even if copying fails.
            with open(os.path.join(save_temporary_ts_path, name), 'rb') as fr:
                shutil.copyfileobj(fr, fw)
    shutil.rmtree(save_temporary_ts_path)  # delete all the ts files
    print('视频合并完成!')

def begin():
    """Download all segments and merge them when the download reports success."""
    downloaded = get_each_ts_response_data()
    if not downloaded:
        return
    merge_all_ts_file()
if __name__ == '__main__':
    # Supply your own playlist address here. The name has to remain a
    # module-level global because get_m3u8_response_data() and
    # parse_m3u8_data() read it directly.
    m3u8_url = 'https://xxx.m3u8'
    start_time = time.time()
    begin()
    end_time = time.time()
    print(f'总共耗时:{end_time-start_time}秒')


异步协程版
import os,shutil,time
import asyncio,aiohttp,aiofiles
from Crypto.Cipher import AES
from fake_useragent import UserAgent
from urllib.parse import urljoin
class asyncioDownloadM3u8:
    """Download an m3u8 stream with asyncio and merge it into a single file.

    Segments are fetched concurrently with aiohttp, decrypted when the
    playlist declares an ``#EXT-X-KEY``, staged as ``<index>.ts`` files and
    finally concatenated into ``video.mp4``.
    """

    class _SegmentDecryptor:
        """Decrypt AES-128-CBC segments, each one independently of the others."""

        _BLOCK_SIZE = 16

        def __init__(self, key, iv):
            # Build one cipher up front so a bad key/IV length is reported
            # immediately instead of inside a download task.
            AES.new(key, AES.MODE_CBC, iv)
            self._key = key
            self._iv = iv

        def decrypt(self, data):
            """Return the plaintext of one complete encrypted segment."""
            # A CBC cipher object keeps its chaining state between decrypt()
            # calls, so every segment gets a fresh cipher.
            plaintext = AES.new(self._key, AES.MODE_CBC, self._iv).decrypt(data)
            # Remove PKCS7 padding when it is well-formed; otherwise leave
            # the data untouched rather than fail the download.
            pad_len = plaintext[-1] if plaintext else 0
            if 1 <= pad_len <= self._BLOCK_SIZE and plaintext.endswith(bytes([pad_len]) * pad_len):
                plaintext = plaintext[:-pad_len]
            return plaintext

    def __init__(self, m3u8_url, max_concurrency=30):
        """Store the playlist URL and prepare the output directories.

        Args:
            m3u8_url: Address of the m3u8 playlist.
            max_concurrency: Upper bound on simultaneous segment downloads.
        """
        self.m3u8_url = m3u8_url
        self.max_concurrency = max_concurrency
        self.AES_decode_data = None  # segment decryptor, set by parse_m3u8_data()
        self.video_download_path = './m3u8Download'
        self.save_mp4_path = './m3u8Download/testVideo'
        self.save_temporary_ts_path = './m3u8Download/temporary_ts'
        # Create each directory on its own: merge_all_ts_file() deletes the
        # temporary one, so testing only the root directory is not enough.
        os.makedirs(self.save_mp4_path, exist_ok=True)
        os.makedirs(self.save_temporary_ts_path, exist_ok=True)

    async def send_request(self, url):
        """GET ``url`` and return the response body as bytes.

        Raises:
            RuntimeError: The server answered with a status other than 200.
            Exception: Any other error raised while requesting. Every failure
                is printed and then re-raised.
        """
        try:
            headers = {
                'User-Agent': UserAgent().Chrome,
                'Accept': '*/*',
                'Accept-Encoding': 'gzip, deflate, br',
                'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': '"Windows"',
                'Accept-Language': 'zh-CN,zh;q=0.9',
            }
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    # An explicit check instead of ``assert``: assertions
                    # are stripped when Python runs with -O.
                    if response.status != 200:
                        raise RuntimeError(f'unexpected HTTP status {response.status} for {url}')
                    return await response.read()
        except Exception as e:
            print('m3u8链接请求异常!!!')
            print(e)
            raise

    async def parse_m3u8_data(self):
        """Download and parse the playlist.

        Returns:
            A tuple ``(all_ts_list, AES_decode_data)``: the absolute URL of
            every segment in playlist order and the decryptor (``None`` when
            the playlist declares no key). The decryptor is also stored in
            ``self.AES_decode_data``.
        """
        content = await self.send_request(self.m3u8_url)
        m3u8_data = content.decode('utf-8')
        # splitlines() + strip() copes with CRLF line endings.
        each_line_list = [line.strip() for line in m3u8_data.splitlines()]
        all_ts_list = []
        video_time = []
        if '#EXTM3U' in each_line_list:
            for i in each_line_list:
                if not i:
                    continue  # a blank line is not a segment
                if i.startswith('#EXT-X-KEY'):
                    encryption_method, key_url, iv = await self.parse_AES_encryption(i)
                    print('加密方法:', encryption_method)
                    if encryption_method == 'NONE' or not key_url:
                        continue  # "not encrypted": there is no key to fetch
                    # Resolve against the instance's own URL rather than a
                    # module-level global of the same name.
                    key_url = urljoin(self.m3u8_url, key_url)
                    self.AES_decode_data = await self.AES_decode(key_url, iv)
                elif i.startswith('#EXTINF'):
                    # '#EXTINF:<duration>,[<title>]' - keep the duration only.
                    duration_text = i.split(':', 1)[1].split(',', 1)[0]
                    video_time.append(float(duration_text))
                elif not i.startswith('#'):
                    all_ts_list.append(urljoin(self.m3u8_url, i))
        print('视频时长约为:{:.2f}分钟'.format(sum(video_time) / 60))
        return all_ts_list, self.AES_decode_data

    async def get_each_ts_response_data(self):
        """Download every segment concurrently.

        Raises:
            RuntimeError: The playlist has no segment, or at least one
                download failed (so an incomplete video is never merged).
        """
        print('开始下载视频……')
        all_ts_list, _ = await self.parse_m3u8_data()
        if not all_ts_list:
            raise RuntimeError('no ts segment found in the playlist')
        # The temporary directory is deleted after each merge; recreate it so
        # the same instance can be used again.
        os.makedirs(self.save_temporary_ts_path, exist_ok=True)
        # Created inside the running loop; caps the simultaneous connections.
        semaphore = asyncio.Semaphore(self.max_concurrency)

        async def download_with_limit(i, ts_url):
            async with semaphore:
                await self.download_ts(i, ts_url)

        results = await asyncio.gather(
            *(download_with_limit(i, ts_url) for i, ts_url in enumerate(all_ts_list)),
            return_exceptions=True,
        )
        failed = [i for i, result in enumerate(results) if isinstance(result, BaseException)]
        if failed:
            raise RuntimeError(f'failed to download ts segments: {failed}')
        print('视频下载结束!')

    async def download_ts(self, i, ts_url):
        """Download segment ``i``, decrypt it if needed and save it as ``<i>.ts``."""
        ts_data = await self.send_request(ts_url)
        if self.AES_decode_data:
            ts_data = self.AES_decode_data.decrypt(ts_data)
        async with aiofiles.open(f'{self.save_temporary_ts_path}/{i}.ts', mode="wb") as f:
            await f.write(ts_data)
            print(f'{i}.ts下载完成!')

    async def parse_AES_encryption(self, key_content):
        """Extract method, key URI and IV from an ``#EXT-X-KEY`` playlist line.

        Returns:
            A tuple ``(encryption_method, key_url, iv)``. ``iv`` is the
            16-byte value decoded from the hexadecimal ``IV`` attribute, or
            ``None`` when the line has no ``IV``; the other two are ``None``
            when their attribute is absent.

        Raises:
            ValueError: The ``IV`` attribute is not valid hexadecimal.
        """
        import re  # local import: not part of the script's import block

        attribute_text = key_content.strip().split(':', 1)[-1]
        # Values are either quoted strings (which may contain ',' or '=') or
        # bare tokens, so splitting the line on '=' is not reliable.
        attributes = {
            name.upper(): value.strip().strip('"')
            for name, value in re.findall(r'([A-Za-z0-9-]+)=("[^"]*"|[^",]*)', attribute_text)
        }
        encryption_method = attributes.get('METHOD')
        key_url = attributes.get('URI')
        iv = None
        iv_text = attributes.get('IV')
        if iv_text:
            if iv_text[:2].lower() == '0x':
                iv_text = iv_text[2:]
            # The IV is a 128-bit number written in hex: decode the digits
            # into 16 raw bytes rather than encoding the text itself.
            iv = bytes.fromhex(iv_text.zfill(32))
        return encryption_method, key_url, iv

    async def AES_decode(self, key_url, iv):
        """Download the key and return an object with a ``decrypt(data)`` method."""
        key = await self.send_request(key_url)
        if not iv:
            # An empty IV is rejected by AES.new. NOTE(review): HLS defines
            # the default IV as the segment's media sequence number; it is
            # not available here, so an all-zero IV is used, which can garble
            # the first 16 bytes of a segment - confirm this is acceptable.
            iv = bytes(16)
        return self._SegmentDecryptor(key, iv)

    async def merge_all_ts_file(self):
        """Concatenate the staged ``<index>.ts`` files into ``video.mp4``.

        The temporary directory is removed afterwards.
        """
        print('开始合并视频……')
        # Only "<number>.ts" files take part: any other entry would make the
        # numeric sort key raise ValueError.
        ts_file_list = sorted(
            (
                name for name in os.listdir(self.save_temporary_ts_path)
                if name.endswith('.ts') and name[:-3].isdecimal()
            ),
            key=lambda name: int(name[:-3]),
        )
        async with aiofiles.open(self.save_mp4_path + '/video.mp4', 'wb') as fw:
            for name in ts_file_list:
                # ``with`` closes each segment file even if writing fails.
                with open(os.path.join(self.save_temporary_ts_path, name), 'rb') as fr:
                    await fw.write(fr.read())
        shutil.rmtree(self.save_temporary_ts_path)
        print('视频合并完成!')

    async def begin(self):
        """Download all segments, then merge them.

        A failed download raises before the merge step is reached.
        """
        await self.get_each_ts_response_data()
        await self.merge_all_ts_file()

if __name__ == '__main__':
    start_time = time.time()
    # Supply your own playlist address here.
    m3u8_url = 'https://xxx.m3u8'
    adm = asyncioDownloadM3u8(m3u8_url=m3u8_url)
    # asyncio.run() creates the event loop, runs the coroutine and closes the
    # loop again. Calling asyncio.get_event_loop() without a running loop is
    # deprecated in recent Python versions.
    asyncio.run(adm.begin())
    end_time = time.time()
    print(f'总共耗时:{end_time - start_time}秒')

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/713858.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-24
下一篇 2022-04-24

发表评论

登录后才能评论

评论列表(0条)

保存