Python爬取基於m3u8協議的ts檔案併合並
阿新 • • 發佈:2019-02-18
前言
簡單學習過網路爬蟲,只是之前都是照著書上做併發,大概能理解,卻還是無法自己用到自己專案中,這裡自己研究實現一個網頁嗅探HTML5播放控制元件中基於m3u8協議ts格式視訊資源的專案,並未考慮過複雜情況,畢竟只是練練手.
原始碼
# coding=utf-8
import asyncio
import multiprocessing
import os
import re
import time
from math import floor
from multiprocessing import Manager
import aiohttp
import requests
from lxml import html
import threading
from src.my_lib import retry
from src.my_lib import time_statistics
class M3U8Download:
_path = "./resource\\" # 本地檔案路徑
_url_seed = None # 資源所在連結字首
_target_url = {} # 資源任務目標字典
_mode = ""
_headers = {"User-agent": "Mozilla/5.0"} # 瀏覽器代理
_target_num = 100
def __init__(self):
self._ml = Manager().list() # 程序通訊列表
if not os.path.exists(self._path): # 檢測本地目錄存在否
os.makedirs(self._path)
exec_str = r'chcp 65001'
os.system(exec_str) # 先切換utf-8輸出,防止控制檯亂碼
def sniffing(self, url):
self._url = url
print("開始嗅探..." )
try:
r = requests.get(self._url) # 訪問嗅探網址,獲取網頁資訊
except:
print("嗅探失敗,網址不正確")
os.system("pause")
else:
tree = html.fromstring(r.content)
try:
source_url = tree.xpath('//video//source/@src')[0] # 嗅探資源控制檔案連結,這裡只針對一個資源控制檔案
# self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 從資源控制檔案連結解析域名
except:
print("嗅探失敗,未發現資源")
os.system("pause")
else:
self.analysis(source_url)
def analysis(self, source_url):
try:
self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 從資源控制檔案連結解析域名
with requests.get(source_url) as r: # 訪問資源控制檔案,獲得資源資訊
src = re.split("\n*#.+\n", r.text) # 解析資源資訊
for sub_src in src: # 將資源地址儲存到任務字典
if sub_src:
self._target_url[sub_src] = self._url_seed + "/" + sub_src
except Exception as e:
print("資源無法成功解析", e)
os.system("pause")
else:
self._target_num = len(self._target_url)
print("sniffing success!!!,found", self._target_num, "url.")
self._mode = input(
"1:-> 單程序(Low B)\n2:-> 多程序+多執行緒(網速開始biubiu飛起!)\n3:-> 多程序+協程(最先進的併發!!!)\n")
if self._mode == "1":
for path, url in self._target_url.items():
self._download(path, url)
elif self._mode == "2" or self._mode == "3":
self._multiprocessing()
def _multiprocessing(self, processing_num=4): # 多程序,多執行緒
target_list = {} # 程序任務字典,儲存每個程序分配的任務
pool = multiprocessing.Pool(processes=processing_num) # 開啟程序池
i = 0 # 任務分配標識
for path, url in self._target_url.items(): # 分配程序任務
target_list[path] = url
i += 1
if i % 10 == 0 or i == len(self._target_url): # 每個程序分配十個任務
if self._mode == "2":
pool.apply_async(self._sub_multithreading, kwds=target_list) # 使用多執行緒驅動方法
else:
pool.apply_async(self._sub_coroutine, kwds=target_list) # 使用協程驅動方法
target_list = {}
pool.close() # join函式等待所有子程序結束
pool.join() # 呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool
while True:
if self._judge_over():
self._combine()
break
def _sub_multithreading(self, **kwargs):
for path, url in kwargs.items(): # 根據程序任務開啟執行緒
t = threading.Thread(target=self._download, args=(path, url,))
t.start()
@retry()
def _download(self, path, url): # 同步下載方法
with requests.get(url, headers=self._headers) as r:
if r.status_code == 200:
with open(self._path + path, "wb")as file:
file.write(r.content)
self._ml.append(0) # 每成功一個就往程序通訊列表增加一個值
percent = '%.2f' % (len(self._ml) / self._target_num * 100)
print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 顯示下載進度
else:
print(path, r.status_code, r.reason)
def _sub_coroutine(self, **kwargs):
tasks = []
for path, url in kwargs.items(): # 根據程序任務建立協程任務列表
tasks.append(asyncio.ensure_future(self._async_download(path, url)))
loop = asyncio.get_event_loop() # 建立非同步事件迴圈
loop.run_until_complete(asyncio.wait(tasks)) # 註冊任務列表
async def _async_download(self, path, url): # 非同步下載方法
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self._headers) as resp:
try:
assert resp.status == 200, "E" # 斷言狀態碼為200,否則拋異常,觸發重試裝飾器
with open(self._path + path, "wb")as file:
file.write(await resp.read())
except Exception as e:
print(e)
else:
self._ml.append(0) # 每成功一個就往程序通訊列表增加一個值
percent = '%.2f' % (len(self._ml) / self._target_num * 100)
print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 顯示下載進度
def _combine(self): # 組合資源方法
try:
print("開始組合資源...")
identification = str(floor(time.time()))
exec_str = r'copy /b "' + self._path + r'*.ts" "' + self._path + 'video' + identification + '.mp4"'
os.system(exec_str) # 使用cmd命令將資源整合
exec_str = r'del "' + self._path + r'*.ts"'
os.system(exec_str) # 刪除原來的檔案
except:
print("資源組合失敗")
else:
print("資源組合成功!")
def _judge_over(self): # 判斷是否全部下載完成
if len(self._ml) == len(self._target_url):
return True
return False
@time_statistics
def app():
multiprocessing.freeze_support()
url = input("輸入嗅探網址:\n")
m3u8 = M3U8Download()
m3u8.sniffing(url)
# m3u8.analysis(url)
if __name__ == "__main__":
app()
這裡是兩個裝飾器的實現:
import time
def time_statistics(fun):
def function_timer(*args, **kwargs):
t0 = time.time()
result = fun(*args, **kwargs)
t1 = time.time()
print("Total time running %s: %s seconds" % (fun.__name__, str(t1 - t0)))
return result
return function_timer
def retry(retries=3):
def _retry(fun):
def wrapper(*args, **kwargs):
for _ in range(retries):
try:
return fun(*args, **kwargs)
except Exception as e:
print("@", fun.__name__, "->", e)
return wrapper
return _retry
打包成exe檔案
使用PyInstaller -F download.py
將程式打包成單個可執行檔案.
這裡需要注意一下,因為程式含有多程序,需要在執行前加一句multiprocessing.freeze_support()
,不然程式會反覆執行多程序前的功能.
關於協程
協程在Python3.5進化到了async await
版本,用 async 標記非同步方法,在非同步方法裡對耗時操作使用await標記.這裡使用了一個程序驅動協程的方法,在程序池建立多個協程任務,使用asyncio.get_event_loop()
建立協程事件迴圈,使用run_until_complete()
註冊協程任務,asyncio.wait()
方法接收一個任務列表進行協程註冊.
關於裝飾器
裝飾器源於閉包原理,這裡使用了兩種裝飾器.
- @time_statistics:統計耗時,裝飾器自己無參型
- @retry():設定重試次數,裝飾器自己有參型
按我理解是有參型是將無參型裝飾器包含在內部,而呼叫是加()
的,關於()
: - 不帶括號時,呼叫的是這個函式本身
- 帶括號(此時必須傳入需要的引數),呼叫的是函式的return結果
關於CMD控制檯
程式會使用CMD命令來將下載的ts檔案合併.
因為CMD預設使用GB2312編碼,呼叫os.system()
需要先切換成通用的UTF-8輸出,否則系統資訊會亂碼.
而且使用cmd命令時引數最好加雙引號,以避免特殊符號報錯.