Cassandra driver 入門教程 (二) 非同步呼叫連線資料庫
阿新 • • 發佈:2019-01-31
1 第一步 獲取Cluster物件
Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
2 第二步 獲得ListenableFuture的Session
ListenableFuture<Session> session = cluster.connectAsync();
3 第三步 執行非同步查詢
這裡是等session準備好以後,執行AsyncFunction生成新的非同步結果集;ListenableFuture<ResultSet> rs = Futures.transform(session, new AsyncFunction<Session, ResultSet>(){ @Override public ListenableFuture<ResultSet> apply(Session session) throws Exception { return session.executeAsync("select release_version from System.local"); } });
4 同步遍歷結果集資料
ListenableFuture<String> version = Futures.transform(rs, new Function<ResultSet,String>(){
@Override
public String apply(ResultSet rs) {
return rs.one().getString("release_version");
}
});
5 執行完成後的收尾工作(非常重要)
Futures.addCallback(version, new FutureCallback<String>(){ @Override public void onSuccess(String result) { LOG.info("cassandra release version is: "+result); //這裡很重要,記得非同步關閉 cluster.closeAsync(); } @Override public void onFailure(Throwable t) { LOG.info("error: get release version, "+t.getMessage()); cluster.closeAsync(); } });
總體程式碼如下:
public static void asyncConnect(){ Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); ListenableFuture<Session> session = cluster.connectAsync(); ListenableFuture<ResultSet> rs = Futures.transform(session, new AsyncFunction<Session, ResultSet>(){ @Override public ListenableFuture<ResultSet> apply(Session session) throws Exception { return session.executeAsync("select release_version from System.local"); } }); ListenableFuture<String> version = Futures.transform(rs, new Function<ResultSet,String>(){ @Override public String apply(ResultSet rs) { return rs.one().getString("release_version"); } }); Futures.addCallback(version, new FutureCallback<String>(){ @Override public void onSuccess(String result) { LOG.info("cassandra release version is: "+result); //這裡很重要,記得非同步關閉 cluster.closeAsync(); } @Override public void onFailure(Throwable t) { LOG.info("error: get release version, "+t.getMessage()); cluster.closeAsync(); } }); }