1. 程式人生 > >springboot2.0.3整合zookeeper實現分散式鎖

springboot2.0.3整合zookeeper實現分散式鎖

1.zookeeper安裝

dataVersion 可以作為樂觀鎖

1.1 zookeeper基礎知識

(1) session基本原理

​ 客戶端與伺服器之間維持會話,每個會話都可以設定一個超時時間,心跳結束那麼會話就結束。心跳存在,那麼就是告訴伺服器別把我刪掉。

​ session過期之後,session建立的臨時節點會被拋棄。

心跳機制:客戶端向伺服器傳送Ping包請求。

(2)基本命令

create /aaa bbb 建立一個aaa節點 值為bbb

get /aaa 獲取節點

set /aaa bbb 1 設定aaa節點值為bbb,並且只有在版本為1的情況下生效,否則失敗

delete /aaa 注意若節點下沒有子節點才可以刪除。若存在子節點,那麼先刪除子節點。

(3)zk特性watcher機制

針對每個節點的操作,都會有一個監聽者watcher。

當監控的某個物件發生了變化(父節點或子節點增刪改操作),就會出發watcher機制。

針對不同型別的操作,觸發的watcher事件也不同。(建立,刪除,資料變化事件)

zk中的watcher是一次性的,觸發後就會被銷燬。

stat /aaa watch 監聽aaa

create /aaa 123 建立一個aaa節點

watcherEvent : node created path:/aaa

get /aaa watcher 設定 aaa監聽

set /aaa bbb

觸發watcherEvent nodeDataChanged path /aaa

delete /imooc 發現沒有watcher事件,則說明watcher是一次性的

get /aaa watcher 設定 aaa監聽 delete /imooc

觸發watcherEvent NodeDeleted

ls為父節點設定watcher,建立子節點觸發NodeChildrenChanged

ls為父節點設定watcher, 刪除子節點觸發NodeChildrenChanged

ls為父節點設定watcher, 修改子節點值不觸發事件,只有把子節點當父節點,修改資料得時候才發生nodeChanged事件

2. 分散式鎖依賴監聽:

(1)將zookeeper新增到springboot中

@Component("applicationContextHelper")
public class ApplicationContextHelper implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringHelper.setApplicationContext(applicationContext);
        InitData.start();
        new Thread(new RebuildCacheThread()).start();
        ZooKeeperSession.init();
    }
}

(2)分散式鎖

public class ZooKeeperSession {
	
	private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
	
	private ZooKeeper zookeeper;

	public ZooKeeperSession() {
		// 去連線zookeeper server,建立會話的時候,是非同步去進行的
		// 所以要給一個監聽器,說告訴我們什麼時候才是真正完成了跟zk server的連線
		try {
			this.zookeeper = new ZooKeeper(
					"127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003", 
					50000, 
					new ZooKeeperWatcher());
			// 給一個狀態CONNECTING,連線中
			System.out.println(zookeeper.getState());
			
			try {
				// CountDownLatch
				// java多執行緒併發同步的一個工具類
				// 會傳遞進去一些數字,比如說1,2 ,3 都可以
				// 然後await(),如果數字不是0,那麼久卡住,等待
				
				// 其他的執行緒可以呼叫coutnDown(),減1
				// 如果數字減到0,那麼之前所有在await的執行緒,都會逃出阻塞的狀態
				// 繼續向下執行
				
				connectedSemaphore.await();
			} catch(InterruptedException e) {
				e.printStackTrace();
			}

			System.out.println("ZooKeeper session established......");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 獲取分散式鎖
	 * @param productId
	 */
	public void acquireDistributedLock(Long productId) {
		String path = "/product-lock-" + productId;
	
		try {
			zookeeper.create(path, "".getBytes(), 
					Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
			System.out.println("success to acquire lock for product[id=" + productId + "]");  
		} catch (Exception e) {
			// 如果那個商品對應的鎖的node,已經存在了,就是已經被別人加鎖了,那麼就這裡就會報錯
			// NodeExistsException
			int count = 0;
			while(true) {
				try {
					Thread.sleep(1000); 
					zookeeper.create(path, "".getBytes(), 
							Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
				} catch (Exception e2) {
					count++;
					System.out.println("the " + count + " times try to acquire lock for product[id=" + productId + "]......");
					continue;
				}
				System.out.println("success to acquire lock for product[id=" + productId + "] after " + count + " times try......");
				break;
			}
		}
	}
	
	/**
	 * 釋放掉一個分散式鎖
	 * @param productId
	 */
	public void releaseDistributedLock(Long productId) {
		String path = "/product-lock-" + productId;
		try {
			zookeeper.delete(path, -1); 
			System.out.println("release the lock for product[id=" + productId + "]......");  
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 建立zk session的watcher
	 * @author Administrator
	 *
	 */
	private class ZooKeeperWatcher implements Watcher {

		public void process(WatchedEvent event) {
			System.out.println("Receive watched event: " + event.getState());
			if(KeeperState.SyncConnected == event.getState()) {
				connectedSemaphore.countDown();
			} 
		}
		
	}
	
	/**
	 * 封裝單例的靜態內部類
	 * @author Administrator
	 *
	 */
	private static class Singleton {
		
		private static ZooKeeperSession instance;
		
		static {
			instance = new ZooKeeperSession();
		}
		
		public static ZooKeeperSession getInstance() {
			return instance;
		}
		
	}
	
	/**
	 * 獲取單例
	 * @return
	 */
	public static ZooKeeperSession getInstance() {
		return Singleton.getInstance();
	}
	
	/**
	 * 初始化單例的便捷方法
	 */
	public static void init() {
		getInstance();
	}
	
}

(3)以分散式環境下修改快取為例(這裡程式碼用到了2級快取:redis+ehcache)

public class RebuildCacheThread implements Runnable {
	
	public void run() {
		RebuildCacheQueue rebuildCacheQueue = RebuildCacheQueue.getInstance();
		ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
		IProductService cacheService = (IProductService) SpringHelper.popBean(IProductService.class);
		
		while(true) {
			Product productInfo = rebuildCacheQueue.takeProductInfo();
			
			zkSession.acquireDistributedLock(productInfo.getId());  
			
			Product existedProductInfo = cacheService.getProductByRedisCache(productInfo.getId());
			
			if(existedProductInfo != null) {
				// 比較當前資料的時間版本比已有資料的時間版本是新還是舊
				try {
					if(productInfo.getUpdateTime().before(existedProductInfo.getUpdateTime())) {
						System.out.println("current date[" + productInfo.getUpdateTime() + "] is before existed date[" + existedProductInfo.getUpdateTime() + "]");
						continue;
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
				System.out.println("current date[" + productInfo.getUpdateTime() + "] is after existed date[" + existedProductInfo.getUpdateTime() + "]");
			} else {
				System.out.println("existed product info is null......");   
			}
			cacheService.saveProduct2RedisCache(productInfo);  
			zkSession.releaseDistributedLock(productInfo.getId()); 
		}
	}

}