1. 程式人生 > >讀《分布式一致性原理》JAVA客戶端API操作

讀《分布式一致性原理》JAVA客戶端API操作

rst org RoCE row out 錯誤 throws eat 服務

創建會話

客戶端可以通過創建一個Zookeeper實例來連接服務器。4種構造方法如下

ZooKeeper(connectString, sessionTimeout, watcher);

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)


 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,long sessionId, byte[] sessionPasswd);

ZooKeeper(String connectString, 
int sessionTimeout, Watcher watcher,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

技術分享圖片

技術分享圖片

註意,zookeeper客戶端和服務端會話建立是一個異步的過程,也是就是說在程序中構造zookeeper方法初始化方法執行完後

會立即返回,在大多數情況下此時並沒有真正的建立一個可用的會話,此時會話正處於CONnectioning狀態。當真正的會話建立後

服務端會向客戶端發送一個通知,客戶端在獲取這個通知後會話才真正的建立。

package session;
import java.io.IOException;
import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; public class CreateZookeeper implements Watcher { private static CountDownLatch connectedSemaphore = new
CountDownLatch(1); @Override public void process(WatchedEvent event) { System.out.println("receive watched event:"+event); if (KeeperState.SyncConnected==event.getState()) { connectedSemaphore.countDown(); } } public static void main(String[] args) throws IOException { ZooKeeper zooKeeper = new ZooKeeper("192.168.64.60", 5000, new CreateZookeeper()); System.out.println(zooKeeper.getState()); try { connectedSemaphore.await(); } catch (InterruptedException e) {} System.out.println("zookeeper session established"); } }

程序運行結果如下:

技術分享圖片

CreateZookeeper類實現了Watcher接口重寫了process方法,該方法負責處理來自zookeeper服務端的通知。在

收到來自服務端的synconncted事件後,解除了主程序中的CountDownLatch等待阻塞。至此客戶端會話創建完成。

創建一個復用sessionId和sessionPasswd的實例

package session;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;

public class CreateZookeeper2 implements Watcher {
public static CountDownLatch connectedSemaphore = new CountDownLatch(1);

@Override
public void process(WatchedEvent event) {
    System.out.println("receive watched event:"+event);
    if (KeeperState.SyncConnected==event.getState()) {
        connectedSemaphore.countDown();
    }
    
}

public static void main(String[] args) throws IOException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper("192.168.64.60", 5000, new CreateZookeeper());
    connectedSemaphore.await();
    long sessionId = zooKeeper.getSessionId();
    byte[] sessionPasswd = zooKeeper.getSessionPasswd();
    
    //復用sessionId和sessionPasswd
     zooKeeper = new ZooKeeper("192.168.64.60", 5000, new CreateZookeeper(),1l,"test".getBytes());
     
     zooKeeper = new ZooKeeper("192.168.64.60", 5000, new CreateZookeeper(),sessionId,sessionPasswd);
     
     Thread.sleep(Integer.MAX_VALUE);

}

}

技術分享圖片

我們可以看出,第一次使用錯誤的id和pw服務端返回“EXpired”事件,而第二次使用正確的id和pw則重新連接上。

讀《分布式一致性原理》JAVA客戶端API操作