zookeeper入門之curator框架--幾種鎖的操作
阿新 • • 發佈:2018-11-24
package com.git.zookeeper.passwordmanager.lock; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.math.RandomUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2; import org.apache.curator.framework.recipes.locks.Lease; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; /** * zookeeper 鎖功能的演示 基於curator * * @author songqinghu * *共享鎖: 全域性同步分散式鎖, 同一時間兩臺機器只有一臺能獲得同一把鎖. *共享讀寫鎖: 用於分散式的讀寫互斥處理, 同時生成兩個鎖:一個讀鎖, 一個寫鎖, * 讀鎖能被多個應用持有, 而寫鎖只能一個獨佔, 當寫鎖未被持有時, 多個讀鎖持有者可以同時進行讀操作 *共享訊號量: 在分散式系統中的各個JVM使用同一個zk lock path, * 該path將跟一個給定數量的租約(lease)相關聯, 然後各個應用根據請求順序獲得對應的lease, * 相對來說, 這是最公平的鎖服務使用方式. *多共享鎖:內部構件多個共享鎖(會跟一個znode path關聯), 在acquire()過程中, * 執行所有共享鎖的acquire()方法, 如果中間出現一個失敗, 則將釋放所有已require的共享鎖; * 執行release()方法時, 則執行內部多個共享鎖的release方法(如果出現失敗將忽略) */ public class ZookeeperLockDemo { private static CuratorFramework client ; public static void main(String[] args) { //testSharedLock(); // testReadWriterLock(); for (int i = 0; i < 4; i++) { testSharedSemaphore(i); } } /* * 共享鎖測試 */ private static void testSharedLock(){ Thread t1 = new Thread("t1"){ public void run(){ sharedLock(); } }; Thread t2 = new Thread("t2"){ public void run(){ sharedLock(); } }; t1.start(); t2.start(); } /** * * @描述:第一種鎖: 共享鎖 * @return void * @exception * @createTime:2016年5月19日 * @author: songqinghu * @throws Exception */ private static void sharedLock(){ CuratorFramework client = getClient(); //這兩個都是共享鎖 // new InterProcessMutex(client, path) // new InterProcessSemaphoreMutex(client, path) InterProcessMutex sharedLock = new InterProcessMutex(client, "/sharedlock"); try { //鎖是否被獲取到 //超時 不在進行操作 if(sharedLock.acquire(50, TimeUnit.MILLISECONDS)){ //sharedLock.acquire(); System.out.println(Thread.currentThread().getName() + " is get the shared lock"); Thread.sleep(100000); System.out.println(Thread.currentThread().getName() + " is release the shared lock"); } } catch (Exception e) { //日誌記錄一下 超時說明 有鎖 可以不在操作 }finally { try { System.out.println(Thread.currentThread().getName() + " the flag is " + sharedLock.isAcquiredInThisProcess()); if(sharedLock.isAcquiredInThisProcess())//判斷是否持有鎖 進而進行鎖是否釋放的操作 sharedLock.release(); } catch (Exception e) { e.printStackTrace(); } } } /** * * @描述:測試讀寫鎖 * @return void * @exception * @createTime:2016年5月19日 * @author: songqinghu */ private static void testReadWriterLock(){ //建立多執行緒 迴圈進行鎖的操作 CuratorFramework client = getClient(); InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter"); final InterProcessMutex readLock = readWriteLock.readLock(); final InterProcessMutex writeLock = readWriteLock.writeLock(); List<Thread> jobs = new ArrayList<Thread>(); for (int i = 0; i < 20; i++) { Thread t = new Thread("寫鎖 " + i){ public void run(){ readWriterLock(writeLock); } }; jobs.add(t); } for (int i = 0; i < 1; i++) { Thread t = new Thread("讀鎖 " + i){ public void run(){ readWriterLock(readLock); } }; jobs.add(t); } for (Thread thread : jobs) { thread.start(); } } /** * * @描述:讀寫鎖演示 * @return void * @exception * @createTime:2016年5月19日 * @author: songqinghu */ private static void readWriterLock(InterProcessLock lock){ System.out.println(Thread.currentThread().getName()+" 進入任務 " + System.currentTimeMillis()); try { if(lock.acquire(20, TimeUnit.MILLISECONDS)){ //System.err.println(Thread.currentThread().getName() + " 等待超時 無法獲取到鎖"); //執行任務 --讀取 或者寫入 int time = RandomUtils.nextInt(150); System.out.println(Thread.currentThread().getName()+" 執行任務開始"); Thread.sleep(time); System.out.println(Thread.currentThread().getName()+" 執行任務完畢"); }else{ System.err.println(Thread.currentThread().getName() + " 等待超時 無法獲取到鎖"); } } catch (Exception e) { e.printStackTrace(); }finally{ try { if(lock.isAcquiredInThisProcess()) lock.release(); } catch (Exception e2) { } } } private static void testSharedSemaphore(final int x){ CuratorFramework client = getClient(); final InterProcessSemaphoreV2 semaphoreV2 = /// new InterProcessSemaphoreV2(client, "/sharedsemaphore", 50); new InterProcessSemaphoreV2(client, "/sharedsemaphore", new SharedCount(client, "/semaphore", 2)); List<Thread> jobs = new ArrayList<Thread>(); for (int i = 0; i < 2; i++) { Thread thread = new Thread(x +" 共享資訊鎖 " + i){ public void run(){ sharedSemaphore(semaphoreV2); } }; jobs.add(thread); } for (Thread thread : jobs) { thread.start(); } } /** * 共享訊號量 * 設定總的數量 -->分散式情況下的最大並行數量 * 按照請求順序進行 執行權的分配 * 可以設定超時 不執行 也可以設定 直到獲取執行權 執行 */ private static void sharedSemaphore(InterProcessSemaphoreV2 semaphoreV2){ //CuratorFramework client = getClient(); // new InterProcessSemaphoreV2(client, path, maxLeases) // new InterProcessSemaphoreV2(client, path, count) Lease lease = null; try { // lease = semaphoreV2.acquire(10, TimeUnit.MILLISECONDS); lease = semaphoreV2.acquire(); if(lease != null){ System.out.println(Thread.currentThread().getName()+" 執行任務開始" + System.currentTimeMillis()); //Thread.sleep(RandomUtils.nextInt(4000)); Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+" 執行任務完畢" + System.currentTimeMillis()); } } catch (Exception e) { e.printStackTrace(); }finally { try { if(lease !=null) semaphoreV2.returnLease(lease); } catch (Exception e2) { e2.printStackTrace(); } } } /** * * @描述:獲取連線 * @return void * @exception * @createTime:2016年5月19日 * @author: songqinghu */ private static CuratorFramework getClient(){ if(client ==null){ ACLProvider aclProvider = new ACLProvider() { private List<ACL> acl; //初始化操作 private synchronized void init(){ if(acl ==null){ acl = new ArrayList<ACL>(); acl.add(new ACL(Perms.ALL, new Id("auth","admin:admin"))); } } @Override public List<ACL> getDefaultAcl() { if(acl ==null){ init(); } return this.acl; } @Override public List<ACL> getAclForPath(String path) { if(acl ==null){ init(); } return this.acl; } }; String scheme = "digest"; byte[] auth = "admin:admin".getBytes(); int connectionTimeoutMs =5000; String connectString = "10.125.2.44:2181"; byte[] defaultData ="預設資料".getBytes(); int maxCloseWaitMs = 5000; String namespace = "testlock"; RetryPolicy retryPolicy = new RetryNTimes(Integer.MAX_VALUE, 5000); CuratorFramework clientinit = CuratorFrameworkFactory.builder().aclProvider(aclProvider).authorization(scheme, auth) .connectionTimeoutMs(connectionTimeoutMs).connectString(connectString). defaultData(defaultData).maxCloseWaitMs(maxCloseWaitMs).namespace(namespace) .retryPolicy(retryPolicy).build(); clientinit.start(); client = clientinit; } return client; } }