Zookeeper應用場景之分布式屏障Barrier
阿新 • • 發佈:2017-10-15
pri worker use int 休眠 沒有 分布 eat demo
Barrier就是柵欄或者屏障,適用於這樣的業務場景:當有些操作需要並行執行,但後續操作又需要串行執行,此時必須等待所有並行執行的線程全部結束,才開始串行,於是就需要一個屏障,來控制所有線程同時開始,並等待所有線程全部結束。
下面放上一個簡陋的圖以便理解。
要解決的問題如下:
1.如何控制所有線程同時開始? 所有的線程啟動時在zookeeper節點/barrier下插入順序臨時節點,然後檢查/barrier下所有children節點的數量是否為所有的線程數,如果不是,則等待。 如果是,則開始執行。 2.如何等待所有線程結束? 所有線程在執行完畢後,都檢查/barrier下所有children節點數量是否為0,若不為0,則繼續等待。 3.用什麽類型的節點? 根節點使用持久節點(persistent node),子節點使用臨時節點(Ephemeral node) 根節點為什麽要用持久節點?首先因為臨時節點不能有子節點,所以根節點要用持久節點,並且在程序中要判斷根節點是否存在。 子節點為什麽要用臨時節點?臨時節點隨著連接的斷開而消失,在程序中,雖然會刪除臨時節點,但可能會出現程序在節點被刪除之前就crash了,如果是持久節點,節點不會被刪除。 首先我寫了一個ZookeeperClient,由這個client端負維護一個Zookeeper對象,並對Zookeeper節點進行操作。publicclass ZookeeperClient { public static ZooKeeper zooKeeper; public static final String IP_ADDRESS="xxxx:2181"; public static void init() throws Exception{ zooKeeper = new ZooKeeper(IP_ADDRESS, 15000, new Watcher() { public void process(WatchedEvent watchedEvent) {if (watchedEvent.getState()== Event.KeeperState.SyncConnected) { } } }); } public static String createTempNode(String path,String data) { try { String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("node "+ path +" with data is created,return node "+node); return node; } catch (KeeperException e) { e.printStackTrace(); return "ERROR"; } catch (InterruptedException e) { e.printStackTrace(); return "ERROR"; } } public static boolean delete(String path,int version) { try { zooKeeper.delete(path,version); System.out.println("delete path:"+ path + "success"); return true; } catch (InterruptedException e) { e.printStackTrace(); return false; } catch (KeeperException e) { e.printStackTrace(); return false; } } public static boolean createPersistentNode(String path,String data) { try { String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("node "+ path +" with data is created"); return true; } catch (KeeperException e) { e.printStackTrace(); return false; } catch (InterruptedException e) { e.printStackTrace(); return false; } } public static boolean checkExists(String path){ try { Stat stat = zooKeeper.exists(path,true); if(stat!=null) { return true; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public static int getChildrens(String path) { try { return zooKeeper.getChildren(path,true).size(); } catch (KeeperException e) { e.printStackTrace(); return -1; } catch (InterruptedException e) { e.printStackTrace(); return -1; } } }
Barrier類,負責實現屏障的功能
public class Barrier { private int size; private String rootPath; public Barrier(int size,String rootPath){ this.rootPath = rootPath; this.size = size; } public void init() throws Exception { ZookeeperClient.init(); if(!ZookeeperClient.checkExists(rootPath)){ ZookeeperClient.createPersistentNode(rootPath,"1"); } } public boolean enter(String name,String number){ ZookeeperClient.createTempNode(rootPath+"/"+name, number); //如果節點下children的數量沒有達到所有線程的總數,則繼續輪詢。 //此時要等待所有的線程都在根節點下創建了節點,才開始執行 while(true) { int size=ZookeeperClient.getChildrens(rootPath); if (size != this.size) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { return true; } } } public boolean exit(String name){ //先刪除自己的節點 ZookeeperClient.delete(rootPath+"/"+name,0); //如果節點下children數量大於0,則繼續輪詢 //此時要等待所有的線程都刪除了節點,即所有線程都做完了該做的事情,才結束線程。確保所有的線程同時結束。 while(true){ int size = ZookeeperClient.getChildrens(rootPath); if(size!=0) { System.out.println("The current children node under "+rootPath+" is " + size+", still need waiting"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { return true; } } } }
BarrierDemo類,負責啟動線程,為了模擬等待,我為3個線程設置了不同的休眠時間。
預想的結果是t3首先刪除節點,此時子節點剩下兩個(t1和t2),t3不會結束,而是繼續輪詢。
t2隨後刪除節點,此時子節點剩下1個(t1),t2和t3繼續輪詢。
t3最後刪除節點,此時沒有子節點,t1,t2,t3全部結束。
public class BarrierDemo { public static void main(String[] args) throws Exception{ Barrier barrier = new Barrier(3,"/barrier"); barrier.init(); Worker worker1 = new Worker(barrier,10000); Worker worker2 = new Worker(barrier,5000); Worker worker3 = new Worker(barrier,2000); Thread t1 = new Thread(worker1,"t1"); Thread t2 = new Thread(worker2,"t2"); Thread t3 = new Thread(worker3,"t3"); t1.start(); t2.start(); t3.start(); } } class Worker implements Runnable { Barrier barrier; long time; Worker(Barrier barrier, long time){ this.barrier = barrier; this.time = time; } public void run() { boolean isEnter=barrier.enter(Thread.currentThread().getName(),"0"); if(isEnter) { System.out.println(Thread.currentThread().getName()+"is working on something important now"); try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } boolean isExit=barrier.exit(Thread.currentThread().getName()); if (isExit) { System.out.println(Thread.currentThread().getName()+"is exiting.."); } } }
運行結果如下
Zookeeper應用場景之分布式屏障Barrier