1. 程式人生 > >《C# 爬蟲 破境之道》:第二境 爬蟲應用 — 第七節:併發控制與策略

《C# 爬蟲 破境之道》:第二境 爬蟲應用 — 第七節:併發控制與策略

我們在第五節中提到一個問題,任務佇列增長速度太快,與之對應的採集、分析、處理速度遠遠跟不上,造成記憶體快速增長,頻寬佔用過高,CPU使用率過高,這樣是極度有害系統健康的。

我們在開發採集程式的時候,總是希望能夠儘快將資料爬取下來,如果總任務數量很小(2~3K請求數之內),總耗費時長很短(1~2分鐘之內),那麼,對系統的正常執行不會造成太嚴重的影響,我們儘可以肆無忌憚。但,當總任務數量更多,總耗費時長更長,那麼,無休止的任務堆積,就會給系統帶來難以預料甚至是很嚴重的後果。

為此,我們不得不考慮幾個問題:

  • 我們的任務總量大概在什麼量級,全速採集大概需要耗費多少時間、多少資源,未來的發展是不是可控?
  • 採集系統自身依託的環境資源是否充足,是否能夠滿足隨之而來的巨大的資源消耗?
  • 採集的目標資源系統是否具有某些反爬策略限制?
  • 採集的目標資源系統是否能夠承受得住如此數量級的併發採集請求(無論單點或分散式採集系統,都要考慮這點)?
  • 隨著採集結果返回,帶來的後續分析、處理、儲存能力是否能夠滿足大量資料的瞬時到來?

由以上問題也可以看出,一個爬蟲系統策略的制定,需要考慮的問題也是全方位的,而不僅僅是採集本身,不同的環境、規模、目標,採用的策略也不盡相同。本節,我們將討論一下,如果我們的能力不能滿足上述條件的情況下,如何來制定一個併發策略以及如何實現它。

併發策略,從規模上可以分為全域性併發策略和單點併發策略,全域性併發策略包含單點併發策略,不過它也需要同時考慮負載均衡策略對制定併發策略的影響。目前,我們還沒有將爬蟲框架擴充套件到分散式框架,暫時先不考慮全域性併發策略的制定。主要探討一下單點併發策略制定與實現。

單點併發策略的制定:

通常,我們在制定單點併發策略時,需要從哪些角度考慮,使用什麼方法,以及如何決策?下面我們就來詳細聊聊:)

1、我們先來梳理一下采集系統自身所依託的環境資源:

除了CPU、記憶體、儲存器、頻寬這些耳熟能詳的資源外,還有就是比較容易被忽略的作業系統可用埠。對於各種資源的佔用情況,下面給出一些建議(均值):

  • CPU:採集系統的佔用總量建議不超過30%,CPU總使用量建議不超過50%。(雖然我這個瘋子經常貪婪過渡T_T)。對於多核CPU,執行緒建立數量建議不超過CPU核數的兩倍。
  • 記憶體:採集系統的佔用總量建議不超過50%,記憶體總使用量建議不超過70%。
  • 儲存器:對於商業或者大規模的爬蟲體系,建議將儲存分離,使用外部儲存裝置,比如NAS、分散式快取、資料倉庫等;當然,其他爬蟲體系也這麼建議,但如果條件不允許的話,只能儲存在本地磁碟的話,就需要考慮磁碟的IOPS了,即使是使用快取、資料庫系統來作為中間儲存媒介,實質上也是與磁碟IO打交道,不過一般的快取、資料庫系統都會對IO做優化,而且能干預的力度比較小,倒是可以略微“省心”。這個,本人也無法給出一個合理的通用的建議值,磁碟的效能千奇百怪,只能是按實際環境來拿捏了。
  • 頻寬:分為上行、下行兩個頻寬指標,採集系統在這兩個指標中的佔用總量都不建議超過80%。除了考慮ISP分配的頻寬,還要考慮會影響其效能的周邊裝置,比如貓、交換機、路由器甚至是網線的吞吐能力。說來尷尬,我經常在家裡做實驗,爬蟲系統和目標資源系統都還OK,聯通的光貓跪了……重啟復活……又跪了……重啟復活……又跪了……重啟復活……
  • 可用埠:這個是一個隱性條件,也是經常被忽略的限制。拿Windows系統來說,可用的埠最大數量為UInt16.MaxValue(65535)個,而伴隨著系統啟動,就會有一系列的服務佔用了部分埠,比如IIS中的網站、資料庫、QQ,而系統本身也會保留一部分埠,比如443、3389等。而是否能夠使用埠重用技術來緩解疼痛,對具體實現以及NAS埠對映規則的要求更高,不好或不可控。所以爬蟲本身能夠使用的埠數就有一個極限限制,這個也沒有建議值,具體情況各不相同。

總之,資源總是有限的,大體原則就是:做人留一線,日後好相見:)

2、對於目標資源系統的資源環境:

通常,我們無法探知具體的資源情況,再加上對方可能使用反爬策略,就是知道具體的資源情況,也不見得就有用。對於制定併發策略,我們更關心的是對方能夠吃的下多大的鴨梨,以及探索其反爬策略允許的極限。為此,我們需要使用下述的方法,來輔助我們制定策略。

3、通常使用的方法:

3.1、需要找到目標資源系統中的一個URI,原則是輕量、成功率高(最好是100%),比如,一張小小的圖片、一個簡單短小的ajax介面、一個靜態html甚至是一個xxx.min.css,但要注意,我們選取的URI可千萬不要是經過CDN加速的,否則T_T;

3.2、接下來,我們就針對選取的URI進行週期採集,對於一般的資源站點,初始頻率設定為1秒1次,就可以了;

3.3、然後就是執行一段時間觀察結果,後面我們再說執行多長時間觀察為合適;

3.4、如果觀察結果OK,成功率能夠達到95%+,那麼,我們就可以適量縮小採集週期,反之,就要適當延長採集週期;

3.5、重複3.3~3.4,最後得到一個合理的極限週期;

3.6、至於一次觀察多長時間,不同的反爬策略,有著不同的限制,這個需要小心。我曾經的一個專案,當時就比較心急,觀測了5分鐘,沒什麼問題,就丟出去了,結果後來現實告訴我,他們的策略是1分鐘累計限制、10分鐘累計限制、20分鐘累計限制、30分鐘累計限制、1小時累計限制……而且累計限制逐級遞減,也就是說,你滿足了1分鐘的累計限制,x10,就不一定滿足10分鐘的累計限制,x60就有可能遠遠超出了1小時累計限制。這裡給出一個建議,至少30分鐘。因為目標系統去統計每一個來源IP的訪問週期,也是一個不小的代價,所以也不可能做到無限期的監測,通常半小時到一小時已經是極限了。這裡也給出一個最保險的觀測週期,那就是根據請求總量及當前頻率,預估耗費總時長,作為觀測週期,這樣是最穩妥的,但,這也可能是不切實際的:(

4、如何制定併發策略:

通過上述3步,結合自身的資源情況、目標的反爬策略及承受能力、以及觀測結果,我們就可以制定一個大概的併發量了,制定決策也就不那麼困難了;

我們的任務都是儲存在佇列中,併發的限制,無非就是控制入隊的頻率,所以,只需要把前面的統計結果轉化為最小請求間隔,就是我們最終的併發策略了;

為什麼是控制入隊,而不是出隊呢?因為如果不控制入隊,那麼佇列還是會無限暴增,直至“死亡”,而限制入隊,一方面避免佇列暴增,另一方面,阻塞新任務的生成,降低CPU及記憶體使用量;

單點併發策略的實現:

有了理論基礎,在技術實現上,就不是什麼難事兒了。

 1 namespace MikeWare.Core.Components.CrawlerFramework.Policies
 2 {
 3     using System;
 4 
 5     public abstract class AConcurrentPolicy
 6     {
 7         public virtual bool WaitOne(TimeSpan timeout) => throw new NotImplementedException();
 8 
 9         public virtual void ReleaseOne() => throw new NotImplementedException();
10     }
11 }
併發策略 —— AConcurrentPolicy

這是一個抽象類,具有兩個抽象方法,作為併發策略的基礎實現;

我寫了兩種併發策略的具體實現,PeriodConcurrentPolicy和SemaphoreConcurrentPolicy,他們的目的都是用來控制入隊的頻率,目標一致,方法不同,您也可以實現自己的併發策略;

本節,我們主要說道說道System.Threading.Semaphore的使用及SemaphoreConcurrentPolicy的實現原理;

 1 namespace MikeWare.Core.Components.CrawlerFramework.Policies
 2 {
 3     using System;
 4     using System.Threading;
 5 
 6     public class SemaphoreConcurrentPolicy : AConcurrentPolicy
 7     {
 8         private Semaphore semaphore = null;
 9 
10         public SemaphoreConcurrentPolicy(int init, int max)
11         {
12             semaphore = new Semaphore(init, max);
13         }
14 
15         public override bool WaitOne(TimeSpan timeout)
16         {
17             return semaphore.WaitOne(timeout);
18         }
19 
20         public override void ReleaseOne()
21         {
22             semaphore.Release(1);
23         }
24     }
25 }
併發策略實現 —— SemaphoreConcurrentPolicy

SemaphoreConcurrentPolicy繼承自AConcurrentPolicy,定義了一個私有變數Semaphore semaphore,以及重寫了基類的兩個抽象方法;

namespace System.Threading
{
    //
    // Summary:
    //     Limits the number of threads that can access a resource or pool of resources
    //     concurrently.
    public sealed class Semaphore : WaitHandle
    {
        //
        // Summary:
        //     Initializes a new instance of the System.Threading.Semaphore class, specifying
        //     the initial number of entries and the maximum number of concurrent entries.
        //
        // Parameters:
        //   initialCount:
        //     The initial number of requests for the semaphore that can be granted concurrently.
        //
        //   maximumCount:
        //     The maximum number of requests for the semaphore that can be granted concurrently.
        //
        // Exceptions:
        //   T:System.ArgumentException:
        //     initialCount is greater than maximumCount.
        //
        //   T:System.ArgumentOutOfRangeException:
        //     maximumCount is less than 1. -or- initialCount is less than 0.
        public Semaphore(int initialCount, int maximumCount);
        //
        // Summary:
        //     Initializes a new instance of the System.Threading.Semaphore class, specifying
        //     the initial number of entries and the maximum number of concurrent entries, and
        //     optionally specifying the name of a system semaphore object.
        //
        // Parameters:
        //   initialCount:
        //     The initial number of requests for the semaphore that can be granted concurrently.
        //
        //   maximumCount:
        //     The maximum number of requests for the semaphore that can be granted concurrently.
        //
        //   name:
        //     The name of a named system semaphore object.
        //
        // Exceptions:
        //   T:System.ArgumentException:
        //     initialCount is greater than maximumCount. -or- name is longer than 260 characters.
        //
        //   T:System.ArgumentOutOfRangeException:
        //     maximumCount is less than 1. -or- initialCount is less than 0.
        //
        //   T:System.IO.IOException:
        //     A Win32 error occurred.
        //
        //   T:System.UnauthorizedAccessException:
        //     The named semaphore exists and has access control security, and the user does
        //     not have System.Security.AccessControl.SemaphoreRights.FullControl.
        //
        //   T:System.Threading.WaitHandleCannotBeOpenedException:
        //     The named semaphore cannot be created, perhaps because a wait handle of a different
        //     type has the same name.
        public Semaphore(int initialCount, int maximumCount, string name);
        //
        // Summary:
        //     Initializes a new instance of the System.Threading.Semaphore class, specifying
        //     the initial number of entries and the maximum number of concurrent entries, optionally
        //     specifying the name of a system semaphore object, and specifying a variable that
        //     receives a value indicating whether a new system semaphore was created.
        //
        // Parameters:
        //   initialCount:
        //     The initial number of requests for the semaphore that can be satisfied concurrently.
        //
        //   maximumCount:
        //     The maximum number of requests for the semaphore that can be satisfied concurrently.
        //
        //   name:
        //     The name of a named system semaphore object.
        //
        //   createdNew:
        //     When this method returns, contains true if a local semaphore was created (that
        //     is, if name is null or an empty string) or if the specified named system semaphore
        //     was created; false if the specified named system semaphore already existed. This
        //     parameter is passed uninitialized.
        //
        // Exceptions:
        //   T:System.ArgumentException:
        //     initialCount is greater than maximumCount. -or- name is longer than 260 characters.
        //
        //   T:System.ArgumentOutOfRangeException:
        //     maximumCount is less than 1. -or- initialCount is less than 0.
        //
        //   T:System.IO.IOException:
        //     A Win32 error occurred.
        //
        //   T:System.UnauthorizedAccessException:
        //     The named semaphore exists and has access control security, and the user does
        //     not have System.Security.AccessControl.SemaphoreRights.FullControl.
        //
        //   T:System.Threading.WaitHandleCannotBeOpenedException:
        //     The named semaphore cannot be created, perhaps because a wait handle of a different
        //     type has the same name.
        public Semaphore(int initialCount, int maximumCount, string name, out bool createdNew);

        //
        // Summary:
        //     Opens the specified named semaphore, if it already exists.
        //
        // Parameters:
        //   name:
        //     The name of the system semaphore to open.
        //
        // Returns:
        //     An object that represents the named system semaphore.
        //
        // Exceptions:
        //   T:System.ArgumentException:
        //     name is an empty string. -or- name is longer than 260 characters.
        //
        //   T:System.ArgumentNullException:
        //     name is null.
        //
        //   T:System.Threading.WaitHandleCannotBeOpenedException:
        //     The named semaphore does not exist.
        //
        //   T:System.IO.IOException:
        //     A Win32 error occurred.
        //
        //   T:System.UnauthorizedAccessException:
        //     The named semaphore exists, but the user does not have the security access required
        //     to use it.
        public static Semaphore OpenExisting(string name);
        //
        // Summary:
        //     Opens the specified named semaphore, if it already exists, and returns a value
        //     that indicates whether the operation succeeded.
        //
        // Parameters:
        //   name:
        //     The name of the system semaphore to open.
        //
        //   result:
        //     When this method returns, contains a System.Threading.Semaphore object that represents
        //     the named semaphore if the call succeeded, or null if the call failed. This parameter
        //     is treated as uninitialized.
        //
        // Returns:
        //     true if the named semaphore was opened successfully; otherwise, false.
        //
        // Exceptions:
        //   T:System.ArgumentException:
        //     name is an empty string. -or- name is longer than 260 characters.
        //
        //   T:System.ArgumentNullException:
        //     name is null.
        //
        //   T:System.IO.IOException:
        //     A Win32 error occurred.
        //
        //   T:System.UnauthorizedAccessException:
        //     The named semaphore exists, but the user does not have the security access required
        //     to use it.
        public static bool TryOpenExisting(string name, out Semaphore result);
        //
        // Summary:
        //     Exits the semaphore and returns the previous count.
        //
        // Returns:
        //     The count on the semaphore before the System.Threading.Semaphore.Release* method
        //     was called.
        //
        // Exceptions:
        //   T:System.Threading.SemaphoreFullException:
        //     The semaphore count is already at the maximum value.
        //
        //   T:System.IO.IOException:
        //     A Win32 error occurred with a named semaphore.
        //
        //   T:System.UnauthorizedAccessException:
        //     The current semaphore represents a named system semaphore, but the user does
        //     not have System.Security.AccessControl.SemaphoreRights.Modify. -or- The current
        //     semaphore represents a named system semaphore, but it was not opened with System.Security.AccessControl.SemaphoreRights.Modify.
        public int Release();
        //
        // Summary:
        //     Exits the semaphore a specified number of times and returns the previous count.
        //
        // Parameters:
        //   releaseCount:
        //     The number of times to exit the semaphore.
        //
        // Returns:
        //     The count on the semaphore before the System.Threading.Semaphore.Release* method
        //     was called.
        //
        // Exceptions:
        //   T:System.ArgumentOutOfRangeException:
        //     releaseCount is less than 1.
        //
        //   T:System.Threading.SemaphoreFullException:
        //     The semaphore count is already at the maximum value.
        //
        //   T:System.IO.IOException:
        //     A Win32 error occurred with a named semaphore.
        //
        //   T:System.UnauthorizedAccessException:
        //     The current semaphore represents a named system semaphore, but the user does
        //     not have System.Security.AccessControl.SemaphoreRights.Modify rights. -or- The
        //     current semaphore represents a named system semaphore, but it was not opened
        //     with System.Security.AccessControl.SemaphoreRights.Modify rights.
        public int Release(int releaseCount);
    }
}
System.Threading.Semaphore

看它的summary,我們大體瞭解這個類就是專門用來做併發限制的,它具有三個建構函式,我們最關心的,就是其中兩個引數int initialCount, int maximumCount及其涵義;

initialCount:能夠被Semaphore 授予的數量的初始值;

maximumCount:能夠被Semaphore 授予的最大值;

字面意思可能不太好理解,我們來把官宣翻譯成普通話:)

舉個栗子,我們把Semaphore看成是一個用來裝鑰匙的盒子,每一個想要進入佇列這道“門”的任務,都需要先從盒子裡取一把鑰匙,才能進入;initialCount,就是說,這個盒子,一開始的時候,放幾把鑰匙,但是進入佇列的任務,時時不肯出來,不歸還鑰匙,無鑰匙可用,這時管理員就決定再多配一些鑰匙,以備用,於是,一些新鑰匙又被放入盒子裡,但盒子的容積有限,一共能容納多少把鑰匙,就是maximumCount了。

當然,我們常見的情況是構造盒子的時候,initialCount == maximumCount,特殊場景下,會設定不相同,這個視具體業務而定。然而,maximumCount不能小於initialCount,initialCount不能小於0,這個是硬性的。

這樣是不是initialCount 和 maximumCount就很容易理解了。

同時,Semaphore 還有非常重要的方法(Release)方法,再把上面的栗子舉起來說話,Release就是歸還鑰匙,任務結束了,那麼就出門還鑰匙,然後其它在門口等待的任務就可以領到鑰匙進門了:)

再者,Semaphore 繼承自System.Threading.WaitHandle,於是乎,它就具有了一系列Wait方法,當有新任務來領鑰匙,一看,盒子空了,那怎麼辦呢,等吧,但是等多久呢,是一直等下去還是等一個超時時間,這就看業務邏輯了。

在我的SemaphoreConcurrentPolicy實現裡,會提供一個超時時間,爬蟲螞蟻小隊長會判斷,如果沒拿到鑰匙,就會再次回來嘗試取鑰匙。

OK,接下來,就是對我們的螞蟻小隊長進行改造了:

 1 namespace MikeWare.Core.Components.CrawlerFramework
 2 {
 3     using MikeWare.Core.Components.CrawlerFramework.Policies;
 4     using System;
 5     using System.Collections.Concurrent;
 6     using System.Threading;
 7     using System.Threading.Tasks;
 8 
 9     public class LeaderAnt : Ant
10     {
11         private ConcurrentQueue<JobContext> Queue;
12         private ManualResetEvent mre = new ManualResetEvent(false);
13         public AConcurrentPolicy EnqueuePolicy { get; set; }
14 
15         ……
16 
17         public void Enqueue(JobContext context)
18         {
19             if (null != EnqueuePolicy)
20             {
21                 while (!EnqueuePolicy.WaitOne(TimeSpan.FromMilliseconds(3)) && !mre.WaitOne(1))
22                     continue;
23             }
24 
25             Queue.Enqueue(context);
26         }
27 
28         ……
29 }
領隊 —— LeaderAnt

主要是在入隊的時候,增加了拿鑰匙的環節;

 1 namespace MikeWare.Crawlers.EBooks.Bizs
 2 {
 3     using MikeWare.Core.Components.CrawlerFramework;
 4     using MikeWare.Core.Components.CrawlerFramework.Policies;
 5     using MikeWare.Crawlers.EBooks.Entities;
 6     using System;
 7     using System.Collections.Generic;
 8     using System.Net;
 9 
10     public class EBooksCrawler
11     {
12         public static void Start(int pageIndex, DateTime lastUpdateTime)
13         {
14             var leader = new LeaderAnt()
15             {
16                 EnqueuePolicy = new SemaphoreConcurrentPolicy(100, 100)
17                 //EnqueuePolicy = new PeriodEnqueuePolicy(TimeSpan.FromMilliseconds(150))
18             };
19 
20             var newContext = new JobContext
21             {
22                 JobName = $"奇書網-最新電子書-列表-第{pageIndex.ToString("00000")}頁",
23                 Uri = $"http://www.xqishuta.com/s/new/index_{pageIndex}.html",
24                 Method = WebRequestMethods.Http.Get,
25                 InParams = new Dictionary<string, object>(),
26                 Analizer = new BooksListAnalizer(),
27             };
28             newContext.InParams.Add(Consts.PAGE_INDEX, 1);
29             newContext.InParams.Add(Consts.LAST_UPDATE_TIME, DateTime.MinValue);
30 
31             leader.Enqueue(newContext);
32 
33             leader.Work();
34         }
35     }
36 }
業務層 —— EBooksCrawler

主要是在構造LeaderAnt的時候,為其指定了我們要使用的策略;

同時需要注意的是,這個SemaphoreConcurrentPolicy併發策略的實現,並沒有規定入隊的時間間隔,而是控制了最大的佇列長度,所以,併發的頻率可能高,可能低,這個策略可以用來制衡資源的使用情況。關於入隊時間間隔,可以使用PeriodConcurrentPolicy或自己實現策略來控制;

另一個策略的實現,我們就不在這裡細說了。有興趣的同學可以看看原始碼。

 

好了,本節的內容就這麼多吧,相信大家對併發策略的制定與實現,都有了各自的理解。

後續章節同樣精彩,敬請期待……

 

 

喜歡本系列叢書的朋友,可以點選連結加入QQ交流群(994761602)【C# 破境之道】
方便各位在有疑問的時候可以及時給我個反饋。同時,也算是給各位志同道合的朋友提供一個交流的平臺。
需要原始碼的童鞋,也可以在群檔案中獲取最新原始碼。