1. 程式人生 > >強迫症終結版 - 蹩腳梨視訊下載器(很菜勿噴)

強迫症終結版 - 蹩腳梨視訊下載器(很菜勿噴)

強迫症終結版 - 蹩腳梨視訊下載器

支援功能:

  • 自己選擇要下載的視訊分類
  • 視訊個數(12的倍數,最新的...個)(不要怪不能高度自定義啦~)
  • 重複檔案自動跳過
  • 選單會被控制檯列印頂掉,大家見諒(才疏學淺 暫不知道咋改(等視訊都下完了再列印選單))

哎,舒服了~

強調:僅供學習練習用,請不要亂來

# 把檔名搞一下
import re
import os
import requests
from concurrent.futures import ThreadPoolExecutor


class PearDownLoader:
    def __init__(self, page_url, params, headers, cookie=None, video_page_target_str_prefix=None,
                 video_page_target_re=None, video_target_re=None, video_name_re=None, video_directory=None):
        self.headers = headers
        self.cookie = cookie
        self.url = page_url
        self.params = params

        self.video_page_target_str_prefix = video_page_target_str_prefix
        self.video_page_target_re = video_page_target_re
        self.video_target_re = video_target_re
        self.video_name_re = video_name_re
        self.video_directory = video_directory

        self.pool = ThreadPoolExecutor()

    def get_video_page_urls(self):
        res = requests.get(self.url, params=self.params, headers=self.headers)
        video_page_target_str_list = re.findall(self.video_page_target_re, res.text)
        urls_list = [self.video_page_target_str_prefix + video_page_target_str for video_page_target_str in
                     video_page_target_str_list]
        print(urls_list)
        # 至此,已經獲取到了推送的幾條視訊連線地址
        return urls_list

    def get_video_urls(self, video_page_urls):
        videos_list = []
        for video_page_url in video_page_urls:
            res = requests.get(video_page_url, headers=self.headers)
            # 獲取視訊連結
            video_target_url = re.findall(self.video_target_re, res.text)[0]
            # 獲取視訊名稱
            video_name = re.findall(self.video_name_re, res.text)[0]
            video_dic = {
                "video_target_url": video_target_url,
                "video_name": video_name,
            }
            # print("video_dic:", video_dic)
            # 組織成 [{url: "filename"}] 的形式
            videos_list.append(video_dic)
        return videos_list

    def download_videos(self, videos_list):
        # 先判斷有沒有傳引數,沒有給個預設的
        if not self.video_directory:
            self.video_directory = os.path.join(os.path.dirname(__file__), 'pear_download_videos')

        # 按分類創資料夾儲存視訊
        download_category_name = video_category_dic.get(str(self.params.get('categoryId')), 'unknow_category')
        self.video_category_directory = os.path.join(self.video_directory, download_category_name)

        # 判斷該路徑存不存在,不存在建立一下
        if not os.path.exists(self.video_category_directory):
            os.makedirs(self.video_category_directory)

        # 用多執行緒去下載
        for video_dic in videos_list:
            # self.pool.submit(download_video_by_thread, self, video_dic)
            self.pool.submit(self.download_video_by_thread, video_dic)

    def download_video_by_thread(self, video_dic):
        file_name = video_dic.get('video_name', 'video_target_url')
        absolute_file_path = os.path.join(self.video_category_directory, file_name + '.mp4')

        # 得到視訊內容,寫到檔案中
        if os.path.exists(absolute_file_path):
            print(f"【{file_name}】 已存在,跳過下載...")
            return None
        print(f"【{file_name}】 start download ...")

        video_url = video_dic.get('video_target_url')
        res = requests.get(video_url, headers=self.headers)
        with open(absolute_file_path, 'wb') as f:
            for line in res.iter_content():
                f.write(line)

        print(f"【{file_name}】 download compelete...")

    # 一步呼叫,把上面幾個歩鄹合到一起(保留三個歩鄹便於檢視每一步的執行結果,排錯)
    def easy_get_videos(self):
        video_page_urls = self.get_video_page_urls()
        videos_list = self.get_video_urls(video_page_urls=video_page_urls)
        self.download_videos(videos_list=videos_list)


if __name__ == '__main__':
    # 視訊分類
    video_category_dic = {
        "1": "社會",
        "2": "世界",
        "3": "財富",
        "4": "娛樂",
        "5": "生活",
        "6": "美食",
        "8": "科技",
        "9": "體育",
        "10": "新知",
        "31": "汽車",
        "59": "音樂",
        "8889": "旗幟",
    }

    # 準備引數
    url = "https://www.pearvideo.com/category_loading.jsp"
    params = {
        "reqType": 5,  # 字串和數字都要 html 轉碼,區別不大
        "categoryId": 8,  # 梨視訊 科技 頻道
        "start": 0,  # 0 是懶載入的第一個視訊
    }
    headers = {
        "Referer": "https://www.pearvideo.com/category_8",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36",
    }

    video_page_target_str_prefix = "https://www.pearvideo.com/"
    video_page_target_re = '<a href="(.*?)" class="vervideo-lilink actplay">'
    video_target_re = ',srcUrl="(.*?)"'
    video_name_re = '<h1 class="video-tt">(.*?)</h1>'

    video_directory = os.path.join(os.path.dirname(__file__), 'pear_download_videos')

    pear_downloader = PearDownLoader(url, params, headers, video_page_target_str_prefix=video_page_target_str_prefix,
                                     video_page_target_re=video_page_target_re, video_target_re=video_target_re,
                                     video_name_re=video_name_re, video_directory=video_directory)

    while True:
        while True:
            print("現有如下視訊分類可選擇:")
            for key, value in video_category_dic.items():
                print(f"{key}.{value}", end='\t')

            want_video_type = input("\n請輸入您要下載的視訊型別編號:").strip()
            if want_video_type in video_category_dic:
                want_video_category_id = int(want_video_type)
                break
            print("您輸入的編號有誤,請重新輸入!")

        while True:
            want_video_pear_count = input("請輸入您要下載的視訊數量:12 * ").strip()
            if want_video_pear_count.isdigit():
                want_video_pear_count = int(want_video_pear_count)
                break
            print("請輸入合法數字!")

        # 開始分批下載
        try:
            for i in range(want_video_pear_count):
                pear_downloader.params['categoryId'] = want_video_category_id
                pear_downloader.params['start'] = pear_downloader.params['start'] + 12
                pear_downloader.easy_get_videos()
        except Exception as e:
            pear_downloader.pool.shutdown()  # 關閉池子且等待池子中所有的任務執行完畢
            print(e)