#!/usr/bin/env python
# encoding: utf-8
import os
import sys
import shutil
import subprocess
import collections
from google.colab import drive
from google.colab import files
from google.colab import auth
auth.authenticate_user()
from oauth2client.client import GoogleCredentials
drive.mount('/content/drive', force_remount=False)
import re
import math
import random
import tqdm
import time
import requests
import multiprocessing
from IPython.display import clear_output
try:
from retrying import retry
except:
!pip install pycryptodome
!pip install retrying
!pip install ffmpeg
!pip install moviepy
from Crypto.Cipher import AES
from retrying import retry
from moviepy.editor import *
import moviepy.editor as moviepy
#=================================================================================================================================================================================================================================================
class M3u8:
    '''
    Interactive downloader for HLS (m3u8) video streams.

    :meth:`run` drives the whole workflow: it prompts for a playlist URL,
    collects the ``.ts`` fragment URIs (following one level of master
    playlist when the first playlist lists no fragments), downloads the
    fragments in a process pool (AES-CBC decrypting them when the playlist
    carries an ``#EXT-X-KEY`` tag), concatenates them into one ``.ts`` file
    and remuxes that file to MP4 with the ``ffmpeg`` command.
    '''

    def __init__(self):
        '''
        Initialise the per-download state.
        '''
        # URL of the variant playlist resolved from a master playlist.
        self.real_url = ""
        # True once parse() has seen an #EXT-X-KEY tag that enables encryption.
        self.encrypt = False
        # Value of the URI="..." attribute of the #EXT-X-KEY tag, if any.
        self.key_uri = ""
        # Headers sent with every HTTP request.
        self.headers = {"User-Agent": 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'}
        # Optional proxy configuration, currently unused:
        # self.proxies = {
        #     'http': 'http://127.0.0.1:9001',
        #     'https': 'https://127.0.0.2:9002'
        # }

    def hello(self):
        '''
        Print the banner.

        :return: self, so calls can be chained
        '''
        print("*" * 50)
        print('m3u8鏈接下載')
        print(' ' * 10 + '適用於非加密 | 加密鏈接')
        print("*" * 50)
        return self

    def create_path(self, downPath, savePath, mp4_path):
        '''
        Create the working directories if they do not exist yet.

        A failed attempt is retried after a 5 second pause, at most 5 times in
        total, instead of recursing without bound.

        :param downPath: directory that receives the downloaded fragments
        :param savePath: directory that receives the merged .ts file
        :param mp4_path: directory that receives the final .mp4 file
        :raises OSError: when the directories still cannot be created on the
            last attempt
        '''
        targets = ("/content/drive/MyDrive/Video/", mp4_path, downPath, savePath)
        attempts = 5
        for attempt in range(1, attempts + 1):
            try:
                for path in targets:
                    os.makedirs(path, exist_ok=True)
                return
            except OSError:
                if attempt == attempts:
                    raise
                time.sleep(5)

    @retry(stop_max_attempt_number=5)
    def request(self, url):
        '''
        Send a GET request; the ``retry`` decorator re-runs it on any exception.

        :param url: the url of the request
        :return: the response object
        :raises requests.HTTPError: when the status code is not 200 (an
            explicit raise, so the check also works under ``python -O``)
        '''
        #response = requests.get(url, headers=self.headers, timeout=15, verify=False, stream=True, proxies=self.proxies)
        # Small random pause before each request.
        time.sleep(random.random() / 3)
        response = requests.get(url, headers=self.headers, timeout=15)
        if response.status_code != 200:
            raise requests.HTTPError(
                "unexpected status {} for {}".format(response.status_code, url),
                response=response,
            )
        return response

    def checkUrl(self, url):
        '''
        Determine whether the text looks like a usable m3u8 link.

        :param url: string typed by the user
        :return: bool, True when it contains '.m3u8' and starts with 'http'
        '''
        return '.m3u8' in url and url.startswith('http')

    def parse(self, url):
        '''
        Fetch a playlist and collect its fragment URIs.

        Side effects: sets ``self.encrypt`` / ``self.key_uri`` when an
        ``#EXT-X-KEY`` tag other than ``METHOD=NONE`` is present, and sets
        ``self.real_url`` when the playlist lists no fragments but points to
        another ``.m3u8`` playlist.

        :param url: string, the playlist link to analyse
        :return: list of fragment URIs, in playlist order
        '''
        container = []
        incomplete_m3u8_url = ""
        for raw_line in self.request(url).text.split('\n'):
            # strip() also removes the '\r' left behind by CRLF playlists,
            # which would otherwise end up inside the fragment URLs.
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith('#'):
                # Tag/comment lines are never fragment or playlist URIs, even
                # when an attribute value happens to contain '.ts' or '.m3u8'.
                if '#EXT-X-KEY:' in line and 'METHOD=NONE' not in line:
                    self.encrypt = True
                    key_match = re.search(r'URI="([^"]*)"', line)
                    if key_match:
                        self.key_uri = key_match.group(1)
                continue
            if '.ts' in line:
                container.append(line)
            if ".m3u8" in line:
                # The last playlist reference wins.
                incomplete_m3u8_url = line
        if self.encrypt and container:
            print("這個視頻被加密。")
        if not container and incomplete_m3u8_url:
            try:
                self.real_url = self.get_real_url(incomplete_m3u8_url, url)
            except Exception as e:
                # Best effort: run() reports the failure when no fragments
                # can be found.
                print(e)
        return container

    def get_real_url(self, incomplete_m3u8_url, url):
        '''
        Turn the playlist reference found in a master playlist into a full URL.

        An absolute http(s) reference is returned unchanged. Otherwise the
        first path segment of the reference is searched in ``url``: when
        found, the reference replaces everything from that segment on; when
        not, it is appended to the directory of ``url``.

        :param incomplete_m3u8_url: string, the reference from the playlist
        :param url: string, the link of the playlist that contained it
        :return: string, the resolved link
        '''
        if incomplete_m3u8_url.startswith(("http://", "https://")):
            # Already complete; prefixing '/' would corrupt it.
            new_url = incomplete_m3u8_url
        else:
            if incomplete_m3u8_url.startswith("/"):
                incomplete_url = incomplete_m3u8_url
            else:
                incomplete_url = "/" + incomplete_m3u8_url
            url_list = url.split("/")
            item = incomplete_url.split("/")[1]
            try:
                new_url = "/".join(url_list[:url_list.index(item)]) + incomplete_url
            except ValueError:
                # Segment not part of the playlist URL: resolve against its
                # directory.
                new_url = "/".join(url_list[:-1]) + incomplete_url
        print("視頻m3u8的網站可能是: \n%s"%(new_url))
        return new_url

    def getEncryptKey(self, url):
        '''
        Download the secret key from ``<url>/key.key``.

        :param url: string, the directory link that holds ``key.key``
        :return: bytes, the body of the response
        '''
        encryptKey = self.request("{}/key.key".format(url)).content
        return encryptKey

    def aesDecode(self, data, key):
        '''
        Decrypt one fragment with AES in CBC mode.

        :param data: bytes, the encrypted fragment
        :param key: bytes, the secret key; it is also used as the IV
        :return: bytes, the decrypted data with trailing zero bytes removed
        '''
        # NOTE(review): the key doubles as the IV and padding is removed by
        # stripping trailing NUL bytes; HLS AES-128 presumably expects the IV
        # from the playlist (or the media sequence number) and PKCS7 padding -
        # confirm against the streams this is used with.
        crypt = AES.new(key, AES.MODE_CBC, key)
        plain_text = crypt.decrypt(data)
        return plain_text.rstrip(b'\0')

    def download(self, queue, sort, file, downPath, url, encryptKey,):
        '''
        Download one fragment, skipping it when its file already exists.

        The fragment is reported on ``queue`` only after the attempt has
        finished (successfully or not), so the progress bar tracks finished
        work rather than started work.

        :param queue: the queue read by progressBar
        :param sort: zero-padded sequence number, used as the file name
        :param file: the link of the fragment as listed in the playlist
        :param downPath: the directory to save the fragment in
        :param url: the link of the m3u8 playlist, base for relative links
        :param encryptKey: the secret key, or None for unencrypted streams
        :return: None
        '''
        from urllib.parse import urljoin

        try:
            if self.encrypt:
                self.encryptKey = encryptKey
            # urljoin handles relative, root-relative and absolute links.
            fragment_url = urljoin(url, file)
            debrisName = os.path.join(downPath, "{}.ts".format(sort))
            if not os.path.exists(debrisName):
                payload = self.request(fragment_url).content
                if self.encrypt:
                    payload = self.aesDecode(payload, encryptKey)
                # Open the file only once the payload is ready, so a failed
                # request or decryption does not leave an empty fragment that
                # a later run would skip.
                with open(debrisName, "wb") as f:
                    f.write(payload)
        finally:
            queue.put(file)

    def progressBar(self, queue, count):
        '''
        Show a progress bar that advances once per item taken from the queue.

        :param queue: the queue filled by download
        :param count: the number of fragments
        :return: None
        '''
        print('---一共{}個碎片...'.format(count))
        for _ in tqdm.tqdm(range(count)):
            queue.get()

    def _merge_fragments(self, downPath, merged_path):
        '''Concatenate the ``*.ts`` files of downPath, in name order, into merged_path.'''
        names = sorted(
            name for name in os.listdir(downPath)
            if name.endswith(".ts") and not name.startswith(".")
        )
        with open(merged_path, "wb") as merged:
            for name in names:
                with open(os.path.join(downPath, name), "rb") as fragment:
                    shutil.copyfileobj(fragment, merged)

    def _remove_fragments(self, downPath):
        '''Best-effort removal of the fragment directory; errors are printed.'''
        try:
            for name in os.listdir(downPath):
                # Truncate before deleting - presumably so that no large
                # files are left behind in the Drive trash.
                with open(os.path.join(downPath, name), "w"):
                    pass
            shutil.rmtree(downPath)
        except OSError as e:
            print(e)

    def run(self, downPath, savePath, mp4_path):
        '''
        Program entry: ask for a link, download, merge and convert.

        :param downPath: directory for the downloaded fragments (removed at
            the end)
        :param savePath: directory for the merged .ts file
        :param mp4_path: directory for the final .mp4 file
        :return: None
        '''
        from urllib.parse import urljoin

        while True:
            url = str(input("請輸入合法的m3u8鏈接\n>>>")).strip()
            if self.checkUrl(url):
                break
        self.create_path(downPath, savePath, mp4_path)
        print('---正在分析鏈接...')
        container = self.parse(url)
        if not container and self.real_url:
            # The first playlist was a master playlist: follow it once.
            try:
                url = self.real_url
                container = self.parse(url)
            except Exception as e:
                print(e)
                container = []
        if not container:
            print("---找不到碎片---")
            return None
        baseUrl = '/'.join(url.split("/")[:-1])
        if self.encrypt:
            if self.key_uri:
                # Key location announced by the playlist, relative to it.
                encryptKey = self.request(urljoin(url, self.key_uri)).content
            else:
                encryptKey = self.getEncryptKey(baseUrl)
        else:
            encryptKey = None
        print('---鏈接分析成功...')
        print('---進程開始運行...')
        po = multiprocessing.Pool(20)
        queue = multiprocessing.Manager().Queue()
        # Equal-width names keep lexicographic order equal to playlist order.
        width = max(5, len(str(len(container))))
        results = []
        for index, file in enumerate(container):
            sort = str(index).zfill(width)
            results.append(
                po.apply_async(self.download, args=(queue, sort, file, downPath, url, encryptKey,))
            )
        po.close()
        self.progressBar(queue, len(container))
        # Wait for the workers instead of sleeping for a fixed time, so the
        # merge never starts while fragments are still being written.
        po.join()
        failed = sum(1 for result in results if not result.successful())
        if failed:
            print('---有{}個碎片下載失敗,合併後的影片可能不完整...'.format(failed))
        print('---進程運行結束...')
        print('---任務下載完成...')
        saveName = time.strftime("%Y%m%d_%H%M%S", time.localtime())
        print('---文件合併中...')
        self._merge_fragments(downPath, os.path.join(savePath, saveName + ".ts"))
        print('---文件合併完成---將轉換成mp4...')
        self.convert_ts_to_mp4(savePath, mp4_path, saveName)
        self._remove_fragments(downPath)
        if os.path.exists(os.path.join(mp4_path, saveName + ".mp4")):
            print(saveName + ".mp4")

    def convert_ts_to_mp4(self, savePath, mp4_path, saveName):
        '''
        Remux ``<savePath>/<saveName>.ts`` to ``<mp4_path>/<saveName>.mp4``.

        ffmpeg is run once with stream copy. On success the .ts file is
        deleted; on failure a message is printed and the .ts file is kept.

        :param savePath: directory that holds the merged .ts file
        :param mp4_path: directory that receives the .mp4 file
        :param saveName: file name without extension
        :return: None
        '''
        print("---正在轉換ts影片...,請耐心等待一會兒。")
        infile = os.path.join(savePath, saveName + ".ts")
        outfile = os.path.join(mp4_path, saveName + ".mp4")
        if os.path.getsize(infile) == 0:
            print("---TS文件內容為空---")
            return
        try:
            # Argument list, no shell: paths are passed verbatim.
            # -y overwrites instead of waiting for an answer on stdin.
            status = subprocess.call([
                "ffmpeg", "-y", "-i", infile,
                "-acodec", "copy", "-vcodec", "copy", "-f", "mp4", outfile,
            ])
        except OSError as e:
            # e.g. the ffmpeg executable is not installed
            print(e)
            status = -1
        converted = False
        if status == 0:
            # Give the output a bounded amount of time to show up in the
            # target directory instead of re-running ffmpeg recursively.
            for _ in range(5):
                if os.path.exists(outfile):
                    converted = True
                    break
                time.sleep(5)
        if not converted:
            print("---轉換失敗,已保留ts文件---")
            return
        print("---正在刪除舊的ts文件...")
        # Truncate before deleting, as done for the fragments.
        with open(infile, "w"):
            pass
        os.remove(infile)
        print("---轉換完成---")
#=======================================================================================================================================================================================================================================================
# Working directories passed to M3u8.run(): downloaded fragments, the merged
# .ts file and the final .mp4 file. They are plain module-level names; a
# `global` statement has no effect at module level, so none is used.
downPath = "/content/drive/MyDrive/Video/download_ts_path/"
savePath = "/content/drive/MyDrive/Video/combine_ts_path/"
mp4_path = "/content/drive/MyDrive/Video/Mp4/"
# Clear the output produced so far before the banner is printed.
clear_output()
if __name__ == "__main__":
    M3u8().hello().run(downPath, savePath, mp4_path)