1. 程式人生 > >Vert.x Java開發指南——第九章 利用RxJava進行響應式程式設計

Vert.x Java開發指南——第九章 利用RxJava進行響應式程式設計

第九章 利用RxJava進行響應式程式設計

截止目前,我們已經探索了Vert.x技術棧的多個部分,使用基於回撥的API。它僅僅可以正常工作,而且這個程式設計模型對於開發者在許多語言中是非常熟悉的。儘管如此,它可能有點繁瑣,尤其當你組合幾個事件源或者處理複雜的資料流時。

而這正是RxJava閃耀的地方,Vert.x無縫的集成了它。

9.1 啟用RxJava API

除了基於回撥的API,Vert.x模組提供了一套“Rxified”API。為了啟用它,首先需要新增vertx-rx-java模組到Maven POM檔案:

<dependency> 
    <groupId>io.vertx</groupId>
    <artifactId>vertx-rx-java</artifactId>
</dependency>

Verticle現在需要修改為繼承自io.vertx.rxjava.core.AbstractVerticle而不是io.vertx.core.AbstractVerticle。這有什麼不同?前一個類擴充套件了後者,並暴露了一個io.vertx.rxjava.core.Vertx型別的屬性。

io.vertx.rxjava.core.Vertx定義了額外的rxSomething(…)方法,這相當於基於回撥的對等體。

讓我們看一下MainVerticle,以便在實踐中更好地瞭解它是如何工作的:

Single<String> dbVerticleDeployment = vertx.rxDeployVerticle("io.vertx.guides.wiki.database.WikiDatabaseVerticle"
);

rxDeploy方法沒有使用一個Handler

9.2 按順序部署Verticle

為了完成MainVerticle重構,我們必須確保部署操作被觸發並按順序發生:

dbVerticleDeployment.flatMap(id -> { ①
    Single<String> httpVerticleDeployment = vertx.rxDeployVerticle( "io.vertx.guides.wiki.http.HttpServerVerticle",new DeploymentOptions().setInstances(2));
    return
httpVerticleDeployment; }).subscribe(id->startFuture.complete(),startFuture::fail); ②

① flatMap方法應用該函式到dbVerticleDeployment的結果。此處它排程HttpServerVerticle的部署。

② 操作是在訂閱時啟動。根據結果成功還是失敗,MainVerticle呼叫startFuture的complete或者fail方法。

9.3 部分“Rxifying”的HttpServerVerticle

如果你按順序閱讀指南,並按照講解編輯你的程式碼,那麼你的HttpServerVerticle類依舊使用基於回撥的API。在你順理成章的使用RxJava API執行非同步操作之前,如併發,你需要重構HttpServerVerticle。

9.3.1 匯入Vert.x類的RxJava版本

import io.vertx.rxjava.core.AbstractVerticle; 
import io.vertx.rxjava.core.http.HttpServer; 
import io.vertx.rxjava.ext.auth.AuthProvider; 
import io.vertx.rxjava.ext.auth.User;
import io.vertx.rxjava.ext.auth.jwt.JWTAuth;
import io.vertx.rxjava.ext.auth.shiro.ShiroAuth;
import io.vertx.rxjava.ext.web.Router;
import io.vertx.rxjava.ext.web.RoutingContext;
import io.vertx.rxjava.ext.web.client.WebClient;
import io.vertx.rxjava.ext.web.client.HttpResponse; ①
import io.vertx.rxjava.ext.web.codec.BodyCodec;
import io.vertx.rxjava.ext.web.handler.*;
import io.vertx.rxjava.ext.web.sstore.LocalSessionStore; 
import io.vertx.rxjava.ext.web.templ.FreeMarkerTemplateEngine; 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Single;

① 我們的backupHandler()方法依舊使用HttpResponse類,因此它必須被匯入。事實證明,Vert.x提供的RxJava版本的HttpResponse可以作為這種情況下的替代。在本指南倉庫step-8目錄中的“Rxified”程式碼沒有匯入這個類,因為響應型別是由lambda表示式推斷的。

9.3.2 在一個“Rxified” vertx例項上使用委派

當你有一個io.vertx.rxjava.core.Vertx物件,並希望對io.vertx.core.Vertx例項進行方法呼叫時,可以呼叫getDelegate()方法。Verticle的start()方法需要調整,當建立一個WikiDatabaseService時:

@Override
public void start(Future<Void> startFuture) throws Exception {
    String wikiDbQueue = config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue");
    dbService = io.vertx.guides.wiki.database.WikiDatabaseService.createProxy(vertx.getDelegate(), wikiDbQueue);

9.4 併發執行授權查詢

在前面的例子中,我們看到了如何使用RxJava和Rxified Vert.x API按順序執行非同步操作。但是有時候這種保證(譯者注:指按順序執行非同步操作)是不需要的,或許你只是出於效能原因需要它們簡單的併發執行。

HttpServerVerticle的JWT令牌生成過程是這種情況的一個好例子。為了建立一個令牌,我們需要所有的授權查詢結果完成,但是查詢是相互獨立的:

auth.rxAuthenticate(creds).flatMap(user -> { 
    Single<Boolean> create = user.rxIsAuthorised("create"); ① 
    Single<Boolean> delete = user.rxIsAuthorised("delete"); 
    Single<Boolean> update = user.rxIsAuthorised("update");
    return Single.zip(create, delete, update, (canCreate, canDelete, canUpdate) -> { ②
        return jwtAuth.generateToken(
            new JsonObject()
                .put("username", context.request().getHeader("login")) .put("canCreate", canCreate)
                .put("canDelete", canDelete)
                .put("canUpdate", canUpdate),
            new JWTOptions() .setSubject("Wiki API") .setIssuer("Vert.x"));
    });
}).subscribe(token -> {
    context.response().putHeader("Content-Type", "text/plain").end(token); 
}, t -> context.fail(401));

① 建立了三個Single物件,表示不同的授權查詢。

② 當三個操作成功完成時,執行zip操作的回撥方法,使用前面三個操作的結果。

9.5 使用資料庫連結

為了從池中獲取一個數據庫連結,所有你需要做的就是呼叫JDBCClient上的rxGetConnection方法:

Single<SQLConnection> connection = dbClient.rxGetConnection();

這個方法返回了一個Single,你可以輕易使用flatMap變換來執行SQL查詢:

Single<ResultSet> resultSet = dbClient.rxQueryWithParams( sqlQueries.get(SqlQuery.GET_PAGE_BY_ID), new JsonArray().add(id));

但是,如果SQLConnection引用不再可達,我們怎麼釋放該連結?一個簡單而且方便的方法是當Single取消訂閱時執行close:

private Single<SQLConnection> getConnection() { 
    return dbClient.rxGetConnection().flatMap(conn -> {
        Single<SQLConnection> connectionSingle = Single.just(conn); ①
        return connectionSingle.doOnUnsubscribe(conn::close); ②
    });
}

① 在獲取連結之後,我們將其封裝為一個Single。

② Single修改為當取消訂閱時,呼叫close。

現在我們可以在資料庫Verticle中任何需要執行SQL查詢的時候使用getConnection。

9.6 消除回撥和RxJava之間的差距

有時,你可能必須混合RxJava程式碼和基於回撥的API。例如,服務代理介面只能定義為回撥的方式,但是它的實現使用了Vert.x Rxified API。

這種情況下,io.vertx.rx.java.RxHelper類可以適配Handler

@Override
public WikiDatabaseService fetchAllPagesData(Handler<AsyncResult<List<JsonObject>>> resultHandler) { ① 
    dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES_DATA))
      .map(ResultSet::getRows)
      .subscribe(RxHelper.toSubscriber(resultHandler)); ②
    return this;
}

① fetchAllPagesData是一個非同步服務代理操作,其定義使用了Handler

9.7 資料流

RxJava不僅是合併不同事件來源的偉大工具,它對於資料流也非常有幫助。不像Vert.x future或者JDK future,Observable發出一個事件流,而不僅是一個單獨的事件,並且它擁有一個廣泛的資料操作運算集。

我們可以使用這些操作中一些來重構資料庫Verticle中的fetchAllPages方法:

public WikiDatabaseService fetchAllPages(Handler<AsyncResult<JsonArray>> resultHandler) { 
    dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES))
        .flatMapObservable(res -> { ① 
            List<JsonArray> results = res.getResults();
            return Observable.from(results); ② 
        })
        .map(json->json.getString(0)) ③
        .sorted() ④
        .collect(JsonArray::new, JsonArray::add) ⑤
        .subscribe(RxHelper.toSubscriber(resultHandler));
    return this; 
}

① 通過flatMapObservable,我們可以使用Single發出的條目建立一個Observable。

② from將資料庫results迭代轉換成一個Observable,該Observable發出資料庫行條目。

③ 由於我們只需要頁面名稱,我們可以map每個JsonObject行到首列。

④ 客戶端希望資料按照字母表順序sorted。

⑤ 事件匯流排服務應答包含在一個單獨的JsonArray中。collect方法通過JsonArray::new建立一個新的物件,然後當條目發出時通過JsonArray::add方法新增它們。