1. 程式人生 > >Java9 基於非同步響應式流的釋出-訂閱框架

Java9 基於非同步響應式流的釋出-訂閱框架

為響應式流(Reactive Streams)增加的釋出-訂閱(publisher-subscriber)框架、併發包CompletableFuture類的增強,等等。。

JEP266中為Java語言的併發性又引入許多新的方式:響應式流,一個為它而生互操作性更強的釋出-訂閱框架;並且為了Java9其他API而增強的 java.util.concurrent.CompletableFuture 類, 以及其他的更多的更新。

在本文中,展開對響應式流的介紹,然後介紹這個釋出訂閱框架。

響應式流(Reactive Streams)

批處理系統在收集了足夠多的資料,達到某一個閾值亟待進行下一步操作的時候,就衍生出了一個新的名詞—資料處理(Data processing)。這時候,面向流(stream-oriented)的架構思想可以幫助我們儘快達成這個目標。它可以捕獲和處理實時資料,並且可以快速地(秒級甚至更短)基於處理的結果來對系統進行相應的操作。和它相比,一個批處理系統可能會花費數秒、數天、甚至更久來做出響應。

處理資料流(特別是大小不定的實時資料)需要在非同步系統中特別小心。主要問題是要控制資源消耗,避免資料來源和處理系統出現供大於求(積壓)的情況。這時候,需要非同步地來對資料進行並行處理,利用分散式系統或者發揮多核CPU的效能,能有效地使資料處理過程變得快速高效。

響應式流(Reactive Streams)為這種非阻塞背壓的非同步流處理提供了一個標準。在處理系統出現過載的時候,採用非同步傳送訊號的方式通知資料來源做相應的處理。這個通知的訊號就像是水管的閥門一樣,關閉這個閥門會增加背壓(資料來源對處理系統的壓力),同時也會增加處理系統的壓力。

這個標準的目的是治理跨非同步邊界的流資料交換(比如向其他執行緒傳輸資料) ,同時確保處理系統不被緩衝資料而壓垮。換一種說法,背壓是這個標準模型的一個組成部分,以便允許線上程之間調停的佇列被界定。特別注意,背壓通訊是非同步的。

響應式流(Reactive Streams)的提出就致力於提供一組最小規模的介面、方法、或者協議來描述這個操作或實體:具有非阻塞背壓的非同步資料流。

釋出-訂閱(publisher-subscriber)框架

Java 9 通過java.util.concurrent.Flowjava.util.concurrent.SubmissionPublisher 類來實現響應式流。

Flow 類中定義了四個巢狀的靜態介面,用於建立流量控制的元件,釋出者在其中生成一個或多個供訂閱者使用的資料項:

  • Publisher:資料項釋出者、生產者
  • Subscriber:資料項訂閱者、消費者
  • Subscription:釋出者與訂閱者之間的關係紐帶,訂閱令牌
  • Processor:資料處理器

釋出者(Publisher)以流的方式釋出資料項,並註冊訂閱者,並且實現 Flow.Publisher 介面,該介面聲明瞭一個方法,我們通過呼叫它來為釋出者註冊訂閱者:

void subscribe(Flow.Subscriber<? super T> subscriber)

呼叫此方法來向釋出者註冊訂閱者,但是,如果此訂閱者已被其他釋出者註冊或註冊失敗(策略衝突),這個方法就會呼叫訂閱者的onError() 方法來丟擲IllegalStateException 異常,除此之外,訂閱者的onSubscribe() 方法會呼叫一個新的Flow.Subscription ,當空物件傳給訂閱者時,subscribe() 方法會丟擲NullPointerException異常。

訂閱者(Subscriber)從訂閱的釋出者中返回資料項,並且實現Flow.Subscriber<T> ,這個介面宣告的方法如下:

void onSubscribe(Flow.Subscription subscription)
void onComplete()
void onError(Throwable throwable)
void onNext(T item)

onSubscribe() 方法用來確認訂閱者註冊到釋出者是否註冊成功,它以引數列表的方式接收一個Flow.Subscription型別的引數,而這個引數型別裡面宣告的方法允許向釋出者請求釋出新的資料項,或請求釋出者不再發布更多的資料項。

onComplete() 方法用在當訂閱者沒有呼叫其他方法,而Subscription 發生錯誤沒有終止的情況下。呼叫這個方法之後,此訂閱者就不能呼叫其他方法。

onError(Throwable throwable) 方法用在當釋出者或訂閱者遭遇不可恢復的錯誤的時候, 呼叫這個方法之後,此訂閱者也不能呼叫其他方法。

onNext() 方法用於宣告下一個資料項的訂閱,如果在此過程中丟擲異常,結果將得不到確認,甚至會導致訂閱被取消。

一個訂閱令牌(Subscription)為釋出者和訂閱者定義一種關係, 使得訂閱者接收特定的資料項或者在特定時間取消接收請求,訂閱令牌實現自Flow.Subscription 介面,該介面宣告方法如下:

void request(long n)
void cancel()

request() 方法新增n個數據項到當前未滿的訂閱請求中。如果n小於或等於0,訂閱者的onError() 方法會被呼叫,並且丟擲IllegalArgumentException 異常,此外,如果n大於0,訂閱者就會在onNext() 方法的呼叫下接收到n個數據項,除非中間異常終止。 從Long.MAX_VALUE次到n次中間是無界的呼叫。

cancel() 用來終止訂閱者接收資料項,它有一種嘗試機制,也就是說,在呼叫它之後也有可能收到資料項。

最後,資料處理器(Processor)在不改變釋出者與訂閱者的情況下基於流做資料處理,可以在釋出者與訂閱者之間放多個數據處理器,成為一個處理器鏈,釋出者與訂閱者不依賴於資料處理,它們是單獨的過程。JDK9中不提供具體的資料處理器,必須由開發者來通過實現無方法宣告的Processor介面來自行構建。

SubmissionPublisher 實現自Flow.Publisher 介面,向當前訂閱者非同步提交非空的資料項,直到它被關閉。每個當前訂閱者以一個相同的順序接收新提交的資料項,除非資料項丟失或者遇到異常。SubmissionPublisher 允許資料項在丟失或阻塞的時候扮演釋出者角色。

SubmissionPublisher 提供了三個構造方法來獲取例項。無參的構造器依賴於 ForkJoinPool.commonPool() 方法來提交發布者,以此實現生產者向訂閱者提供資料項的非同步特性。

下面的程式演示了SubmissionPublisher 用法和這套釋出-訂閱框架的其他特性:

import java.util.Arrays;

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;  

public class FlowDemo
{
   public static void main(String[] args)
   {
      // Create a publisher.

      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

      // Create a subscriber and register it with the publisher.

      MySubscriber<String> subscriber = new MySubscriber<>();
      publisher.subscribe(subscriber);

      // Publish several data items and then close the publisher.

      System.out.println("Publishing data items...");
      String[] items = { "jan", "feb", "mar", "apr", "may", "jun",
                         "jul", "aug", "sep", "oct", "nov", "dec" };
      Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
      publisher.close();

      try
      {
         synchronized("A")
         {
            "A".wait();
         }
      }
      catch (InterruptedException ie)
      {
      }
   }
}

class MySubscriber<T> implements Subscriber<T>
{
   private Subscription subscription;

   @Override
   public void onSubscribe(Subscription subscription)
   {
      this.subscription = subscription;
      subscription.request(1);
   }

   @Override
   public void onNext(T item)
   {
      System.out.println("Received: " + item);
      subscription.request(1);
   }

   @Override
   public void onError(Throwable t)
   {
      t.printStackTrace();
      synchronized("A")
      {
         "A".notifyAll();
      }
   }

   @Override
   public void onComplete()
   {
      System.out.println("Done");
      synchronized("A")
      {
         "A".notifyAll();
      }
   }
}

其中使用了wait()notifyAll() 方法來使主執行緒等到onComplete() 的完成,否則是不會看到任何輸出的。

下面是輸出結果:

Publishing data items...
Received: jan
Received: feb
Received: mar
Received: apr
Received: may
Received: jun
Received: jul
Received: aug
Received: sep
Received: oct
Received: nov
Received: dec
Done

最後說一句,熟悉RxJava的同學可以會心一笑了。

原文