1. 程式人生 > >Zookeeper應用場景之分布式屏障Barrier

Zookeeper應用場景之分布式屏障Barrier

pri worker use int 休眠 沒有 分布 eat demo

Barrier就是柵欄或者屏障,適用於這樣的業務場景:當有些操作需要並行執行,但後續操作又需要串行執行,此時必須等待所有並行執行的線程全部結束,才開始串行,於是就需要一個屏障,來控制所有線程同時開始,並等待所有線程全部結束。

下面放上一個簡陋的圖以便理解。

技術分享

要解決的問題如下:

1.如何控制所有線程同時開始? 所有的線程啟動時在zookeeper節點/barrier下插入順序臨時節點,然後檢查/barrier下所有children節點的數量是否為所有的線程數,如果不是,則等待。 如果是,則開始執行。 2.如何等待所有線程結束? 所有線程在執行完畢後,都檢查/barrier下所有children節點數量是否為0,若不為0,則繼續等待。 3.用什麽類型的節點? 根節點使用持久節點(persistent node),子節點使用臨時節點(Ephemeral node) 根節點為什麽要用持久節點?首先因為臨時節點不能有子節點,所以根節點要用持久節點,並且在程序中要判斷根節點是否存在。 子節點為什麽要用臨時節點?臨時節點隨著連接的斷開而消失,在程序中,雖然會刪除臨時節點,但可能會出現程序在節點被刪除之前就crash了,如果是持久節點,節點不會被刪除。 首先我寫了一個ZookeeperClient,由這個client端負維護一個Zookeeper對象,並對Zookeeper節點進行操作。
public
class ZookeeperClient { public static ZooKeeper zooKeeper; public static final String IP_ADDRESS="xxxx:2181"; public static void init() throws Exception{ zooKeeper = new ZooKeeper(IP_ADDRESS, 15000, new Watcher() { public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState()== Event.KeeperState.SyncConnected) { } } }); } public static String createTempNode(String path,String data) { try { String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(
"node "+ path +" with data is created,return node "+node); return node; } catch (KeeperException e) { e.printStackTrace(); return "ERROR"; } catch (InterruptedException e) { e.printStackTrace(); return "ERROR"; } } public static boolean delete(String path,int version) { try { zooKeeper.delete(path,version); System.out.println("delete path:"+ path + "success"); return true; } catch (InterruptedException e) { e.printStackTrace(); return false; } catch (KeeperException e) { e.printStackTrace(); return false; } } public static boolean createPersistentNode(String path,String data) { try { String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("node "+ path +" with data is created"); return true; } catch (KeeperException e) { e.printStackTrace(); return false; } catch (InterruptedException e) { e.printStackTrace(); return false; } } public static boolean checkExists(String path){ try { Stat stat = zooKeeper.exists(path,true); if(stat!=null) { return true; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public static int getChildrens(String path) { try { return zooKeeper.getChildren(path,true).size(); } catch (KeeperException e) { e.printStackTrace(); return -1; } catch (InterruptedException e) { e.printStackTrace(); return -1; } } }

Barrier類,負責實現屏障的功能

public class Barrier {
    private int size;
    private String rootPath;
    public Barrier(int size,String rootPath){
        this.rootPath = rootPath;
        this.size = size;


    }
    public void init() throws Exception {
       ZookeeperClient.init();
        if(!ZookeeperClient.checkExists(rootPath)){
            ZookeeperClient.createPersistentNode(rootPath,"1");
        }
    }
    public boolean enter(String name,String number){
        ZookeeperClient.createTempNode(rootPath+"/"+name, number);
        //如果節點下children的數量沒有達到所有線程的總數,則繼續輪詢。
        //此時要等待所有的線程都在根節點下創建了節點,才開始執行
        while(true) {
            int size=ZookeeperClient.getChildrens(rootPath);
            if (size != this.size) {
                try {

                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                return true;
            }
        }
    }
    public  boolean exit(String name){
        //先刪除自己的節點
        ZookeeperClient.delete(rootPath+"/"+name,0);
        //如果節點下children數量大於0,則繼續輪詢
        //此時要等待所有的線程都刪除了節點,即所有線程都做完了該做的事情,才結束線程。確保所有的線程同時結束。
        while(true){
            int size = ZookeeperClient.getChildrens(rootPath);
            if(size!=0) {
                System.out.println("The current children node under "+rootPath+" is " + size+", still need waiting");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                return true;
            }
        }
    }
}

BarrierDemo類,負責啟動線程,為了模擬等待,我為3個線程設置了不同的休眠時間。

預想的結果是t3首先刪除節點,此時子節點剩下兩個(t1和t2),t3不會結束,而是繼續輪詢。

t2隨後刪除節點,此時子節點剩下1個(t1),t2和t3繼續輪詢。

t3最後刪除節點,此時沒有子節點,t1,t2,t3全部結束。

public class BarrierDemo {
    public static void main(String[] args) throws  Exception{
        Barrier barrier = new Barrier(3,"/barrier");
        barrier.init();
        Worker worker1 = new Worker(barrier,10000);
        Worker worker2 = new Worker(barrier,5000);
        Worker worker3 = new Worker(barrier,2000);
        Thread t1 = new Thread(worker1,"t1");
        Thread t2 = new Thread(worker2,"t2");
        Thread t3 = new Thread(worker3,"t3");
        t1.start();
        t2.start();
        t3.start();


    }
}
class Worker implements Runnable {
    Barrier barrier;
    long time;
    Worker(Barrier barrier, long time){
        this.barrier = barrier;
        this.time = time;

    }
    public void run() {
        boolean isEnter=barrier.enter(Thread.currentThread().getName(),"0");
        if(isEnter) {
            System.out.println(Thread.currentThread().getName()+"is working on something important now");
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        boolean isExit=barrier.exit(Thread.currentThread().getName());
        if (isExit) {
            System.out.println(Thread.currentThread().getName()+"is exiting..");
        }
    }
}

運行結果如下

技術分享

技術分享

Zookeeper應用場景之分布式屏障Barrier