1. 程式人生 > >雲端計算設計模式(十五)——管道和過濾器模式

雲端計算設計模式(十五)——管道和過濾器模式

雲端計算設計模式(十五)——管道和過濾器模式


分解,執行複雜處理成一系列可重複使用分立元件的一個任務這種模式可以允許執行的處理進行部署和獨立縮放任務元素提高效能,可擴充套件性和可重用性

背景和問題


一個應用程式可能需要執行各種關於它處理資訊不同複雜任務。一個簡單,但不靈活的方式來實施這個應用程式可以執行此處理單一模組。然而,這種方法有可能減少用於重構程式碼對其進行優化或者重新使用它,如果在應用程式中其他地方所需要的相同的處理部件機會。

圖1通過使用單片式的方式示出了與處理資料問題一個應用程式接收並處理來自兩個來源的資料進行處理從每個源資料是由執行一系列任務來轉換該資料

,並傳遞結果給應用程式的業務邏輯之前的獨立模組進行處理。

圖1  - 使用單一模組實現的解決方案


部分單片模組執行的任務在功能上是非常相似的,但模組已被分開設計的實現該任務的程式碼被緊密模組內耦合並且此程式碼已開發具有很少或沒有給定重新使用或可伸縮性的思想

然而由每個模組或每個任務的部署要求執行的處理任務,可能會改變,因為業務需求進行修改。有些任務可能是計算密集型的,並可能受益於強大的硬體上執行其他人可能並不需要如此昂貴的資源。此外,額外的處理可能需要在將來順序,其中由所述處理執行的任務可能會改變一個解決方案是必需的,解決了這些問題,並且增加的可能性程式碼重用。

解決方案


分解

需要為每個資料流轉換為一組離散的元件或過濾器)的處理,其中每一個執行單任務通過標準化每個元件接收發射的資料的格式這些過濾器可以組合在一起成為一個管道這有助於避免重複程式碼並且可以很容易地移除,替換或整合額外的元件,如果處理要求改變圖2顯示了這種結構的一個例子。

圖2 - 通過使用管道和過濾器實現的解決方案


處理一個請求所花費的時間取決於最慢的過濾器管道中的速度。這可能是一個或多個濾波器可能被證明是一個瓶頸,尤其是如果出現一個特定的資料源的資料流的大量請求。流水線結構一個關鍵優點是它提供了機會,執行速度慢的過濾器並聯情況下,使系統能夠分散負載提高吞吐量。

可以獨立縮放組成一個

管道可以在不同的機器上執行過濾器,使他們可以利用彈性,許多雲端計算環境提供優勢。過濾器計算密集型可以在高效能的硬體上執行而其他要求不高的過濾器可以對商品便宜)的硬體來承載過濾器甚至不需要是在同一資料中心或地理位置,它允許一個管道中的每個元素的環境下接近它需要的資源來執行

圖3示出了從源1施加到管道中的資料的一個例子。

圖3  - 一個管道負載平衡元件


如果一個濾波器的輸入輸出被構造一個,它可能是能夠進行的處理並行的每個過濾器在流水線的第一個過濾器可以開始工作,並開始發射結果它們會直接傳遞到序列中的下一個過濾器之前第一過濾器已經完成它的工作。

另一個好處靈活性,這種模式可以提供如果一個過濾器發生故障或者其上執行機器不再可用時,管道可能能夠重新安排濾波器所執行的工作,並指示工作到元件的另一個例項。單個過濾器故障不會必然導致整個管道故障。

使用管道和過濾器補償交易模式相結合的模式可以提供一種替代的方法來實現分散式事務分散式事務可以被分解成單獨的的任務,每個都可以通過使用一個過濾器,也實現了補償事務圖案來實現。一個管道中的過濾器可以在執行接近它們保持資料被實現為單獨的託管工作。

問題和注意事項


在決定如何實現這個模式時,您應考慮以下幾點:
複雜性增加的靈活性,這種模式提供了還可以引入複雜性,特別是如果分佈在不同的伺服器上在管道的過濾器。
•可靠性使用一個基礎結構,可以確保在管道中的過濾器之間流動的資料也不會丟失
冪等性如果在管道中的過濾失敗接收到訊息,任務被重新排程到過濾器的另一個例項,所述部分工作可能已經完成。如果這個工作更新的全域性狀態的某些方面儲存在資料庫中的資訊,同樣更新可以重複如果公佈結果,在管道中的下一個過濾器後,過濾器出現故障,但在此之前表示,該公司已經成功地完成了它的工作可能會出現類似的問題。在這些情況下,相同的工作可以由過濾器的另一個例項被重複,導致相同的結果要貼兩次。這可能導致在管道隨後過濾兩次處理相同的資料因此,一個管道的過濾器應該被設計為冪等欲瞭解更多資訊,請參見喬納森·奧利弗的部落格冪等模式
重複的訊息如果管道中的過濾器可以釋出一個訊息給流水線的下一個階段之後發生故障時,過濾器的另一個例項,可以執行冪等考慮以上所描述的,並且將釋出相同訊息的拷貝到流水線可能導致同樣的資訊的兩個例項被傳遞到下一個過濾器為了避免這種情況管道應檢測並消除重複的訊息。

注意:

如果要實現管道使用訊息佇列微軟的Azure服務匯流排佇列訊息佇列基礎設施可以提供自動重複訊息檢測和清除


•上下文狀態。在管道中每個過濾器主要執行在孤立和不應該這件事是如何被呼叫的任何假設。意味著,每一個過濾器必須具有足夠的上下文與它能夠執行它的工作提供這種情況下可包含相當數量的狀態資訊

何時使用這個模式


使用這種模式
由一個應用程式所需的處理可以很容易地被分解成一組離散的獨立的步驟。
由應用程式執行的處理步驟具有不同的可擴充套件性要求

注意:

它可能會過濾器擴充套件一起相同的過程。欲瞭解更多資訊,請參閱計算資源整合模式


•靈活性是必需的,以允許通過一個應用程式能力進行新增和刪除步驟中的處理步驟重新排序。
該系統可以受益於分配處理不同伺服器的步驟。
•一個可靠的解決方案是必需的,當資料正在被處理的最小化一個步驟失敗的影響。

這種模式可能不適合
通過應用程式執行的處理步驟並不是獨立的或者他們必須共同作為同一事務的一部分來執行。
•在一個步驟所需上下文或狀態的資訊量使得這種方法效率很低。它可能會持續狀態資訊到資料庫代替,但不要使用此策略,如果在資料庫上的額外負載會導致過度競爭

例子


可以使用訊息佇列一個序列,以提供執行流水線所需的基礎設施最初的訊息佇列接收未處理的訊息實現為過濾器的任務偵聽佇列的訊息的元件,它執行其工作然後投遞轉化的訊息序列中的下一個佇列另一個過濾器的任務可以偵聽在這個佇列中的訊息對其進行處理後的結果到另一個佇列,依此類推,直到完全轉化的資料出現在佇列中的最後一個訊息。

4 - 通過使用訊息佇列實現管道


如果你正在構建一個解決方案,在Azure上,你可以使用服務匯流排佇列提供了可靠的,可擴充套件的排隊機制。下面所示的ServiceBusPipeFilter提供了一個例子它演示了如何實現接收從佇列中輸入訊息處理這些郵件的過濾器張貼結果到另一個佇列

注意:

ServiceBusPipeFilterPipesAndFilters解決方案PipesAndFilters.Shared專案定義此示例程式碼都可以可以下載本指導意見

public class ServiceBusPipeFilter
{
  ...
  private readonly string inQueuePath;
  private readonly string outQueuePath;
  ...
  private QueueClient inQueue;
  private QueueClient outQueue;
  ...

  public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
  {
     ...
     this.inQueuePath = inQueuePath;
     this.outQueuePath = outQueuePath;
  }

  public void Start()
  {
    ...
    // Create the outbound filter queue if it does not exist.
    ...
    this.outQueue = QueueClient.CreateFromConnectionString(...);
    
    ...
    // Create the inbound and outbound queue clients.
    this.inQueue = QueueClient.CreateFromConnectionString(...);
  }

  public void OnPipeFilterMessageAsync(
    Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...) 
  {
    ...

    this.inQueue.OnMessageAsync(
      async (msg) =>
    {
      ...
      // Process the filter and send the output to the 
      // next queue in the pipeline.
      var outMessage = await asyncFilterTask(msg);

      // Send the message from the filter processor 
      // to the next queue in the pipeline.
      if (outQueue != null)
      {
        await outQueue.SendAsync(outMessage);
      }

      // Note: There is a chance that the same message could be sent twice 
      // or that a message may be processed by an upstream or downstream 
      // filter at the same time.
      // This would happen in a situation where processing of a message was
      // completed, it was sent to the next pipe/queue, and then failed 
      // to complete when using the PeekLock method.
      // Idempotent message processing and concurrency should be considered 
      // in a real-world implementation.
    },
    options);
  }

  public async Task Close(TimeSpan timespan)
  {
    // Pause the processing threads.
    this.pauseProcessingEvent.Reset();

    // There is no clean approach for waiting for the threads to complete
    // the processing. This example simply stops any new processing, waits
    // for the existing thread to complete, then closes the message pump 
    // and finally returns.
    Thread.Sleep(timespan);

    this.inQueue.Close();
    ...
  }

  ...
}


ServiceBusPipeFilterStart方法連線到一對輸入和輸出佇列,以及關閉方法從輸入佇列斷開OnPipeFilterMessageAsync方法執行訊息的實際處理;asyncFilterTask引數這種方法指定要執行的處理。OnPipeFilterMessageAsync方法等待輸入佇列中收到的訊息,因為它到達,張貼結果到輸出佇列通過執行在每個郵件的asyncFilterTask引數指定的程式碼。佇列本身建構函式中指定

樣品溶液的過濾器實現了一組工作角色每個工人的作用可獨立進行調整,這取決於它執行業務處理的複雜性,或者它需要執行此處理的資源。此外,輔助角色的多個例項可以並行地執行,以提高吞吐量。

下面的程式碼顯示了一個名為PipeFilterARoleEntry的Azure工作者角色,這是樣品溶液中PipeFilterA專案定義

public class PipeFilterARoleEntry : RoleEntryPoint
{
  ...
  private ServiceBusPipeFilter pipeFilterA;

  public override bool OnStart()
  {
    ...
    this.pipeFilterA = new ServiceBusPipeFilter(
      ...,
      Constants.QueueAPath,
      Constants.QueueBPath);

    this.pipeFilterA.Start();
    ...
  }

  public override void Run()
  {
    this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
    {
      // Clone the message and update it.
      // Properties set by the broker (Deliver count, enqueue time, ...) 
      // are not cloned and must be copied over if required.
      var newMsg = msg.Clone();
      
      await Task.Delay(500); // DOING WORK

      Trace.TraceInformation("Filter A processed message:{0} at {1}", 
        msg.MessageId, DateTime.UtcNow);

      newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");

      return newMsg;
    });

    ...
  }

  ...
}


這個角色包含ServiceBusPipeFilter物件。角色OnStart方法連線到佇列接收輸入的資訊並張貼輸出訊息佇列名稱在常量類中定義 Run方法呼叫OnPipeFilterMessagesAsync方法接收到的本例中,該處理通過等待較短的時間模擬的)的每個訊息執行某些處理何時處理完成時,一個新的訊息被構造包含結果(在這種情況下,輸入訊息被簡單地增加了一個自定義屬性),並將該訊息傳送到輸出佇列

示例程式碼中包含一個名為PipeFilterBRoleEntryPipeFilterB專案的另一名工人的作用。這個角色類似於PipeFilterARoleEntry不同之處在於Run方法進行不同的處理在本例中的解決方案,這兩種作用結合起來,構建一個管道;PipeFilterARoleEntry角色輸出佇列用於PipeFilterBRoleEntry角色輸入佇列。

樣品溶液還提供了兩個名為InitialSenderRoleEntryInitialSender專案FinalReceiverRoleEntryFinalReceiver專案),進一步的角色InitialSenderRoleEntry作用提供了在管道中的初始訊息 OnStart方法連線到單個佇列執行方法的帖子的方法此佇列這個佇列所使用的PipeFilterARoleEntry作用,所以釋出一條訊息這個佇列輸入佇列導致PipeFilterARoleEntry作用來接收處理訊息經處理的資訊,然後通過PipeFilterBRoleEntry作用傳遞

FinalReceiveRoleEntry角色輸入佇列用於PipeFilterBRoleEntry角色的輸出佇列 Run方法FinalReceiveRoleEntry作用,如下圖所示,接收到該訊息,並且執行一些最後的處理然後將其寫入過濾器管道跟蹤輸出新增自定義屬性值。

public class FinalReceiverRoleEntry : RoleEntryPoint
{
  ...
  // Final queue/pipe in the pipeline from which to process data.
  private ServiceBusPipeFilter queueFinal;

  public override bool OnStart()
  {
    ...
    // Set up the queue.
    this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
    this.queueFinal.Start();
    ...
  }

  public override void Run()
  {
    this.queueFinal.OnPipeFilterMessageAsync(
      async (msg) =>
      {
        await Task.Delay(500); // DOING WORK

        // The pipeline message was received.
        Trace.TraceInformation(
          "Pipeline Message Complete - FilterA:{0} FilterB:{1}",
          msg.Properties[Constants.FilterAMessageKey],
          msg.Properties[Constants.FilterBMessageKey]);

        return null;
      });
    ...
  }

  ...
}
本文翻譯自MSDN:http://msdn.microsoft.com/en-us/library/dn568100.aspx