- 前言
- 线程池版
- 完整代码
- 异步协程版
这里与hxdm分享一种抓取m3u8视频流、下载ts分片并合并成mp4视频的方法,并且支持对AES加密的ts文件进行解密。xdm懂的都懂,也不用感谢了,哈哈哈!!!至于m3u8_url链接,就需要自己去找了哈。
好了,废话不多说,直接上代码!
(注意:本篇文章只做学习思路交流,不做除此之外的任何用途!!!)
目前测试过m3u8文件的AES加密后的有:
线程池版
完整代码
import os,shutil,time,requests
from Crypto.Cipher import AES
from fake_useragent import UserAgent
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
# Output layout: the merged video is written under save_mp4_path, while the
# downloaded segments are staged in save_temporary_ts_path.
video_download_path = './m3u8Download'
save_mp4_path = './m3u8Download/testVideo'
save_temporary_ts_path = './m3u8Download/temporary_ts'
# Create each directory on its own instead of only checking the parent:
# merge_all_ts_file() deletes the staging directory after a run, so a check on
# video_download_path alone would leave it missing on every later run.
os.makedirs(save_mp4_path, exist_ok=True)
os.makedirs(save_temporary_ts_path, exist_ok=True)
def send_request(url, timeout=30):
    """Send a GET request with browser-like headers and return the response.

    Shared by the playlist, key and segment downloads.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot block a worker forever.

    Returns:
        The ``requests.Response`` when the server answers with HTTP 200.

    Raises:
        requests.RequestException: On connection errors, timeouts, or any
            status code other than 200.
    """
    headers = {
        'User-Agent': UserAgent().Chrome,
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'Accept-Language': 'zh-CN,zh;q=0.9',
    }
    try:
        response = requests.get(url=url, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        print('m3u8链接请求异常!!!')
        print(e)
        # Re-raise rather than fall through and return None: every caller
        # immediately dereferences the result (.text / .content).
        raise
    if response.status_code != 200:
        print(response, '响应异常!')
        raise requests.HTTPError(
            f'unexpected status {response.status_code} for {url}',
            response=response,
        )
    return response
def get_m3u8_response_data():
    """Request the playlist at the module-level ``m3u8_url`` and return its text."""
    playlist_response = send_request(m3u8_url)
    return playlist_response.text
def parse_m3u8_data():
    """Parse the playlist into segment URLs and an optional decryptor.

    Prints the encryption method (when a key tag is present) and the
    approximate total duration computed from the ``#EXTINF`` tags.

    Returns:
        A tuple ``(all_ts_list, AES_decode_data)``: the absolute URL of every
        segment in playlist order, and the object returned by ``AES_decode``
        for the last usable ``#EXT-X-KEY`` tag (``None`` if there is none).
    """
    m3u8_data = get_m3u8_response_data()
    # splitlines() + strip() also copes with CRLF line endings, which would
    # otherwise leave a trailing '\r' on every line and defeat the checks below.
    each_line_list = [line.strip() for line in m3u8_data.lstrip('\ufeff').splitlines()]
    all_ts_list = []
    video_time = []
    AES_decode_data = None
    if '#EXTM3U' in each_line_list:
        for line in each_line_list:
            if not line:
                # A blank line would resolve to the playlist URL itself.
                continue
            if line.startswith('#EXT-X-KEY'):
                encryption_method, key_url, iv = parse_AES_encryption(line)
                print('加密方法:', encryption_method)
                # METHOD=NONE carries no key to download.
                if encryption_method != 'NONE' and key_url:
                    key_url = urljoin(m3u8_url, key_url)
                    AES_decode_data = AES_decode(key_url, iv)
            elif line.startswith('#EXTINF'):
                # "#EXTINF:<duration>,[<title>]" - keep only the duration.
                duration_text = line.partition(':')[2].partition(',')[0]
                try:
                    video_time.append(float(duration_text))
                except ValueError:
                    # The duration is only used for the summary printed below.
                    pass
            elif not line.startswith('#'):
                # Every remaining non-tag line is a segment URI.
                all_ts_list.append(urljoin(m3u8_url, line))
    print('视频时长约为:{:.2f}分钟'.format(sum(video_time) / 60))
    return all_ts_list, AES_decode_data
def get_each_ts_response_data():
    """Download every segment of the playlist through a thread pool.

    Returns:
        ``True`` when at least one segment was found and all of them were
        downloaded; ``False`` otherwise, so the caller does not merge an
        incomplete video.
    """
    print('开始下载视频……')
    all_ts_list, AES_decode_data = parse_m3u8_data()
    if not all_ts_list:
        print('未解析到任何ts链接!')
        return False
    # Pool of at most 30 worker threads; leaving the ``with`` block waits for
    # every submitted download to finish.
    # Tip: slice all_ts_list (e.g. all_ts_list[:3]) to try a few segments first.
    with ThreadPoolExecutor(max_workers=30) as executor:
        futures = [
            executor.submit(download_ts, i, ts_url, AES_decode_data)
            for i, ts_url in enumerate(all_ts_list)
        ]
    # An exception raised inside a worker is stored on its future and would be
    # lost unless it is inspected here.
    failed = []
    for i, future in enumerate(futures):
        error = future.exception()
        if error is not None:
            failed.append(i)
            print(f'{i}.ts下载失败:', repr(error))
    print('视频下载结束!')
    return not failed
def download_ts(i, ts_url, AES_decode_data):
    """Fetch one segment, decrypt it when a decryptor is given, and save it.

    Args:
        i: Index of the segment; used as the file name ``<i>.ts``.
        ts_url: Address of the segment.
        AES_decode_data: Object exposing ``decrypt(bytes)``, or a falsy value
            when the segment should be stored as downloaded.
    """
    payload = send_request(ts_url).content
    if AES_decode_data:
        payload = AES_decode_data.decrypt(payload)
    target_path = f'{save_temporary_ts_path}/{i}.ts'
    with open(target_path, mode='wb+') as segment_file:
        segment_file.write(payload)
        print(f'{i}.ts下载完成!')
def parse_AES_encryption(key_content):
    """Extract method, key URI and IV from an ``#EXT-X-KEY`` playlist line.

    Args:
        key_content: The full tag line, e.g.
            ``#EXT-X-KEY:METHOD=AES-128,URI="key.key",IV=0x...``.

    Returns:
        A tuple ``(encryption_method, key_url, iv)``. ``key_url`` is the URI
        exactly as written in the playlist (``None`` if absent). ``iv`` is the
        16-byte value decoded from the hexadecimal ``IV`` attribute, or
        ``None`` when the tag has no IV.

    Raises:
        ValueError: If the IV attribute is not hexadecimal.
        OverflowError: If the IV does not fit in 128 bits.
    """
    import re  # local import keeps this function self-contained

    attribute_text = key_content.split(':', 1)[-1]
    # Parse NAME=value pairs; a quoted value may itself contain ',' or '='
    # (for instance a key URI with a query string), so plain splitting on '='
    # is not reliable.
    pairs = re.findall(r'([A-Za-z0-9-]+)=("[^"]*"|[^,]*)', attribute_text)
    attributes = {name.upper(): value.strip().strip('"') for name, value in pairs}

    encryption_method = attributes.get('METHOD')
    key_url = attributes.get('URI')
    iv = None
    iv_text = attributes.get('IV')
    if iv_text:
        # The IV is written as a hexadecimal number ("0x" + hex digits); decode
        # it to the 16 raw bytes the block cipher expects.
        hex_digits = iv_text[2:] if iv_text[:2].lower() == '0x' else iv_text
        iv = int(hex_digits, 16).to_bytes(16, 'big')
    return encryption_method, key_url, iv
class _FreshCbcDecryptor:
    """Decrypt each payload with a newly created AES-CBC cipher.

    A CBC cipher object keeps chaining state between ``decrypt`` calls, so a
    single instance must not be reused across independent segments or shared
    between worker threads.
    """

    def __init__(self, key, iv):
        self._key = key
        self._iv = iv

    def decrypt(self, data):
        """Return ``data`` decrypted with a fresh cipher for this call only."""
        return AES.new(self._key, AES.MODE_CBC, self._iv).decrypt(data)


def AES_decode(key_url, iv):
    """Download the AES key and return an object that decrypts segments.

    Args:
        key_url: Absolute address of the key file.
        iv: 16-byte initialization vector, or a falsy value when the playlist
            did not specify one.

    Returns:
        An object whose ``decrypt(data)`` method decrypts one whole segment.
    """
    key = send_request(key_url).content
    if not iv:
        # An empty IV is rejected by the cipher constructor. The HLS
        # specification derives a missing IV from the segment's media sequence
        # number, which is not known here, so fall back to an all-zero IV.
        # NOTE(review): with a wrong IV only the first 16 bytes of a segment
        # decrypt incorrectly; pass the real IV when exact output matters.
        iv = bytes(16)
    return _FreshCbcDecryptor(key, iv)
def merge_all_ts_file():
    """Concatenate the staged segments in numeric order into ``video.mp4``.

    Reads ``<number>.ts`` files from ``save_temporary_ts_path``, writes them
    back to back into ``save_mp4_path + '/video.mp4'`` and then deletes the
    staging directory.
    """
    print('开始合并视频……')
    # Only consider "<number>.ts" files so that a stray file in the staging
    # directory cannot break the numeric sort.
    ts_file_list = [
        name for name in os.listdir(save_temporary_ts_path)
        if name.endswith('.ts') and name[:-3].isdecimal()
    ]
    ts_file_list.sort(key=lambda name: int(name[:-3]))
    with open(save_mp4_path + '/video.mp4', 'wb+') as fw:
        for name in ts_file_list:
            # ``with`` closes the segment even if the copy fails part-way.
            with open(os.path.join(save_temporary_ts_path, name), 'rb') as fr:
                shutil.copyfileobj(fr, fw)
    shutil.rmtree(save_temporary_ts_path)  # remove all staged segment files
    print('视频合并完成!')
def begin():
    """Run the download step and merge the segments when it reports success."""
    download_succeeded = get_each_ts_response_data()
    if not download_succeeded:
        return
    merge_all_ts_file()
if __name__ == '__main__':
    started_at = time.time()
    # Supply your own playlist address here; the functions above read this
    # module-level name.
    m3u8_url = 'https://xxx.m3u8'
    begin()
    elapsed = time.time() - started_at
    print(f'总共耗时:{elapsed}秒')
异步协程版
import os,shutil,time
import asyncio,aiohttp,aiofiles
from Crypto.Cipher import AES
from fake_useragent import UserAgent
from urllib.parse import urljoin
class asyncioDownloadM3u8:
    """Download an m3u8 stream with asyncio and merge its segments into one file.

    Typical use: ``await asyncioDownloadM3u8(url).begin()``.
    """

    def __init__(self, m3u8_url, max_concurrency=30):
        """Store the playlist address and make sure the output directories exist.

        Args:
            m3u8_url: Address of the playlist to download.
            max_concurrency: Upper bound on simultaneous segment downloads.
        """
        self.m3u8_url = m3u8_url
        self.max_concurrency = max_concurrency
        self.AES_decode_data = None  # truthy once an AES key has been loaded
        self._aes_key = None         # raw key bytes fetched by AES_decode()
        self._aes_iv = None          # explicit IV from the playlist, if any
        self._media_sequence = 0     # value of #EXT-X-MEDIA-SEQUENCE
        self.video_download_path = './m3u8Download'
        self.save_mp4_path = './m3u8Download/testVideo'
        self.save_temporary_ts_path = './m3u8Download/temporary_ts'
        # Create both directories every time: merge_all_ts_file() removes the
        # staging directory, so checking only the parent is not enough.
        os.makedirs(self.save_mp4_path, exist_ok=True)
        os.makedirs(self.save_temporary_ts_path, exist_ok=True)

    async def send_request(self, url):
        """GET ``url`` with browser-like headers and return the body as bytes.

        Raises:
            RuntimeError: If the server answers with a status other than 200.
            Exception: Any error raised by the HTTP client is printed and
                re-raised.
        """
        headers = {
            'User-Agent': UserAgent().Chrome,
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'Accept-Language': 'zh-CN,zh;q=0.9',
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    # Explicit check: an ``assert`` would vanish under ``-O``.
                    if response.status != 200:
                        raise RuntimeError(f'HTTP {response.status}: {url}')
                    return await response.read()
        except Exception as e:
            print('m3u8链接请求异常!!!')
            print(e)
            # Propagate so the whole run stops instead of calling exit()
            # from inside a coroutine.
            raise

    async def parse_m3u8_data(self):
        """Parse the playlist into segment URLs and load the AES key if present.

        Returns:
            A tuple ``(all_ts_list, AES_decode_data)``: absolute segment URLs
            in playlist order and the value of ``self.AES_decode_data``.
        """
        content = await self.send_request(self.m3u8_url)
        m3u8_data = content.decode('utf-8').lstrip('\ufeff')
        # splitlines() + strip() also copes with CRLF line endings.
        each_line_list = [line.strip() for line in m3u8_data.splitlines()]
        all_ts_list = []
        video_time = []
        if '#EXTM3U' in each_line_list:
            for line in each_line_list:
                if not line:
                    # A blank line would resolve to the playlist URL itself.
                    continue
                if line.startswith('#EXT-X-KEY'):
                    encryption_method, key_url, iv = await self.parse_AES_encryption(line)
                    print('加密方法:', encryption_method)
                    # METHOD=NONE carries no key to download.
                    if encryption_method != 'NONE' and key_url:
                        key_url = urljoin(self.m3u8_url, key_url)
                        self.AES_decode_data = await self.AES_decode(key_url, iv)
                elif line.startswith('#EXT-X-MEDIA-SEQUENCE'):
                    sequence_text = line.partition(':')[2]
                    if sequence_text.isdecimal():
                        self._media_sequence = int(sequence_text)
                elif line.startswith('#EXTINF'):
                    # "#EXTINF:<duration>,[<title>]" - keep only the duration.
                    duration_text = line.partition(':')[2].partition(',')[0]
                    try:
                        video_time.append(float(duration_text))
                    except ValueError:
                        # Only used for the summary printed below.
                        pass
                elif not line.startswith('#'):
                    all_ts_list.append(urljoin(self.m3u8_url, line))
        print('视频时长约为:{:.2f}分钟'.format(sum(video_time) / 60))
        return all_ts_list, self.AES_decode_data

    async def get_each_ts_response_data(self):
        """Download all segments concurrently, at most ``max_concurrency`` at once.

        The first failing download propagates its exception to the caller.
        """
        print('开始下载视频……')
        all_ts_list, _ = await self.parse_m3u8_data()
        # Created here so the semaphore belongs to the running event loop.
        limiter = asyncio.Semaphore(self.max_concurrency)

        async def limited_download(index, ts_url):
            async with limiter:
                await self.download_ts(index, ts_url)

        # gather() accepts an empty playlist, unlike asyncio.wait().
        await asyncio.gather(
            *(limited_download(i, ts_url) for i, ts_url in enumerate(all_ts_list))
        )
        print('视频下载结束!')

    def _segment_cipher(self, index):
        """Return a fresh AES-CBC cipher for the segment at ``index``."""
        if self._aes_key is None:
            # AES_decode_data was supplied from outside; use it as given.
            return self.AES_decode_data
        iv = self._aes_iv
        if iv is None:
            # Without an explicit IV the HLS specification uses the segment's
            # media sequence number as a 128-bit big-endian value.
            iv = (self._media_sequence + index).to_bytes(16, 'big')
        return AES.new(self._aes_key, AES.MODE_CBC, iv)

    async def download_ts(self, i, ts_url):
        """Fetch segment ``i``, decrypt it when a key is loaded, and save ``<i>.ts``."""
        ts_data = await self.send_request(ts_url)
        if self.AES_decode_data:
            # One cipher per segment: CBC state must not leak between segments
            # that finish in arbitrary order.
            ts_data = self._segment_cipher(i).decrypt(ts_data)
        async with aiofiles.open(f'{self.save_temporary_ts_path}/{i}.ts', mode="wb") as f:
            await f.write(ts_data)
        print(f'{i}.ts下载完成!')

    async def parse_AES_encryption(self, key_content):
        """Extract method, key URI and IV from an ``#EXT-X-KEY`` playlist line.

        Returns:
            ``(encryption_method, key_url, iv)`` where ``iv`` is the 16-byte
            value decoded from the hexadecimal IV attribute, or ``None`` when
            the tag has no IV. Missing attributes are returned as ``None``.
        """
        import re  # local import keeps this block self-contained

        attribute_text = key_content.split(':', 1)[-1]
        # A quoted value may contain ',' or '=' (e.g. a URI with a query
        # string), so plain splitting on '=' is not reliable.
        pairs = re.findall(r'([A-Za-z0-9-]+)=("[^"]*"|[^,]*)', attribute_text)
        attributes = {name.upper(): value.strip().strip('"') for name, value in pairs}

        encryption_method = attributes.get('METHOD')
        key_url = attributes.get('URI')
        iv = None
        iv_text = attributes.get('IV')
        if iv_text:
            hex_digits = iv_text[2:] if iv_text[:2].lower() == '0x' else iv_text
            iv = int(hex_digits, 16).to_bytes(16, 'big')
        return encryption_method, key_url, iv

    async def AES_decode(self, key_url, iv):
        """Download the AES key, remember it, and return an AES-CBC cipher.

        The key and IV are also stored on the instance so that
        ``download_ts`` can build a separate cipher for every segment.
        """
        key = await self.send_request(key_url)
        self._aes_key = key
        self._aes_iv = iv if iv else None
        # An empty IV is rejected by the cipher constructor; use zeros for the
        # returned object when the playlist gave none.
        return AES.new(key, AES.MODE_CBC, iv if iv else bytes(16))

    async def merge_all_ts_file(self):
        """Concatenate the staged segments in numeric order into ``video.mp4``."""
        print('开始合并视频……')
        # Only "<number>.ts" files take part, so a stray file cannot break the
        # numeric sort.
        ts_file_list = sorted(
            (
                name for name in os.listdir(self.save_temporary_ts_path)
                if name.endswith('.ts') and name[:-3].isdecimal()
            ),
            key=lambda name: int(name[:-3]),
        )
        async with aiofiles.open(self.save_mp4_path + '/video.mp4', 'wb+') as fw:
            for name in ts_file_list:
                segment_path = os.path.join(self.save_temporary_ts_path, name)
                async with aiofiles.open(segment_path, 'rb') as fr:
                    await fw.write(await fr.read())
        shutil.rmtree(self.save_temporary_ts_path)
        print('视频合并完成!')

    async def begin(self):
        """Download every segment, then merge them."""
        await self.get_each_ts_response_data()
        await self.merge_all_ts_file()
if __name__ == '__main__':
    start_time = time.time()
    # Supply your own playlist address here.
    m3u8_url = 'https://xxx.m3u8'
    adm = asyncioDownloadM3u8(m3u8_url=m3u8_url)
    # asyncio.run() creates, runs and closes the event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(adm.begin())
    end_time = time.time()
    print(f'总共耗时:{end_time - start_time}秒')
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)