1. 程式人生 > >Cassandra driver 入門教程 (二) 非同步呼叫連線資料庫

Cassandra driver 入門教程 (二) 非同步呼叫連線資料庫

1 第一步 獲取Cluster物件

Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();


2 第二步 獲得ListenableFuture的Session

ListenableFuture<Session> session = cluster.connectAsync();


3 第三步 執行非同步查詢

ListenableFuture<ResultSet> rs = Futures.transform(session, new AsyncFunction<Session, ResultSet>(){

			@Override
			public ListenableFuture<ResultSet> apply(Session session) throws Exception {
				return session.executeAsync("select release_version from System.local");
			}
			
		});
		
這裡是等session準備好以後,執行AsyncFunction生成新的非同步結果集;

4 同步遍歷結果集資料

ListenableFuture<String> version = Futures.transform(rs, new Function<ResultSet,String>(){

			@Override
			public String apply(ResultSet rs) {
				return rs.one().getString("release_version");
			}
			
		});

5 執行完成後的收尾工作(非常重要)
Futures.addCallback(version, new FutureCallback<String>(){

			@Override
			public void onSuccess(String result) {
				LOG.info("cassandra release version is: "+result);
				//這裡很重要,記得非同步關閉
				cluster.closeAsync();
			}

			@Override
			public void onFailure(Throwable t) {
				LOG.info("error: get release version, "+t.getMessage());
				cluster.closeAsync();
			}
			
		});


總體程式碼如下:

public static void asyncConnect(){
		Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
		
		ListenableFuture<Session> session = cluster.connectAsync();
		
		ListenableFuture<ResultSet> rs = Futures.transform(session, new AsyncFunction<Session, ResultSet>(){

			@Override
			public ListenableFuture<ResultSet> apply(Session session) throws Exception {
				return session.executeAsync("select release_version from System.local");
			}
			
		});
		
		ListenableFuture<String> version = Futures.transform(rs, new Function<ResultSet,String>(){

			@Override
			public String apply(ResultSet rs) {
				return rs.one().getString("release_version");
			}
			
		});
		
		Futures.addCallback(version, new FutureCallback<String>(){

			@Override
			public void onSuccess(String result) {
				LOG.info("cassandra release version is: "+result);
				//這裡很重要,記得非同步關閉
				cluster.closeAsync();
			}

			@Override
			public void onFailure(Throwable t) {
				LOG.info("error: get release version, "+t.getMessage());
				cluster.closeAsync();
			}
			
		});
	}