1. 程式人生 > >使用Appium批量抓取抖音無水印視訊

使用Appium批量抓取抖音無水印視訊

# -*- coding: utf-8 -*-
import os
import time
import re
import aiohttp
import asyncio
import requests
import subprocess
from appium import webdriver
from bs4 import BeautifulSoup

class Spider(object):
    """Collect and download the watermark-free videos of one Douyin account.

    Workflow (see :meth:`run`):

    1. Drive the Douyin Android app through Appium, copy every video's share
       link and read it back from the device clipboard through ``adb``
       broadcasts handled by the ``ca.zgrs.clipper`` service.
    2. Fetch each share page and rewrite the ``<video>`` source by replacing
       ``playwm`` with ``play``.
    3. Download all resulting URLs concurrently with aiohttp.

    NOTE(review): every tap/swipe coordinate and the element ids below are
    hard-coded, presumably tuned for one screen resolution and one app
    version - confirm before reusing on another device.
    """

    # Address of the Appium server the driver session is created on.
    APPIUM_SERVER = 'http://localhost:4723/wd/hub'

    def __init__(self,desired_caps = None,douyin_name = None,save_dir = "E:\\"):
        """Store the configuration; no device or network access happens here.

        Args:
            desired_caps: Appium desired capabilities. When falsy, a default
                set targeting the Douyin Android app is used.
            douyin_name: Douyin ID typed into the in-app search. When falsy,
                ``"doumiaoya1995"`` is used.
            save_dir: Directory the ``<n>.mp4`` files are written to.
        """
        if not desired_caps:
            desired_caps = {}
            desired_caps['platformName'] = 'Android'
            desired_caps['platformVersion'] = '7.1.2'
            desired_caps['deviceName'] = '你的名稱'
            desired_caps['appPackage'] = 'com.ss.android.ugc.aweme'
            desired_caps['appActivity'] ='.main.MainActivity'
            desired_caps['noReset'] = True
        self.desired_caps = desired_caps
        if not douyin_name:
            douyin_name = "doumiaoya1995"
        self.douyin_name = douyin_name
        self.save_dir = save_dir
        self.driver = None   # created by start_appium_clipboard()
        self.total = 0       # number of videos, read from the profile page
        self.index = 0
        self.video_urls = []
        # Share link followed by a space; dots are escaped and both http and
        # https are accepted.
        self.reg = r"(https?://v\.douyin\.com.*?) "
        self.headers = {"user-agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1",}

    @staticmethod
    def _adb(*args):
        """Run ``adb <args>`` to completion and return its decoded stdout."""
        completed = subprocess.run(["adb"] + list(args), stdout=subprocess.PIPE)
        return completed.stdout.decode("utf-8", errors="replace")

    def _clear_clipboard(self):
        """Ask the clipper service to set the device clipboard to empty text."""
        self._adb("shell", "am", "broadcast", "-a", "clipper.set", "-e", "text", "")

    def _extract_url(self, clipboard_text):
        """Return the first match of ``self.reg`` in *clipboard_text*.

        Raises:
            ValueError: if the text contains no match.
        """
        matches = re.findall(self.reg, clipboard_text)
        if not matches:
            raise ValueError("no Douyin share link found in clipboard output")
        return matches[0]

    def close(self):
        """Quit the Appium session if one is open; safe to call repeatedly."""
        driver, self.driver = self.driver, None
        if driver is not None:
            driver.quit()

    def start_appium_clipboard(self):
        """Open the app, search the account and enter its first video.

        Creates ``self.driver``, starts the clipper clipboard service on the
        device and sets ``self.total`` to the number parsed from the last
        whitespace-separated token of the profile's title text.
        """
        self.driver = webdriver.Remote(self.APPIUM_SERVER, self.desired_caps)
        time.sleep(5)
        # Wait for the service start command to finish before touching the UI.
        self._adb("shell", "am", "startservice", "ca.zgrs.clipper/.ClipboardService")
        self.driver.tap([(100, 100)], 200)  # tap the search icon
        time.sleep(0.5)  # wait for loading
        self.driver.tap([(500, 150)], 200)  # tap the search box to place the cursor
        time.sleep(1)
        # Type the Douyin ID.
        self.driver.find_element_by_xpath("//android.support.v4.view.ViewPager[contains(@index,0)]").send_keys(self.douyin_name)
        # Tap "search"; the id/a_x resource id changes between app versions.
        self.driver.find_element_by_id("com.ss.android.ugc.aweme:id/a_x").click()
        time.sleep(5)  # wait for loading
        self.driver.tap([(500, 500)], 200)  # tap the result to open the profile page
        time.sleep(5)  # wait for loading
        text = self.driver.find_element_by_id("com.ss.android.ugc.aweme:id/title").get_attribute("text")
        self.total = int(text.split()[-1])   # total number of videos
        self.driver.tap([(100, 1500)], 200)  # tap the first video to open the video page

    def get_url_from_clipboard(self):
        """Read the device clipboard via clipper and return the share link.

        Raises:
            ValueError: if the clipboard output contains no share link.
        """
        clipboard_text = self._adb("shell", "am", "broadcast", "-a", "clipper.get")
        return self._extract_url(clipboard_text)

    def get_video_url(self):
        """Copy the share link of every video into ``self.video_urls``.

        A video whose link cannot be read from the clipboard is reported and
        skipped instead of aborting the whole run.
        """
        self.start_appium_clipboard()
        while self.index < self.total:
            time.sleep(0.5)
            self.driver.tap([(999, 1499)], 100)   # tap the share button
            time.sleep(1)
            # Drag the share panel from x=999 to x=20 to reveal the copy button.
            self.driver.swipe(999,1641,20,1641)
            time.sleep(0.5)
            self.driver.tap([(500, 1700)], 100)   # tap the copy-link button
            try:
                url = self.get_url_from_clipboard()
            except ValueError:
                print("Video {}: no share link in clipboard, skipped".format(self.index + 1))
            else:
                self.video_urls.append(url)
            # Clear the clipboard so a failed copy on the next video cannot
            # silently reuse this video's link.
            self._clear_clipboard()
            time.sleep(0.2)
            self.driver.swipe(500,1500,500,200)   # swipe up to the next video
            self.index += 1
        self._clear_clipboard()

    def get_nowatermark_video_url(self):
        """Replace the share links in ``self.video_urls`` with direct video URLs.

        Each share page is fetched and the ``src`` of its ``div > video``
        element is taken with ``playwm`` replaced by ``play``. Links that
        cannot be fetched or contain no such element are reported and
        skipped. Resets ``self.index`` to 1.
        """
        share_urls = self.video_urls[:]
        self.video_urls = []
        self.index = 1
        last = len(share_urls) - 1
        for position, share_url in enumerate(share_urls):
            try:
                html = requests.get(share_url, headers=self.headers, timeout=30).text
            except requests.RequestException as exc:
                print("Skipping {}: request failed ({!r})".format(share_url, exc))
            else:
                soup = BeautifulSoup(html, "html.parser")
                video_tag = soup.select_one("div > video[src]")
                if video_tag is None:
                    print("Skipping {}: no video element found".format(share_url))
                else:
                    self.video_urls.append(video_tag['src'].replace("playwm", "play"))
            # Pause between page requests; pointless after the last one.
            if position != last:
                time.sleep(5)

    async def get_video_content(self,url):
        """Download *url* and return the response body as bytes.

        Raises:
            aiohttp.ClientResponseError: on an HTTP error status, so an error
                page is never saved as a video.
        """
        async with aiohttp.ClientSession() as session:
            # The context manager releases the connection when done.
            async with session.get(url,headers=self.headers) as response:
                response.raise_for_status()
                return await response.read()

    async def download_video(self,url,index = None):
        """Download *url* and save it as ``<number>.mp4`` in ``self.save_dir``.

        Args:
            url: Direct video URL.
            index: File number to use. When omitted, ``self.index`` is used
                and incremented after the file has been written.
        """
        content = await self.get_video_content(url)
        number = self.index if index is None else index
        video_name = os.path.join(self.save_dir, str(number) + ".mp4")
        with open(video_name,"wb") as fp:
            fp.write(content)
        print ("第{}個視訊下載成功".format(number))
        if index is None:
            self.index += 1

    async def _download_many(self):
        """Download every URL concurrently; return per-URL results/exceptions."""
        # Numbers are fixed up front so file names follow the list order
        # rather than the order in which downloads happen to finish.
        jobs = [self.download_video(url, number)
                for number, url in enumerate(self.video_urls, start=1)]
        return await asyncio.gather(*jobs, return_exceptions=True)

    def _download_all(self):
        """Run all downloads on a private event loop.

        Returns:
            The number of videos downloaded successfully (0 when
            ``self.video_urls`` is empty).
        """
        if not self.video_urls:
            return 0
        loop = asyncio.new_event_loop()
        try:
            results = loop.run_until_complete(self._download_many())
        finally:
            loop.close()
        succeeded = 0
        for number, outcome in enumerate(results, start=1):
            if isinstance(outcome, BaseException):
                print("Video {} failed to download: {!r}".format(number, outcome))
            else:
                succeeded += 1
        return succeeded

    def run(self):
        """Scrape the share links, resolve them and download every video."""
        try:
            self.get_video_url()
        finally:
            # The Appium session is only needed while collecting links.
            self.close()
        self.get_nowatermark_video_url()
        print ("準備開始下載")
        start = time.time()
        self._download_all()
        end = time.time()
        print('共運行了%s秒' % (end-start))

if __name__ == "__main__":
    # Script entry point: scrape and download every video of this Douyin ID.
    douyin = Spider(
        douyin_name="1641028376",
    )
    douyin.run()