05
2022
02

download m3u8 video (Update in 17/02/2022)

#!/usr/bin/env python 
# encoding: utf-8

import os
import sys
import shutil
import subprocess
import collections

from google.colab import drive
from google.colab import files
from google.colab import auth
auth.authenticate_user()

from oauth2client.client import GoogleCredentials
drive.mount('/content/drive', force_remount=False)

import re
import math
import random
import tqdm
import time
import requests
import multiprocessing 
from IPython.display import clear_output

try:
  from retrying import retry

except:
  !pip install pycryptodome
  !pip install retrying
  !pip install ffmpeg
  !pip install moviepy

  from Crypto.Cipher import AES
  from retrying import retry
  from moviepy.editor import *
  import moviepy.editor as moviepy
#=================================================================================================================================================================================================================================================
class M3u8:
  '''
    This is a main Class, the file contains all documents.
    One document contains paragraphs that have several sentences
    It loads the original file and converts the original file to new content
    Then the new content will be saved by this class
  '''
  def __init__(self):
    '''
    Initial the custom file by self
    '''
    self.real_url = ""
    self.encrypt = False
    self.headers = {"User-Agent"'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'}

    """self.proxies = {
        'http': 'http://127.0.0.1:9001',
        'https': 'https://127.0.0.2:9002'
    }"""

  def hello(self):
    print("*" * 50)
    print('m3u8鏈接下載')
    print(' ' * 10 + '適用於非加密 | 加密鏈接')
    print("*" * 50)
    return self
  
  def create_path(selfdownPathsavePathmp4_path):
    try:
      if not os.path.exists("/content/drive/MyDrive/Video/"):
        os.makedirs("/content/drive/MyDrive/Video/")
    
      if not os.path.exists(mp4_path):
        os.makedirs(mp4_path)
      
      if not os.path.exists(downPath):
        os.makedirs(downPath)

      if not os.path.exists(savePath):
        os.makedirs(savePath)
    
    except:
      time.sleep(5)
      self.create_path(downPath, savePath, mp4_path)

  @retry(stop_max_attempt_number=5)
  def request(selfurl):
    '''
    Send a request
    :param url: the url of request
    :param params: the params of request
    :return: the result of request
    '''
    #response = requests.get(url, headers=self.headers, timeout=15, verify=False, stream=True, proxies=self.proxies) 
    
    time.sleep(random.random()/3)
    response = requests.get(url, headers=self.headers, timeout=15
    assert response.status_code == 200
    return response

  def checkUrl(selfurl):
    '''
    Determine if it is a available link of m3u8
    :return: bool
    '''
    if '.m3u8' not in url:
      return False
    elif not url.startswith('http'):
      return False
    else:
      return True

  def parse(selfurl):
    '''
    Analyze a link of m3u8
    :param url: string, the link need to analyze
    :return: list
    '''
    container = list()
    incomplete_m3u8_url = ""
    response = self.request(url).text.split('\n')

    for ts in response:
      if '.ts' in ts:
        container.append(ts)
      
      if '#EXT-X-KEY:' in ts:
        self.encrypt = True
      
      if ".m3u8" in ts:
        incomplete_m3u8_url = ts
      
    if self.encrypt and len(container) !=0:
      print("這個視頻被加密。")

    if len(container) == 0 and incomplete_m3u8_url !="":
      try:
        self.real_url = self.get_real_url(incomplete_m3u8_url, url)
      except Exception as e:
        print(e)
    
    return container
  
  def get_real_url(selfincomplete_m3u8_urlurl):
    if incomplete_m3u8_url[0]!="/":
      incomplete_url = "/" + incomplete_m3u8_url
    else:
      incomplete_url = incomplete_m3u8_url
  
    url_list = url.split("/")
    item = incomplete_url.split("/")[1]
    
    try:
      new_url = "/".join(url_list[:url_list.index(item)])+incomplete_url
    except Exception as e:
      new_url = "/".join(url_list[:-1])+incomplete_url
    
    print("視頻m3u8的網站可能是: \n%s"%(new_url))

    return new_url

  def getEncryptKey(selfurl):
    '''
    Access to the secret key
    :param url: string, Access to the secret key by the url
    :return: string
    '''

    encryptKey = self.request("{}/key.key".format(url)).content
    return encryptKey

  def aesDecode(selfdatakey):
    '''
    Decode the data
    :param data: stream, the data need to decode
    :param key: secret key
    :return: decode the data
    '''
    crypt = AES.new(key, AES.MODE_CBC, key)
    plain_text = crypt.decrypt(data)
    return plain_text.rstrip(b'\0')

  def download(selfqueuesortfiledownPathurlencryptKey,):
    '''
    Download the debris of video
    :param queue: the queue
    :param sort: which number debris
    :param file: the link of debris
    :param downPath: the path to save debris
    :param url: the link of m3u8
    :return: None
    '''
    
    queue.put(file)

    baseUrl = '/'.join(url.split("/")[:-1])

    if self.encrypt:
      self.encryptKey = encryptKey

    if not file.startswith("http"):
      file = baseUrl + '/' +file
    
    debrisName = "{}/{}.ts".format(downPath, sort)

    if not os.path.exists(debrisName):
      response = self.request(file)
      
      with open(debrisName, "wb"as f:
        if self.encrypt:
          data = self.aesDecode(response.content, self.encryptKey)
          f.write(data)
          f.flush()
        
        else:
          f.write(response.content)
          f.flush()

  def progressBar(selfqueuecount):
    '''
    Show progress bar
    :param queue: the queue
    :param count: the number count of debris
    :return: None
    '''
    print('---一共{}個碎片...'.format(count))

    for i in tqdm.tqdm(range(count)):
      file = queue.get()

  def run(selfdownPathsavePathmp4_path):
    '''
    program entry, please input url:
    '''

    while True:                                                    #加密                                        #沒有加密
      url = str(input("請輸入合法的m3u8鏈接\n>>>")) #["https://pps.sd-play.com/20211224/MZHxznS1/index.m3u8","https://m3u.if101.tv/xm3u8/cbea7f06e5ed9e877bbff543d2d2ac1efe377a2d86455ddcde4d69f491abf2499921f11e97d0da21.m3u8"]

      if self.checkUrl(url):
        break
    
    self.create_path(downPath, savePath, mp4_path)

    print('---正在分析鏈接...')
    container = self.parse(url)

    if len(container) == 0:
      try:
        url = self.real_url
        container = self.parse(url)
      except:
        print("---找不到碎片---")
        return None

    baseUrl = '/'.join(url.split("/")[:-1])

    if self.encrypt:
      encryptKey = self.getEncryptKey(baseUrl)
    else:
      encryptKey = None

    print('---鏈接分析成功...')
    print('---進程開始運行...')

    po = multiprocessing.Pool(20)
    queue = multiprocessing.Manager().Queue()
    
    size = 0
    for file in container:
      sort = str(size).zfill(5)
      po.apply_async(self.download, args=(queue, sort, file, downPath, url, encryptKey,))
      size += 1

    po.close()
    self.progressBar(queue, len(container))
    
    print('---進程運行結束...')
    print('---任務下載完成...')

    saveName = time.strftime("%Y%m%d_%H%M%S", time.localtime())

    print('---文件合併中...')

    time.sleep(20)

    os.system("cat {}/*.ts>{}/{}.ts".format(downPath, savePath, saveName))

    print('---文件合併完成---將轉換成mp4...')

    self.convert_ts_to_mp4(savePath, mp4_path, saveName)  
    
    try:
      folder = os.listdir(downPath)
      for file in folder:
        open(os.path.join(downPath, file), "w").close()
      shutil.rmtree(downPath)
    except:
      pass
   
    print(saveName +".mp4")

  def convert_ts_to_mp4(selfsavePathmp4_pathsaveName):
    print("---正在轉換ts影片...,請耐心等待一會兒。")

    file = saveName + ".ts"                         
    infile = os.path.join(savePath, file)
    outfile = os.path.join(mp4_path, file.split(".ts")[0]+".mp4")

    if os.path.getsize(infile) == 0:
      print("---TS文件內容為空---")
      return 
    
    os.system("ffmpeg -i %s -acodec copy -vcodec copy -f mp4 %s"%(infile, outfile))
    time.sleep(2)
    
    try:
      Y = os.listdir(mp4_path).index(file.split(".ts")[0]+".mp4")
      print("---正在刪除舊的ts文件...")
      
      open(infile, "w").close()
      os.remove(infile)
    
    except:
      time.sleep(5)
      self.convert_ts_to_mp4(savePath, mp4_path, saveName)

    print("---轉換完成---")
#=======================================================================================================================================================================================================================================================
global downPath, savePath, mp4_path

downPath = "/content/drive/MyDrive/Video/download_ts_path/"
savePath = "/content/drive/MyDrive/Video/combine_ts_path/"
mp4_path = "/content/drive/MyDrive/Video/Mp4/"

clear_output()

if __name__ == "__main__":
  M3u8().hello().run(downPath, savePath, mp4_path)


« 上一篇 下一篇 »

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。