雲端計算設計模式(十五)——管道和過濾器模式
雲端計算設計模式(十五)——管道和過濾器模式
分解,執行複雜處理成一系列可重複使用分立元件的一個任務。這種模式可以允許執行的處理進行部署和獨立縮放任務元素提高效能,可擴充套件性和可重用性。
背景和問題
一個應用程式可能需要執行各種關於它處理的資訊不同複雜的任務。一個簡單,但不靈活的方式來實施這個應用程式可以執行此處理為單一模組。然而,這種方法有可能減少用於重構程式碼,對其進行優化,或者重新使用它,如果是在應用程式中其他地方所需要的相同的處理的部件的機會。
圖1通過使用單片式的方式示出了與處理資料的問題。一個應用程式接收並處理來自兩個來源的資料進行處理。從每個源資料是由執行一系列任務來轉換該資料
圖1 - 使用單一模組實現的解決方案
部分的單片模組執行的任務在功能上是非常相似的,但在模組已被分開設計的。實現該任務的程式碼被緊密模組內耦合,並且此程式碼已開發具有很少或沒有給定重新使用或可伸縮性的思想。
然而,由每個模組或每個任務的部署要求執行的處理任務,可能會改變,因為業務需求進行修改。有些任務可能是計算密集型的,並可能受益於強大的硬體上執行,而其他人可能並不需要如此昂貴的資源。此外,額外的處理可能需要在將來,或順序,其中由所述處理執行的任務可能會改變。一個解決方案是必需的,解決了這些問題,並且增加的可能性程式碼重用。
解決方案
分解
圖2 - 通過使用管道和過濾器實現的解決方案
處理一個請求所花費的時間取決於最慢的過濾器管道中的速度。這可能是一個或多個濾波器可能被證明是一個瓶頸,尤其是如果出現在從一個特定的資料源的資料流的大量請求。流水線結構的一個關鍵優點是它提供了機會,執行速度慢的過濾器的並聯情況下,使系統能夠分散負載並提高吞吐量。
可以獨立縮放組成一個
圖3示出了從源1施加到管道中的資料的一個例子。
圖3 - 在一個管道負載平衡元件
如果一個濾波器的輸入和輸出被構造為一個流,它可能是能夠進行的處理並行的每個過濾器。在流水線的第一個過濾器可以開始工作,並開始發射其結果,它們會直接傳遞到序列中的下一個過濾器之前的第一過濾器已經完成它的工作。
另一個好處是靈活性,這種模式可以提供。如果一個過濾器發生故障或者其上執行的機器不再可用時,管道可能能夠重新安排濾波器所執行的工作,並指示此工作到元件的另一個例項。單個過濾器的故障不會必然導致整個管道的故障。
使用管道和過濾器與補償交易模式相結合的模式可以提供一種替代的方法來實現分散式事務。分散式事務可以被分解成單獨的賠的任務,每個都可以通過使用一個過濾器,也實現了補償事務圖案來實現。在一個管道中的過濾器可以在執行接近它們保持資料被實現為單獨的託管工作。
問題和注意事項
在決定如何實現這個模式時,您應考慮以下幾點:
•複雜性。增加的靈活性,這種模式提供了還可以引入複雜性,特別是如果被分佈在不同的伺服器上在管道的過濾器。
•可靠性。使用一個基礎結構,可以確保在管道中的過濾器之間流動的資料也不會丟失。
•冪等性。如果在管道中的過濾失敗接收到訊息後,任務被重新排程到過濾器的另一個例項,所述部分工作可能已經完成。如果這個工作更新的全域性狀態的某些方面(如儲存在資料庫中的資訊),同樣更新可以重複。如果公佈的結果,在管道中的下一個過濾器後,過濾器出現故障,但在此之前表示,該公司已經成功地完成了它的工作可能會出現類似的問題。在這些情況下,相同的工作可以由過濾器的另一個例項被重複,導致相同的結果要貼兩次。這可能導致在管道隨後過濾兩次處理相同的資料。因此,在一個管道的過濾器應該被設計為冪等。欲瞭解更多資訊,請參見喬納森·奧利弗的部落格冪等模式。
•重複的訊息。如果在管道中的過濾器可以釋出一個訊息給流水線的下一個階段之後發生故障時,過濾器的另一個例項,可以執行(由冪等考慮以上所描述的),並且將釋出相同訊息的拷貝到流水線。這可能導致同樣的資訊的兩個例項被傳遞到下一個過濾器。為了避免這種情況,該管道應檢測並消除重複的訊息。
注意:
如果要實現管道使用訊息佇列(如微軟的Azure服務匯流排佇列),訊息佇列基礎設施可以提供自動重複訊息檢測和清除。
•上下文和狀態。在管道中,每個過濾器主要執行在孤立和不應該做這件事是如何被呼叫的任何假設。這意味著,每一個過濾器必須具有足夠的上下文與它能夠執行它的工作提供。這種情況下可包含相當數量的狀態資訊。
何時使用這個模式
使用這種模式時:
•由一個應用程式所需的處理可以很容易地被分解成一組離散的,獨立的步驟。
•由應用程式執行的處理步驟具有不同的可擴充套件性要求。
注意:
它可能會向組過濾器應擴充套件一起在相同的過程。欲瞭解更多資訊,請參閱計算資源整合模式。
•靈活性是必需的,以允許通過一個應用程式,或能力進行新增和刪除步驟中的處理步驟重新排序。
•該系統可以受益於分配處理跨不同伺服器的步驟。
•一個可靠的解決方案是必需的,當資料正在被處理的最小化在一個步驟失敗的影響。
這種模式可能不適合時:
•通過應用程式執行的處理步驟並不是獨立的,或者他們必須共同作為同一事務的一部分來執行。
•在一個步驟所需的上下文或狀態的資訊量使得這種方法效率很低。它可能會持續狀態資訊到資料庫代替,但不要使用此策略,如果在資料庫上的額外負載會導致過度競爭。
例子
可以使用訊息佇列的一個序列,以提供執行流水線所需的基礎設施。最初的訊息佇列接收未處理的訊息。實現為過濾器的任務偵聽此佇列的訊息的元件,它執行其工作,然後投遞轉化的訊息序列中的下一個佇列。另一個過濾器的任務可以偵聽在這個佇列中的訊息,對其進行處理,後的結果到另一個佇列,依此類推,直到完全轉化的資料出現在佇列中的最後一個訊息。
圖4 - 通過使用訊息佇列實現管道
如果你正在構建一個解決方案,在Azure上,你可以使用服務匯流排佇列提供了可靠的,可擴充套件的排隊機制。下面所示的ServiceBusPipeFilter類提供了一個例子。它演示了如何實現接收從佇列中輸入訊息,處理這些郵件的過濾器,並張貼結果到另一個佇列。
注意:
該ServiceBusPipeFilter類在PipesAndFilters解決方案PipesAndFilters.Shared專案定義。此示例程式碼都可以可以下載本指導意見。
public class ServiceBusPipeFilter
{
...
private readonly string inQueuePath;
private readonly string outQueuePath;
...
private QueueClient inQueue;
private QueueClient outQueue;
...
public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
{
...
this.inQueuePath = inQueuePath;
this.outQueuePath = outQueuePath;
}
public void Start()
{
...
// Create the outbound filter queue if it does not exist.
...
this.outQueue = QueueClient.CreateFromConnectionString(...);
...
// Create the inbound and outbound queue clients.
this.inQueue = QueueClient.CreateFromConnectionString(...);
}
public void OnPipeFilterMessageAsync(
Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
{
...
this.inQueue.OnMessageAsync(
async (msg) =>
{
...
// Process the filter and send the output to the
// next queue in the pipeline.
var outMessage = await asyncFilterTask(msg);
// Send the message from the filter processor
// to the next queue in the pipeline.
if (outQueue != null)
{
await outQueue.SendAsync(outMessage);
}
// Note: There is a chance that the same message could be sent twice
// or that a message may be processed by an upstream or downstream
// filter at the same time.
// This would happen in a situation where processing of a message was
// completed, it was sent to the next pipe/queue, and then failed
// to complete when using the PeekLock method.
// Idempotent message processing and concurrency should be considered
// in a real-world implementation.
},
options);
}
public async Task Close(TimeSpan timespan)
{
// Pause the processing threads.
this.pauseProcessingEvent.Reset();
// There is no clean approach for waiting for the threads to complete
// the processing. This example simply stops any new processing, waits
// for the existing thread to complete, then closes the message pump
// and finally returns.
Thread.Sleep(timespan);
this.inQueue.Close();
...
}
...
}
在ServiceBusPipeFilter類Start方法連線到一對輸入和輸出佇列,以及關閉方法從輸入佇列斷開。該OnPipeFilterMessageAsync方法執行訊息的實際處理;該asyncFilterTask引數這種方法指定要執行的處理。該OnPipeFilterMessageAsync方法等待輸入佇列中收到的訊息,因為它到達,並張貼結果到輸出佇列通過執行在每個郵件的asyncFilterTask引數指定的程式碼。佇列本身的建構函式中指定。
樣品溶液的過濾器實現了在一組工作角色。每個工人的作用可獨立進行調整,這取決於它執行的業務處理的複雜性,或者它需要執行此處理的資源。此外,各輔助角色的多個例項可以並行地執行,以提高吞吐量。
下面的程式碼顯示了一個名為PipeFilterARoleEntry的Azure工作者角色,這是在樣品溶液中PipeFilterA專案定義。
public class PipeFilterARoleEntry : RoleEntryPoint
{
...
private ServiceBusPipeFilter pipeFilterA;
public override bool OnStart()
{
...
this.pipeFilterA = new ServiceBusPipeFilter(
...,
Constants.QueueAPath,
Constants.QueueBPath);
this.pipeFilterA.Start();
...
}
public override void Run()
{
this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
{
// Clone the message and update it.
// Properties set by the broker (Deliver count, enqueue time, ...)
// are not cloned and must be copied over if required.
var newMsg = msg.Clone();
await Task.Delay(500); // DOING WORK
Trace.TraceInformation("Filter A processed message:{0} at {1}",
msg.MessageId, DateTime.UtcNow);
newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");
return newMsg;
});
...
}
...
}
這個角色包含ServiceBusPipeFilter物件。在角色OnStart方法連線到佇列接收輸入的資訊並張貼輸出訊息(佇列的名稱在常量類中定義)。
Run方法呼叫OnPipeFilterMessagesAsync方法來對接收到的(在本例中,該處理通過等待較短的時間段模擬的)的每個訊息執行某些處理。何時處理完成時,一個新的訊息被構造包含結果(在這種情況下,輸入訊息被簡單地增加了一個自定義屬性),並將該訊息傳送到輸出佇列。
示例程式碼中包含一個名為PipeFilterBRoleEntry在PipeFilterB專案的另一名工人的作用。這個角色類似於PipeFilterARoleEntry不同之處在於它的Run方法進行不同的處理。在本例中的解決方案,這兩種作用結合起來,構建一個管道;為PipeFilterARoleEntry角色輸出佇列是用於PipeFilterBRoleEntry角色的輸入佇列。
樣品溶液還提供了兩個名為InitialSenderRoleEntry(在InitialSender專案)和FinalReceiverRoleEntry(在FinalReceiver專案),進一步的角色。該InitialSenderRoleEntry作用提供了在管道中的初始訊息。
OnStart方法連線到單個佇列和執行方法的帖子的方法來此佇列。這個佇列是所使用的PipeFilterARoleEntry作用,所以釋出一條訊息到這個佇列的輸入佇列導致由PipeFilterARoleEntry作用來接收和處理訊息。經處理的資訊,然後通過PipeFilterBRoleEntry作用傳遞。
為FinalReceiveRoleEntry角色輸入佇列是用於PipeFilterBRoleEntry角色的輸出佇列。
Run方法在FinalReceiveRoleEntry作用,如下圖所示,接收到該訊息,並且執行一些最後的處理。然後將其寫入了過濾器的管道跟蹤輸出新增自定義屬性的值。
public class FinalReceiverRoleEntry : RoleEntryPoint
{
...
// Final queue/pipe in the pipeline from which to process data.
private ServiceBusPipeFilter queueFinal;
public override bool OnStart()
{
...
// Set up the queue.
this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
this.queueFinal.Start();
...
}
public override void Run()
{
this.queueFinal.OnPipeFilterMessageAsync(
async (msg) =>
{
await Task.Delay(500); // DOING WORK
// The pipeline message was received.
Trace.TraceInformation(
"Pipeline Message Complete - FilterA:{0} FilterB:{1}",
msg.Properties[Constants.FilterAMessageKey],
msg.Properties[Constants.FilterBMessageKey]);
return null;
});
...
}
...
}
本文翻譯自MSDN:http://msdn.microsoft.com/en-us/library/dn568100.aspx