1. 程式人生 > >Vert.x(五): Vert.x-通過非同步的方式使用JDBC連線SQL

Vert.x(五): Vert.x-通過非同步的方式使用JDBC連線SQL

在這篇文章中,我們將會看到怎樣在vert.x應用中使用HSQL,當然也可以使用任意JDBC,以及使用vertx-jdbc-client提供的非同步的API,這篇文章的程式碼在github上。

非同步?

vert.x一個很重要的特點就是它的非同步性。使用非同步的API,不需要等結果返回,當有結果返回時,vert.x會主動通知。為了說明這個,我們來看一個簡單的例子。

我們假設有個add方法。一般來說,會像int r = add(1, 1)

這樣來使用它。這是一個同步的API,所以你必須等到返回結果。非同步的API會是這樣:add(1, 1, r -> { /*do something with the result*/})。在這個版本中,你傳入了一個Handler,當結果計算出來時才被呼叫。這個方法不返回任何東西,實現如下:

public void add(int a, int b, Handler<Integer> resultHandler) {
    int r = a + b;
    resultHandler.handle(r);
}

為了避免混淆概念,非同步API並不是多執行緒。像我們在add例子裡看到的,並沒有涉及多執行緒。

非同步JDBC

看了一些基本的非同步的API,現在瞭解下vertx-jdbc-client。這個元件能夠讓我們通過JDBC driver與資料庫互動。這些互動都是非同步的,以前這樣:

String sql = "SELECT * FROM Products";
ResultSet rs = stmt.executeQuery(sql);

現在要這樣:

connection.query("SELECT * FROM Products", result -> {
        // do something with the result
});

這個模型更高效,當結果出來後vert.x通知,避免了等待結果。

增加maven依賴

pom.xml檔案中增加兩個 Maven dependencies

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-jdbc-client</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>org.hsqldb</groupId>
  <artifactId>hsqldb</artifactId>
  <version>2.3.3</version>
</dependency>

第一個依賴提供了vertx-jdbc-client,第二個提供了HSQL JDBC的驅動。如果你想使用另外一個數據庫,修改這個依賴,同時你還需要修改JDBC urlJDBC driver名。

初始化JDBC client

建立JDBC 客戶端(client):

MyFirstVerticle類中,宣告一個新變數JDBCClient jdbc,並且在start方法中新增:

jdbc = JDBCClient.createShared(vertx, config(), "My-Whisky-Collection");

建立了一個JDBC client例項,使用verticle的配置檔案配置JDBC client。這個配置檔案需要提供下面的配置才能讓JDBC client正常工作:

  • url-JDBC url,例如:jdbc:hsqldb:mem:db?shutdown=true
  • _driver class-JDBC的驅動,例如:org.hsqldb.jdbcDriver

有了client,接下來需要連線資料庫。連線資料庫是通過使用jdbc.getConnection來實現的,jdbc.getConnection需要傳入一個Handler<AsyncResult<SQLConnection>>引數。我們深入的瞭解下這個型別。首先,這是一個Handler,因此當結果準備好時它就會被呼叫。這個結果是AsyncResult<SQLConnection>的一個例項。AsyncResultvert.x提供的一個結構,使用它能夠知道連線資料庫的操作是成功或失敗了。如果成功了,它就會提供一個結果,這裡結果是一個SQLConnection的例項。

當你接收一個AsyncResult的例項時,程式碼通常是:

if (ar.failed()) {
  System.err.println("The operation has failed...: "
      + ar.cause().getMessage());
} else {
  // Use the result:
  result = ar.result();
 }

需要獲取到SQLConnection,然後啟動rest的應用。因為變成了非同步的,這需要改變啟動應用的方式。因此,如果將啟動序列劃分成多塊:

startBackend(
 (connection) -> createSomeData(connection,
     (nothing) -> startWebApp(
         (http) -> completeStartup(http, fut)
     ), fut
 ), fut);
  • startBackend- 獲取SQLConnection物件,然後呼叫下一步
  • createSomeData- 初始化資料庫並插入資料。當完成後,呼叫下一步
  • startWebApp- 啟動web應用
  • completeStartup- 最後完成啟動

fut由vert.x傳入,通知已經啟動或者啟動過程中遇到的問題。

startBackend方法:

private void startBackend(Handler<AsyncResult<SQLConnection>> next, Future<Void> fut) {
    jdbc.getConnection(ar -> {
      if (ar.failed()) {
        fut.fail(ar.cause());
      } else {
        next.handle(Future.succeededFuture(ar.result()));
      }
    });
  }

這個方法獲取了一個SQLConnection物件,檢查操作是否完成。如果成功,會呼叫下一步。失敗了,就會報告一個錯誤。其他的方法遵循同樣的模式:

  • 檢查上一步操作是否成功
  • 處理業務邏輯
  • 呼叫下一步

SQL

客戶端已經準備好了,現在寫SQL。從createSomeData方法開始,這個方法也是啟動順序中的一部分:

private void createSomeData(AsyncResult<SQLConnection> result,
    Handler<AsyncResult<Void>> next, Future<Void> fut) {
    if (result.failed()) {
      fut.fail(result.cause());
    } else {
      SQLConnection connection = result.result();
      connection.execute(
          "CREATE TABLE IF NOT EXISTS Whisky (id INTEGER IDENTITY, name varchar(100), " +
          "origin varchar(100))",
          ar -> {
            if (ar.failed()) {
              fut.fail(ar.cause());
              connection.close();
              return;
            }
            connection.query("SELECT * FROM Whisky", select -> {
              if (select.failed()) {
                fut.fail(ar.cause());
                connection.close();
                return;
              }
              if (select.result().getNumRows() == 0) {
                insert(
                    new Whisky("Bowmore 15 Years Laimrig", "Scotland, Islay"),
                    connection,
                    (v) -> insert(new Whisky("Talisker 57° North", "Scotland, Island"),
                        connection,
                        (r) -> {
                          next.handle(Future.<Void>succeededFuture());
                          connection.close();
                        }));                                                    
              } else {
                next.handle(Future.<Void>succeededFuture());
                connection.close();
              }
            });
          });
    }
  }

這個方法檢查SQLConnection是否可用,然後執行一些SQL語句。首先,如果表不存在就建立表。看看下面程式碼:

connection.execute(
    SQL statement,
    handler called when the statement has been executed
)

handler接收AsyncResult<Void>,例如:只有是通知而已,沒有實際返回的結果。

關閉連線

操作完成後,別忘了關閉SQL連結。這個連線會被放入連線池並且可以被重複利用。

在這個handler的程式碼裡,檢查了statement是否正確的執行了,如果正確,我們接下來檢查表是否含有資料,如果沒有,將會使用insert方法插入資料:

private void insert(Whisky whisky, SQLConnection connection, Handler<AsyncResult<Whisky>> next) {
  String sql = "INSERT INTO Whisky (name, origin) VALUES ?, ?";
  connection.updateWithParams(sql,
      new JsonArray().add(whisky.getName()).add(whisky.getOrigin()),
      (ar) -> {
        if (ar.failed()) {
          next.handle(Future.failedFuture(ar.cause()));
          return;
        }
        UpdateResult result = ar.result();
        // Build a new whisky instance with the generated id.
        Whisky w = new Whisky(result.getKeys().getInteger(0), whisky.getName(), whisky.getOrigin());
        next.handle(Future.succeededFuture(w));
      });
}

這個方法使用帶有INSERT(插入)statement(宣告)的upateWithParams方法,且傳入了值。這個方法避免了SQL注入。一旦statement執行了(當資料庫沒有此條資料就會建立),就建立一個新的Whisky物件,自動生成ID。

帶有資料庫(SQL)的REST

上面的方法都是啟動順序的一部分。但是,關於呼叫REST API的方法又是怎麼樣的呢?以getAll方法為例。這個方法被web應用前端呼叫,並檢索儲存的所有的產品:

private void getAll(RoutingContext routingContext) {
    jdbc.getConnection(ar -> {
      SQLConnection connection = ar.result();
      connection.query("SELECT * FROM Whisky", result -> {
        List<Whisky> whiskies = result.result().getRows().stream().map(Whisky::new).collect(Collectors.toList());
        routingContext.response()
            .putHeader("content-type", "application/json; charset=utf-8")
            .end(Json.encodePrettily(whiskies));
        connection.close(); // Close the connection        
      });
    });
  }

這個方法獲得了一個SQLConnection物件,然後發出一個查詢。一旦獲取到查詢結果,它會像之前的方法一樣寫HTTP responsegetOnedeleteOneupdateOneaddOne方法都是一樣的。注意,在response之後,需要要關閉SQL連線。

看下傳入到query方法的handler提供的結果。獲取了一個包含了查詢結果的ResultSet。每一行都是一個JsonObject,因此,如果你有一個數據物件使用JsonObject作為唯一的引數,那麼建立這個物件很簡單。

測試

需要小小的更新下測試程式,增加配置JDBCClient。在MyFirstVerticleTest類中,將setUp方法中建立的DeploymentOption物件修改成:

DeploymentOptions options = new DeploymentOptions()
        .setConfig(new JsonObject()
            .put("http.port", port)
            .put("url", "jdbc:hsqldb:mem:test?shutdown=true")
            .put("driver_class", "org.hsqldb.jdbcDriver")
        );

除了http.port,還配置了JDBC urlJDBC驅動。測試時,使用的是一個記憶體資料庫。在src/test/resources/my-it-config.json檔案中也要做同樣的修改。

{
  "http.port": ${http.port},
  "url": "jdbc:hsqldb:mem:it-test?shutdown=true",
  "driver_class": "org.hsqldb.jdbcDriver"
}

src/main/conf/my-application-conf.json檔案也同樣需要修改,這不是為了測試,而是為了執行這個應用:

{
  "http.port" : 8082,
  "url": "jdbc:hsqldb:file:db/whiskies",
  "driver_class": "org.hsqldb.jdbcDriver"
}

這裡這個JDBC url和上一個檔案的有點不一樣,因為需要將資料庫儲存到硬碟中。

展示時間!

開始構建程式:

mvn clean package

沒有修改API(沒有更改釋出的java檔案和REST介面),測試應該是可以順利的執行的。

啟動應用:

java -jar target/my-first-app-1.0-SNAPSHOT-fat.jar -conf src/main/conf/my-application-conf.json

訪問http://localhost:8082/assets/index.html,然後,你可以看到這個應用使用的是資料庫了。這一次,就算重啟應用,這些資料仍然在,因為儲存產品被持久化到硬盤裡了。

總結

這篇文章中,知道了怎麼在vert.x裡使用JDBC資料庫,並沒有很多複雜的東西。開始可能會被這個非同步的開發模型驚訝到,但是,一旦你開始使用了,你就很難再回去了。

下一次,我們將看到這個應用怎麼使用mongoDB來替換HSQL。

Stay tuned, and happy coding !