1. 程式人生 > >Java 多執行緒爬蟲及分散式爬蟲架構探索

Java 多執行緒爬蟲及分散式爬蟲架構探索

這是 Java 爬蟲系列博文的第五篇,在上一篇 Java 爬蟲伺服器被遮蔽,不要慌,咱們換一臺伺服器 中,我們簡單的聊反爬蟲策略和反反爬蟲方法,主要針對的是 IP 被封及其對應辦法。前面幾篇文章我們把爬蟲相關的基本知識都講的差不多啦。這一篇我們來聊一聊爬蟲架構相關的內容。

前面幾章內容我們的爬蟲程式都是單執行緒,在我們除錯爬蟲程式的時候,單執行緒爬蟲沒什麼問題,但是當我們在線上環境使用單執行緒爬蟲程式去採集網頁時,單執行緒就暴露出了兩個致命的問題:

  • 採集效率特別慢,單執行緒之間都是序列的,下一個執行動作需要等上一個執行完才能執行
  • 對伺服器的CUP等利用率不高,想想我們的伺服器都是 8核16G,32G 的只跑一個執行緒會不會太浪費啦

線上環境不可能像我們本地測試一樣,不在乎採集效率,只要能正確提取結果就行。在這個時間就是金錢的年代,不可能給你時間去慢慢的採集,所以單執行緒爬蟲程式是行不通的,我們需要將單執行緒改成多執行緒的模式,來提升採集效率和提高計算機利用率。

多執行緒的爬蟲程式設計比單執行緒就要複雜很多,但是與其他業務在高併發下要保證資料安全又不同,多執行緒爬蟲在資料安全上到要求不是那麼的高,因為每個頁面都可以被看作是一個獨立體。要做好多執行緒爬蟲就必須做好兩點:第一點就是統一的待採集 URL 維護,第二點就是 URL 的去重, 下面我們簡單的來聊一聊這兩點。

維護待採集的 URL

多執行緒爬蟲程式就不能像單執行緒那樣,每個執行緒獨自維護這自己的待採集 URL,如果這樣的話,那麼每個執行緒採集的網頁將是一樣的,你這就不是多執行緒採集啦,你這是將一個頁面採集的多次。基於這個原因我們就需要將待採集的 URL 統一維護,每個執行緒從統一 URL 維護處領取採集 URL ,完成採集任務,如果在頁面上發現新的 URL 連結則新增到 統一 URL 維護的容器中。下面是幾種適合用作統一 URL 維護的容器:

  • JDK 的安全佇列,例如 LinkedBlockingQueue
  • 高效能的 NoSQL,比如 Redis、Mongodb
  • MQ 訊息中介軟體

URL 的去重

URL 的去重也是多執行緒採集的關鍵一步,因為如果不去重的話,那麼我們將採集到大量重複的 URL,這樣並沒有提升我們的採集效率,比如一個分頁的新聞列表,我們在採集第一頁的時候可以得到 2、3、4、5 頁的連結,在採集第二頁的時候又會得到 1、3、4、5 頁的連結,待採集的 URL 佇列中將存在大量的列表頁連結,這樣就會重複採集甚至進入到一個死迴圈當中,所以就需要 URL 去重。URL 去重的方法就非常多啦,下面是幾種常用的 URL 去重方式:

  • 將 URL 儲存到資料庫進行去重,比如 redis、MongoDB
  • 將 URL 放到雜湊表中去重,例如 hashset
  • 將 URL 經過 MD5 之後儲存到雜湊表中去重,相比於上面一種,能夠節約空間
  • 使用 布隆過濾器(Bloom Filter)去重,這種方式能夠節約大量的空間,就是不那麼準確。

關於多執行緒爬蟲的兩個核心知識點我們都知道啦,下面我畫了一個簡單的多執行緒爬蟲架構圖,如下圖所示:

上面我們主要了解了多執行緒爬蟲的架構設計,接下來我們不妨來試試 Java 多執行緒爬蟲,我們以採集虎撲新聞為例來實戰一下 Java 多執行緒爬蟲,Java 多執行緒爬蟲中設計到了 待採集 URL 的維護和 URL 去重,由於我們這裡只是演示,所以我們就使用 JDK 內建的容器來完成,我們使用 LinkedBlockingQueue 作為待採集 URL 維護容器,HashSet 作為 URL 去重容器。下面是 Java 多執行緒爬蟲核心程式碼,詳細程式碼以上傳 GitHub,地址在文末:

/**
 * 多執行緒爬蟲
 */
public class ThreadCrawler implements Runnable {
    // 採集的文章數
    private final AtomicLong pageCount = new AtomicLong(0);
    // 列表頁連結正則表示式
    public static final String URL_LIST = "https://voice.hupu.com/nba";
    protected Logger logger = LoggerFactory.getLogger(getClass());
    // 待採集的佇列
    LinkedBlockingQueue<String> taskQueue;
    // 採集過的連結列表
    HashSet<String> visited;
    // 執行緒池
    CountableThreadPool threadPool;
    /**
     *
     * @param url 起始頁
     * @param threadNum 執行緒數
     * @throws InterruptedException
     */
    public ThreadCrawler(String url, int threadNum) throws InterruptedException {
        this.taskQueue = new LinkedBlockingQueue<>();
        this.threadPool = new CountableThreadPool(threadNum);
        this.visited = new HashSet<>();
        // 將起始頁新增到待採集佇列中
        this.taskQueue.put(url);
    }

    @Override
    public void run() {
        logger.info("Spider started!");
        while (!Thread.currentThread().isInterrupted()) {
            // 從佇列中獲取待採集 URL
            final String request = taskQueue.poll();
            // 如果獲取 request 為空,並且當前的執行緒採已經沒有執行緒在執行
            if (request == null) {
                if (threadPool.getThreadAlive() == 0) {
                    break;
                }
            } else {
                // 執行採集任務
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            processRequest(request);
                        } catch (Exception e) {
                            logger.error("process request " + request + " error", e);
                        } finally {
                            // 採集頁面 +1
                            pageCount.incrementAndGet();
                        }
                    }
                });
            }
        }
        threadPool.shutdown();
        logger.info("Spider closed! {} pages downloaded.", pageCount.get());
    }

    /**
     * 處理採集請求
     * @param url
     */
    protected void processRequest(String url) {
        // 判斷是否為列表頁
        if (url.matches(URL_LIST)) {
            // 列表頁解析出詳情頁連結新增到待採集URL佇列中
            processTaskQueue(url);
        } else {
            // 解析網頁
            processPage(url);
        }
    }
    /**
     * 處理連結採集
     * 處理列表頁,將 url 新增到佇列中
     *
     * @param url
     */
    protected void processTaskQueue(String url) {
        try {
            Document doc = Jsoup.connect(url).get();
            // 詳情頁連結
            Elements elements = doc.select(" div.news-list > ul > li > div.list-hd > h4 > a");
            elements.stream().forEach((element -> {
                String request = element.attr("href");
                // 判斷該連結是否存在佇列或者已採集的 set 中,不存在則新增到佇列中
                if (!visited.contains(request) && !taskQueue.contains(request)) {
                    try {
                        taskQueue.put(request);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }));
            // 列表頁連結
            Elements list_urls = doc.select("div.voice-paging > a");
            list_urls.stream().forEach((element -> {
                String request = element.absUrl("href");
                // 判斷是否符合要提取的列表連結要求
                if (request.matches(URL_LIST)) {
                    // 判斷該連結是否存在佇列或者已採集的 set 中,不存在則新增到佇列中
                    if (!visited.contains(request) && !taskQueue.contains(request)) {
                        try {
                            taskQueue.put(request);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 解析頁面
     *
     * @param url
     */
    protected void processPage(String url) {
        try {
            Document doc = Jsoup.connect(url).get();
            String title = doc.select("body > div.hp-wrap > div.voice-main > div.artical-title > h1").first().ownText();

            System.out.println(Thread.currentThread().getName() + " 在 " + new Date() + " 採集了虎撲新聞 " + title);
            // 將採集完的 url 存入到已經採集的 set 中
            visited.add(url);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        try {
            new ThreadCrawler("https://voice.hupu.com/nba", 5).run();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

我們用 5 個執行緒去採集虎撲新聞列表頁看看效果如果?執行該程式,得到如下結果:

結果中可以看出,我們啟動了 5 個執行緒採集了 61 頁頁面,一共耗時 2 秒鐘,可以說效果還是不錯的,我們來跟單執行緒對比一下,看看差距有多大?我們將執行緒數設定為 1 ,再次啟動程式,得到如下結果:

可以看出單執行緒採集虎撲 61 條新聞花費了 7 秒鐘,耗時差不多是多執行緒的 4 倍,你想想這可只是 61 個頁面,頁面更多的話,差距會越來越大,所以多執行緒爬蟲效率還是非常高的。

分散式爬蟲架構

分散式爬蟲架構是一個大型採集程式才需要使用的架構,一般情況下使用單機多執行緒就可以解決業務需求,反正我是沒有分散式爬蟲專案的經驗,所以這一塊我也沒什麼可以講的,但是我們作為技術人員,我們需要對技術儲存熱度,雖然不用,但是瞭解瞭解也無妨,我查閱了不少資料得出瞭如下結論:

分散式爬蟲架構跟我們多執行緒爬蟲架構在思路上來說是一樣的,我們只需要在多執行緒的基礎上稍加改進就可以變成一個簡單的分散式爬蟲架構。因為分散式爬蟲架構中爬蟲程式部署在不同的機器上,所以我們待採集的 URL 和 採集過的 URL 就不能存放在爬蟲程式機器的記憶體中啦,我們需要將它統一在某臺機器上維護啦,比如存放在 Redis 或者 MongoDB 中,每臺機器都從這上面獲取採集連結,而不是從 LinkedBlockingQueue 這樣的記憶體佇列中取連結啦,這樣一個簡單的分散式爬蟲架構就出現了,當然這裡面還會有很多細節問題,因為我沒有分散式架構的經驗,我也無從說起,如果你有興趣的話,歡迎交流。

原始碼:原始碼

文章不足之處,望大家多多指點,共同學習,共同進步

最後

打個小廣告,歡迎掃碼關注微信公眾號:「平頭哥的技術博文」,一起進步吧。

相關推薦

Java執行-基礎實現

1. 什麼是執行緒 執行緒是程序內的執行單元     某個程序當中都有若干個執行緒。 執行緒是程序內的執行單元。 使用執行緒的原因是,程序的切換是非常重量級

Java執行原理Thread類的使用

一、程序與執行緒的區別 1.程序是應用程式在記憶體總分配的空間。(正在執行中的程式) 2.執行緒是程序中負責程式執行的執行單元、執行路徑。 3.一個程序中至少有一個執行緒在負責程序的執行。 4.一個程序中有多個執行緒在執行的程式,為多執行緒程式。 5.多執行緒技術是為了解決多部分程式碼同時執行。

Day 9——java執行2字元編碼集合

java.lang.Runnable 介面 Runnable中有Public void run();方法 供現有Runnable物件建立執行緒 使用Runnable物件建立執行緒 New Thread(Runnable r).start(); 靜態同步方法

java執行併發執行

執行緒的常用建立方式 1、繼承Thread類建立執行緒類 public class FirstThreadTest extends Thread { public void run(){ System.out.println("這

Java執行——物件變數的併發訪問

Java多線系列文章是Java多執行緒的詳解介紹,對多執行緒還不熟悉的同學可以先去看一下我的這篇部落格Java基礎系列3:多執行緒超詳細總結,這篇部落格從巨集觀層面介紹了多執行緒的整體概況,接下來的幾篇文章是對多執行緒的深入剖析。   本篇文章主要介紹Java多執行緒中的同步,也就是如何在Java語

Java 執行爬蟲分散式爬蟲架構探索

這是 Java 爬蟲系列博文的第五篇,在上一篇 Java 爬蟲伺服器被遮蔽,不要慌,咱們換一臺伺服器 中,我們簡單的聊反爬蟲策略和反反爬蟲方法,主要針對的是 IP 被封及其對應辦法。前面幾篇文章我們把爬蟲相關的基本知識都講的差不多啦。這一篇我們來聊一聊爬蟲架構相關的內容。 前面幾章內容我們的爬蟲程式都是單執行

java執行爬蟲框架crawler4j的使用

一開始找jar包找了好久都沒找到,後來花了6個積分把所有的依賴包找到了,現在放在百度雲供大家免費下載: 連結:https://pan.baidu.com/s/12MTMy4d4e6hZsmWAdXbUMQ 提取碼:433g 注意這些依賴包是3.5版本的不是最新版本。 如果想使用最新版本的

[原創]一款小巧、靈活的Java執行爬蟲框架(AiPa)

1.作品簡介 AiPa 是一款小巧,靈活,擴充套件性高的多執行緒爬蟲框架。 AiPa 依賴當下最簡單的HTML解析器Jsoup。 AiPa 只需要使用者提供網址集合,即可在多執行緒下自動爬取,並對一些異常進行處理。 2.下載安裝 AiPa是一個小巧的、只有390KB的jar包。 下載該Jar包匯入到你的專案中

AiPa — 小巧、靈活的 Java 執行爬蟲框架

1.框架簡介 AiPa 是一款小巧,靈活,擴充套件性高的多執行緒爬蟲框架。 AiPa 依賴當下最簡單的HTML解析器Jsoup。 AiPa 只需要使用者提供網址集合,即可在多執行緒下自動爬取,並對一些異常進行處理。 2.下載安裝 AiPa是一個小巧的、只有390KB

網路採集器Demo:Jsoup+Java執行實現[爬蟲](上)

裡面最簡單,但是很常用的一個部分,就是網路爬蟲,從網頁上獲取文字資訊 這裡用到兩個工具,一個就是Java多執行緒(基於Java5 以上的執行緒池模式,區別於過時的Runable),另外一個是一個小工具:Jsoup,用於解析html網頁,獲取其中的內容,關於Jsoup的使用

Java 執行面試題答案(非常全面)

這篇文章主要是對多執行緒的問題進行總結的,因此羅列了40個多執行緒的問題。 這些多執行緒的問題,有些來源於各大網站、有些來源於自己的思考。可能有些問題網上有、可能有些問題對應的答案也有、也可能有些各位網友也都看過,但是本文寫作的重心就是所有的問題都會按照自己的理解回答一遍,不會去看網上的

java執行、FutureTask的用法兩種常用的使用場景

Java多執行緒實現的方式有四種 1.繼承Thread類,重寫run方法 2.實現Runnable介面,重寫run方法,實現Runnable介面的實現類的例項物件作為Thread建構函式的target 3.通過Callable和FutureTask建立執行緒 4.通過執行緒池

java:執行(實現Runnable的原理)二種方式的區別

* 1,看Thread類的建構函式,傳遞了Runnable介面的引用  * 2,通過init()方法找到傳遞的target給成員變數的target賦值 * 3,檢視run方法,發現run方法中有判斷,如果target不為null就會呼叫Runnable介面子類物件的run方法 *

Java執行程式設計核心技術(二)物件變數的併發訪問

最近一直在忙比賽,四五個吧,時間有點緊張,部落格也沒時間更新~ 只能忙裡抽閒 本文屬於Java多執行緒程式設計系列的第二篇,旨在分享我對多執行緒程式設計技術的心得與感悟,順便做下筆記。 如果你閱讀完比較感興趣,歡迎關注我,等待更新後續篇章。 本文主要介紹Java多執行緒中的同步,也就是如何在Java語言中

Java執行(一):執行基礎建立

(一)、執行緒的生命週期 新建狀態: 使用 new 關鍵字和 Thread 類或其子類建立一個執行緒物件後,該執行緒物件就處於新建狀態。它保持這個狀態直到程式 start() 這個執行緒。 就緒狀態: 當執行緒物件呼叫了start()方法之後,該執行緒就進入就緒

Java執行-----執行池的使用,原理以及舉例實現(三)(四):使用樣例如何配置執行池大小

三.使用示例   前面我們討論了關於執行緒池的實現原理,這一節我們來看一下它的具體使用: public class Test { public static void main(String[] args) { ThreadPoolExe

java執行 ThreadPoolExecutor 策略

無論是使用jdk的執行緒池ThreadPoolExecutor 還是spring的執行緒池ThreadPoolTaskExecutor 都會使用到一個阻塞佇列來進行儲存執行緒任務。    當執行緒不夠用時,則將後續的任務暫存到 阻塞佇列中,等待有空閒執行緒來進行。   當

執行+代理ip池 爬蟲

# coding=utf-8 import tushare as ts import pandas as pd import requests import json import re import time from retrying import retry from concurren

java定時器類Timer和執行介紹例項

任務要求: 完成一個java application應用程式,使用定時器程式設計,在實時顯示當前時間,每1秒時鐘內容更新一次。 完成一個java application應用程式,在應用程式主程序中新開一個執行緒,此執行緒進行死迴圈,每1秒被啟用一次,啟用時即在

Java 執行(三)—— 執行的生命週期方法

這篇部落格介紹執行緒的生命週期。   執行緒是一個動態執行的過程,它也有從建立到死亡的過程。 執行緒的幾種狀態 在 Thread 類中,有一個列舉內部類: 上面的資訊以圖片表示如下:   第一張圖:  第二張圖:把等待、計時等待、阻塞看成阻塞一個狀態了 1、新建狀態(ne