1. 程式人生 > >zookeeper入門之curator框架--幾種鎖的操作

zookeeper入門之curator框架--幾種鎖的操作

package com.git.zookeeper.passwordmanager.lock;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
import org.apache.commons.lang.math.RandomUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
 
/**
 * zookeeper 鎖功能的演示 基於curator
 * 
 * @author songqinghu
 *
 *共享鎖: 全域性同步分散式鎖, 同一時間兩臺機器只有一臺能獲得同一把鎖. 
 *共享讀寫鎖: 用於分散式的讀寫互斥處理, 同時生成兩個鎖:一個讀鎖, 一個寫鎖,
 *        讀鎖能被多個應用持有, 而寫鎖只能一個獨佔, 當寫鎖未被持有時, 多個讀鎖持有者可以同時進行讀操作 
 *共享訊號量: 在分散式系統中的各個JVM使用同一個zk lock path,
 *      該path將跟一個給定數量的租約(lease)相關聯, 然後各個應用根據請求順序獲得對應的lease, 
 *      相對來說, 這是最公平的鎖服務使用方式. 
 *多共享鎖:內部構件多個共享鎖(會跟一個znode path關聯), 在acquire()過程中,
 *     執行所有共享鎖的acquire()方法, 如果中間出現一個失敗, 則將釋放所有已require的共享鎖; 
 *      執行release()方法時, 則執行內部多個共享鎖的release方法(如果出現失敗將忽略) 
 */
public class ZookeeperLockDemo {
 
    private static CuratorFramework client ;
    
    public static void main(String[] args) {
        
        //testSharedLock();
       // testReadWriterLock();
        for (int i = 0; i < 4; i++) {
            testSharedSemaphore(i);
        }
    }
    
    /*
     * 共享鎖測試
     */
    private static void testSharedLock(){
        Thread t1 = new Thread("t1"){
            public void run(){
                sharedLock();
            }
        };
        Thread t2 = new Thread("t2"){
            public void run(){
                sharedLock();
            }
        };
        t1.start();
        t2.start();
    }
    
    /**
     * 
     * @描述:第一種鎖: 共享鎖
     * @return void
     * @exception
     * @createTime:2016年5月19日
     * @author: songqinghu
     * @throws Exception 
     */
    private static void sharedLock(){
        CuratorFramework client = getClient();
        //這兩個都是共享鎖
       // new InterProcessMutex(client, path)
       // new InterProcessSemaphoreMutex(client, path)
        
        InterProcessMutex sharedLock = new InterProcessMutex(client, "/sharedlock");
        try {
            //鎖是否被獲取到
            //超時 不在進行操作
              if(sharedLock.acquire(50, TimeUnit.MILLISECONDS)){
                //sharedLock.acquire();
                System.out.println(Thread.currentThread().getName() + "  is  get the shared lock");
                Thread.sleep(100000);
                System.out.println(Thread.currentThread().getName() + "  is  release the shared lock");
            }
            
        } catch (Exception e) {
            //日誌記錄一下 超時說明 有鎖 可以不在操作
            
        }finally {
            try {
                System.out.println(Thread.currentThread().getName() + "  the flag is " + sharedLock.isAcquiredInThisProcess());
                if(sharedLock.isAcquiredInThisProcess())//判斷是否持有鎖 進而進行鎖是否釋放的操作
                sharedLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
    }
    /**
     * 
     * @描述:測試讀寫鎖
     * @return void
     * @exception
     * @createTime:2016年5月19日
     * @author: songqinghu
     */
    private static void testReadWriterLock(){
        //建立多執行緒 迴圈進行鎖的操作
        CuratorFramework client = getClient();
        
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
       final  InterProcessMutex readLock = readWriteLock.readLock();
       final  InterProcessMutex writeLock = readWriteLock.writeLock();
        
        List<Thread> jobs = new ArrayList<Thread>();
        
        for (int i = 0; i < 20; i++) {
            Thread t = new Thread("寫鎖  " + i){
                
                public void run(){
                    readWriterLock(writeLock);
                }
            };
            jobs.add(t);
        }
        
        for (int i = 0; i < 1; i++) {
            Thread t = new Thread("讀鎖  " + i){
                
                public void run(){
                    readWriterLock(readLock);
                }
            };
            jobs.add(t);
        }
        
        for (Thread thread : jobs) {
            thread.start();
        }
        
    }
    
    /**
     * 
     * @描述:讀寫鎖演示
     * @return void
     * @exception
     * @createTime:2016年5月19日
     * @author: songqinghu
     */
    private static void readWriterLock(InterProcessLock lock){
        System.out.println(Thread.currentThread().getName()+"   進入任務 " + System.currentTimeMillis());
        try {
            if(lock.acquire(20, TimeUnit.MILLISECONDS)){
                //System.err.println(Thread.currentThread().getName() + " 等待超時  無法獲取到鎖");
            //執行任務 --讀取 或者寫入
               int time = RandomUtils.nextInt(150);
               
               System.out.println(Thread.currentThread().getName()+"   執行任務開始");
               Thread.sleep(time);
              System.out.println(Thread.currentThread().getName()+"   執行任務完畢");
            }else{
              System.err.println(Thread.currentThread().getName() + " 等待超時  無法獲取到鎖"); 
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                if(lock.isAcquiredInThisProcess())
                    lock.release();
            } catch (Exception e2) {
            
            }
        }
        
    }
    
    
    private static void testSharedSemaphore(final int x){
        CuratorFramework client = getClient();
        final InterProcessSemaphoreV2 semaphoreV2 = 
               /// new InterProcessSemaphoreV2(client, "/sharedsemaphore", 50);
                new InterProcessSemaphoreV2(client, "/sharedsemaphore",
                        new SharedCount(client, "/semaphore", 2));
        List<Thread> jobs  = new ArrayList<Thread>();
        for (int i = 0; i < 2; i++) {
            Thread thread = new Thread(x +"  共享資訊鎖 " + i){
                public void run(){
                    sharedSemaphore(semaphoreV2);
                }
            };
            jobs.add(thread);
        }
        for (Thread thread : jobs) {
            thread.start();
        }
    }
    
    
    /**
     * 共享訊號量
     * 設定總的數量 -->分散式情況下的最大並行數量
     * 按照請求順序進行 執行權的分配
     * 可以設定超時 不執行 也可以設定 直到獲取執行權 執行
     */
    private static void sharedSemaphore(InterProcessSemaphoreV2 semaphoreV2){
        //CuratorFramework client = getClient();
       // new InterProcessSemaphoreV2(client, path, maxLeases)
       // new InterProcessSemaphoreV2(client, path, count)
        
        Lease lease = null;
        try {
           // lease = semaphoreV2.acquire(10, TimeUnit.MILLISECONDS);
            lease = semaphoreV2.acquire();
            if(lease != null){
                System.out.println(Thread.currentThread().getName()+"   執行任務開始" + System.currentTimeMillis());
                //Thread.sleep(RandomUtils.nextInt(4000));
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName()+"   執行任務完畢" + System.currentTimeMillis());
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                if(lease !=null)
                    semaphoreV2.returnLease(lease);
            } catch (Exception e2) {
                e2.printStackTrace();
            }
            
            
        }
        
        
        
        
    }
    
    
    
    /**
     * 
     * @描述:獲取連線
     * @return void
     * @exception
     * @createTime:2016年5月19日
     * @author: songqinghu
     */
    private static CuratorFramework getClient(){
        if(client ==null){
            
            ACLProvider aclProvider = new ACLProvider() {
                private List<ACL> acl;
                //初始化操作
                private synchronized void init(){
                    if(acl ==null){
                        acl = new ArrayList<ACL>();
                        acl.add(new ACL(Perms.ALL, new Id("auth","admin:admin")));
                    }
                }
                @Override
                public List<ACL> getDefaultAcl() {
                    if(acl ==null){
                        init();
                    }
                    return this.acl;
                }
                @Override
                public List<ACL> getAclForPath(String path) {
                    if(acl ==null){
                        init();
                    }
                    return this.acl;
                }
            };
            String scheme = "digest";
            byte[] auth = "admin:admin".getBytes();
            int connectionTimeoutMs =5000;
            String connectString = "10.125.2.44:2181";
            byte[] defaultData ="預設資料".getBytes();
            int maxCloseWaitMs = 5000;
            String namespace = "testlock";
            RetryPolicy retryPolicy = new RetryNTimes(Integer.MAX_VALUE, 5000);
            CuratorFramework clientinit = CuratorFrameworkFactory.builder().aclProvider(aclProvider).authorization(scheme, auth)
                    .connectionTimeoutMs(connectionTimeoutMs).connectString(connectString).
                    defaultData(defaultData).maxCloseWaitMs(maxCloseWaitMs).namespace(namespace)
                    .retryPolicy(retryPolicy).build();
            
            clientinit.start(); 
            client = clientinit;
        }
        return client;
    }
    
    
    
    
    
    
}