1. 程式人生 > >分散式系統設計:批處理模式之作業佇列系統

分散式系統設計:批處理模式之作業佇列系統

之前的文章講述了關於可靠的、長時間執行的應用(long-running server applications)的設計模式,本篇介紹批處理的模式。與先前介紹的長時間執行應用所不同的是,批處理的過程預計只能執行很短的時間。例如,通過彙總使用者的資料來分析每天或每週的銷售情況,或者是轉碼視訊檔案等。批處理的特徵為用很快的速度來處理大量的資料,其中通過並行的方式來加快處理速度。分散式批處理中最著名的模式就是MapReduce,並且已經逐漸發展為一個行業,但是還有一些可以用於批處理的模式,之後的文章會逐一進行介紹。

批處理作業最簡單的形式就是作業佇列,在作業佇列系統中,有一堆的作業需要執行,每一個作業都完全獨立於其他的作業,因此可以獨立的進行處理。作業佇列系統的目標是:確保每個作業都可以在一定的時間內完成。批處理作業的Worker可以進行動態地擴容和縮容來保證作業的執行,一個通用的作業佇列如圖1所示。

圖1

通用的作業佇列系統

作業佇列可以很好的展示分散式系統模式所提供的強大功能。作業佇列中的大部分邏輯完全獨立於正在被處理的作業,並且在很多情況下,作業的交付也可以以獨立的方式執行,可以通過圖1來思考這一點。同時,它也可以通過一組共享的容器庫來提供功能,如圖2所示,大部分容器化的作業佇列可以在各種各樣的使用者之間進行共享,可重複使用的系統容器顯示為白色,而使用者提供的容器顯示為灰色。

圖2

構建一個基於容器的可重用作業佇列需要定義庫容器和使用者定義的應用程式邏輯之間的介面,在容器化的作業佇列中,有兩個介面:Source Container Interface,用來提供需要處理的作業流;另外一個是 Worker Container Interface,它知道如何來處理一個作業項。

Source Container Interface

每個作業佇列都需要一組需要處理的作業項來進行操作,作業佇列有許多不同的作業來源,具體取決於作業佇列的特定應用程式,但是一旦作業佇列獲得一組作業並開始執行,它處理作業的方式就很通用了。因此,我們可以將應用程式特定的 Queue Source 邏輯和通用佇列的處理邏輯分開。鑑於之前已經介紹的容器組織模式,這個可以看作是大使模式的一個例子。通用作業佇列容器是主要的應用程式容器,特定於應用程式的 Source Container 是代理,它將通用作業佇列的請求代理到現實世界的作業佇列中去。這個容器組如圖3所示。

圖3

有趣的是,雖然大使容器是特定於應用程式的,但是也有多種通用的作業佇列API。例如,作業來源可能是通過雲端儲存平臺的API獲得的圖片列表、儲存在網路儲存平臺上的檔案集合、或者像Kafka或Redis的釋出/訂閱系統中的佇列,在這些情況下,儘管使用者選擇適合他們場景的特定作業佇列大使,但是他們應該重用容器本身的一個通用“庫”來實現,這樣可以最大限度的減少工作量並最大限度的對程式碼進行重用。

Worker Container Interface

一旦作業佇列管理系統(Manager)獲得了特定的作業,它就需要由Worker來進行處理,這是我們通用作業佇列中的第二個容器介面。出於幾個原因,此容器和介面與前面介紹的 Work Queue Source Interface 略有不同。首先,它是一次性的API:一次呼叫就可以開始工作,並且在Worker容器的整個生命週期中不會進行其他API呼叫;其次,Worker容器不在具有作業佇列管理系統的容器組內,而是通過容器編排的API啟動的,並排程到自己所在的容器組中,這意味著作業佇列管理系統必須對Worker容器進行遠端呼叫才能開始工作,這也意味著我們需要更加小心,以防叢集中的惡意使用者向系統中注入額外的作業。

通過 Work Queue Source API,我們使用一個簡單的基於HTTP的API將作業傳送回作業佇列管理系統,這是因為我們需要重複呼叫API,因為所有的都是在localhost上執行的,所以安全性並不是問題。通過Worker容器,我們只需要進行一次呼叫,並且我們希望確保系統中的其他使用者不會意外地或惡意地向Worker新增作業。所以,對於Worker容器,我們使用基於檔案的API,也就是說,當建立Worker時,它將接收一個名為WORK_ITEM_FILE的環境變數,它將指向本地檔案系統容器中的一個檔案,其中來自作業項的資料欄位已經被寫入檔案。具體來說,如圖4所示,這個API可以通過一個 Kubernetes ConfigMap 物件來實現,該物件可以作為一個檔案被裝載到一個容器組中。

圖4

這個基於檔案的API模式對容器來說也更容易實現,通常一個作業佇列的Worker只是一個shell指令碼,在這種情況下,就沒有必要啟動 Web 伺服器來管理要執行的作業。與 Work Queue Source 的實現一樣,大多數的作業容器都是為特定作業佇列應用程式構建的專用容器映象,但也有通用的 Worker 可應用於多種不同的作業佇列應用程式。

思考一個例子,該作業佇列的Worker從雲端儲存平臺下載檔案並執行帶有該檔案的shell指令碼作為輸入,然後將其輸出複製回雲端儲存平臺。這樣的容器大部分是通用的,但是可以將特定的指令碼作為執行時引數提供給它,這樣檔案處理的大部分工作可以被多個使用者/作業佇列共享,並且只需要向最終的使用者提供檔案處理的細節。

共享作業佇列的基礎結構

鑑於前面介紹的兩個容器介面的實現,為了實現我們的可重用作業佇列,我們還需要實現什麼呢?作業佇列的基本演算法非常簡單:

  1. 通過呼叫 Source Container Interface 來載入空閒的作業;
  2. 通過查詢作業佇列的狀態來確定哪些作業已經被處理或者正在被處理當中;
  3. 對於這些作業項,生成一些使用 Worker Container Interface 進行處理的作業;
  4. 當其中一個Worker容器完成時,記錄下那些已經被完成的作業。

雖然這種演算法很容易用文字去表達,但實際上,實現起來要複雜一些,幸運的是,Kubernetes 的容器編排包含了許多特性,使其更容易實現。也就是說,Kubernetes包含了一個Job物件,這允許可靠的執行作業佇列,作業可以配置為在 Worker 容器上只執行一次或者一直執行直到它成功完成。如果Worker容器設定為執行完成,那麼即使叢集中的某臺機器發生故障,作業最終也會成功執行,這樣極大地簡化了作業佇列的構建,因為編排系統對每個作業的可靠執行負責。

此外,Kubernetes 還為每個Job物件提供註釋,使我們能夠在每個作業上標記它正在處理的作業項,瞭解哪些作業項正在被處理中,以及哪些已經成功完成或者失敗了。

整體來講,這意味著我們可以在 Kubernetes 的編排系統層之上實現一個作業佇列,而無需使用我們自己的任何儲存,這極大地簡化了構建作業佇列基礎結構的任務。

因此,我們使用如下的方式進行作業佇列容器的擴容操作:

重複的進行如下操作:

  1. 從 Work Source Container Interface 獲得作業項的列表;
  2. 獲得為服務此作業佇列而建立的所有作業的列表;
  3. 通過對列表中的作業進行區分來獲得尚未進行處理的作業項集合;
  4. 對於這些沒有處理的作業項,建立新的Job物件,以生成適當的 Worker 容器。

Worker的動態縮放

先前描述的作業佇列對於到達作業佇列中作業項的處理速度非常快,但是這會導致突發性資源負載被放置到叢集的編排系統叢集上,如果你有許多不同的工作負載在不同的時間爆發還好,因為這樣可以保證基礎框架的平均資源利用率,但是如果你沒有足夠數量的不同負載,通過過多或者過少的方式來縮放作業佇列,那麼就需要你通過 Over-Provision Resource 的方式來支援這種突發負載的發生,同時當沒有那麼多作業需要處理時,那些過度分配的資源就會閒置下來,從而造成很大的浪費(資源/金錢)。

為了解決這個問題,可以限制作業佇列建立的 Job 物件的總數,這樣做自然會限制作業處理的並行性,並因此限制你在特定時間可以使用的最大資源量。然而,這樣做會增加在高負載下作業完成的時間,如果負載是突發性的,那還好,因為你可以用冗餘(slack)的時間來處理高負載下積壓的作業,但是,如果在平時穩定的狀態下利用率就很高,那麼在高負載下作業佇列可能永遠無法趕上來,作業完成的時間將會變得越來越長。

當你的作業佇列面臨這種情況時,你需要讓它自己動態調整來增加並行性(相應的增加資源)以便跟上不斷進入佇列的作業。幸運的是,我們可以使用數學公式來確定何時需要對我們的作業佇列進行動態擴容。

思考一個這樣的作業佇列,平均每分鐘會有一個新的作業項到達,每個作業項平均需要30秒才能完成,這樣的系統能夠跟上它接收到的所有作業,即使大量的作業同時到達併產生了積壓,平均而言,作業佇列會處理2個到達的作業項,因此它可以逐步完成這些積壓的作業。

相反,如果一個作業佇列平均每分鐘會有一個新的作業項到達,每個作業平均需要1分鐘才能完成,那麼這個系統是完全平衡的,但它不能很好的處理突發負載,它可以在突發負載的情況下趕上,但是這需要一段時間,並且它沒有冗餘的時間或能力來處理作業到達速率的持續增長。這可能不是一個理想的執行方式,因為保持一個穩定的系統需要一些安全餘量,通過這些安全餘量來保持增長和其他作業量的持續增長或者作業處理過程中的意外減速。

最後,考慮一個每分鐘都會有一個作業項到達的系統,每個作業項需要2分鐘來處理。在這樣一個系統中,作業的佇列將會無限制的增長,佇列中的作業項的延遲也會無限制的增長,並且使用者也會變的十分痛苦。

因此,我們可以跟蹤我們作業佇列中的一些指標,作業項之間的平均時間將為我們提供新作業的到達時間間隔;我們還可以跟蹤作業處理的平均時間(不包括在佇列中的時間)。要擁有一個穩定的作業佇列,我們需要調整資源的數量,以便處理任何作業項的時間少於新作業項的到達時間。如果我們正在並行處理作業,我們也將作業項的處理時間除以並行度,例如,每個作業項處理需要1分鐘,但我們能並行處理四個作業項,則處理一個作業項的有效時間為15秒,因此我們可以保持16秒或者更長的到達時間間隔。

這種方法使構建一個自動調整器來對我們作業佇列的大小進行動態擴容非常簡單,雖然縮小作業佇列的規模有一些棘手,但是可以使用相同的數學計算方法以及啟發式的方法來維護資源的安全餘量。例如,可以減少並行度,直到某個作業項的處理時間為新作業項到達間隔時間的90%。

Multi-Worker Pattern

本書的主題之一是使用容器來封裝和重用程式碼,對於本文描述的作業佇列模式也是如此。除了重用容器來驅動作業佇列本身的模式之外,還可以重用多個不同的容器來進行Worker的實現。例如,假設你有三種不同型別的作業要在特定的作業佇列上執行;例如,你可能想要檢測影象中的人臉,用身份標記這些人臉,然後模糊影象中的人臉。你可以編寫一個Worker來完成這一整套的任務,但是這將是一個定製的解決方案,下次你想識別別的東西時(如汽車),它仍然不能進行重用,但仍然會產生相同的模糊效果。

為了實現這種程式碼重用,Multi-Worker模式是前面章節中描述的介面卡模式的一個特例。在這種情況下,Multi-Worker模式將不同的Worker容器的集合轉換為實現Worker介面的單個統一容器,但將實際作業分配給一組不同的可容用容器,如圖5所示。

圖5

因為這種程式碼重用,多個不同Worker容器的組合意味著程式碼重用的增加以及減少設計面向批處理的分散式系統的工作量。