1. 程式人生 > >zookeeper 分布式鎖

zookeeper 分布式鎖

zookeeper 分布式鎖


分布式鎖有很多,redis也可以實現分布式鎖,

http://shangdc.blog.51cto.com/10093778/1914852(查看redis的分布式鎖)


zookeeper分布式鎖步驟:

1、zookeeper是一個帶有節點的,類似於文件目錄,所以我們把鎖抽象成目錄,zookeeper有一個EPHEMERAL_SEQUENTIAL類型的節點, 多個線程再zookeeper創建的節點的時候,它會幫我們安排好順序進行創建,所以這個節點下的目錄都是順序的。

2、獲取當前目錄的最小的節點,判斷最小節點是不是當前的自己的節點,如果是說明獲取鎖成功了,如果不是獲取鎖失敗了。

3、當獲取鎖的時候失敗了,為了避免驚群效應,你要做的就是獲取當前自己的節點的上一個節點,然後對該節點進行監聽,當上一個節點刪除的時候,會觸發這個監聽,通知該節點。

4、這麽做,釋放鎖的時候,也會通知下一個節點。


什麽是驚群效應:理解為肉少狼多,當一個節點刪除的時候,凡是訂閱了此節點的watcha的監聽都會重新獲取鎖,都要去爭奪,如果數量少還好,當數量很大的時候這種設計就是不合理也是浪費資源。


zookeeper的狀態和事件類型,提前了解一下。

狀態 KeeperState.Disconnected (0)  斷開
 * KeeperState.SyncConnected (3)  同步連接狀態
 * KeeperState.AuthFailed (4) 認證失敗狀態
 * KeeperState.ConnectedReadOnly (5)  只讀連接狀態
 * KeeperState.SaslAuthenticated (6) SASL認證通過狀態
 * KeeperState.Expired (-112)  過期狀態
 * 
 * // EventType 是事件類型 主要關註 Create Delete DataChanged ChildrenChanged
 * EventType.None (-1), 無
 * EventType.NodeCreated (1),
 * EventType.NodeDeleted (2),
 * EventType.NodeDataChanged (3),  結點數據變化
 * EventType.NodeChildrenChanged (4); 結點子節點變化


下面是代碼,自己敲下,理解一下。

package com.lhcis.spider.system.annotation;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author sdc
 *
 */
public class ZooDistributeLock implements Watcher {

	private static final Logger LOG = LoggerFactory.getLogger(ZooDistributeLock.class);

	private static final String LOCK_PATH = "/zkLock";

	// 模擬開啟的線程數
	private static final int THREAD_NUM = 5;

	// 用於等待所有線程都連接成功後再執行任務
	private static CountDownLatch startFlag = new CountDownLatch(1);

	// 用於確保所有線程執行完畢
	private static CountDownLatch threadFlag = new CountDownLatch(THREAD_NUM);

	private ZooKeeper zk = null;

	private String currentPath;

	private String lockPath;

	public static void main(String[] args) {
		for (int i = 0; i < THREAD_NUM; i++) {
			final int j = i;
			new Thread() {
				@Override
				public void run() {
					ZooDistributeLock zooDistributeLock = new ZooDistributeLock();
					try {
						zooDistributeLock.connection();
						System.out.println("連接" + j);
						zooDistributeLock.createNode();
						System.out.println("創建" + j);
						zooDistributeLock.getLock();
						System.out.println("獲取鎖" + j);
					} catch (IOException | InterruptedException | KeeperException e) {
						e.printStackTrace();
					}
				}
			}.start();
		}
		try {
			threadFlag.await();
			LOG.info("所有線程執行完畢...");
		} catch (InterruptedException e) {
			LOG.error(e.getMessage(), e);
		}
	}

	/**
	 * Disconnected為網絡閃斷時觸發的事件,當然其他的拔掉網線、kill zookeeper server ,kill zk
	 * connection也會觸發該事件。 SyncConnected為client端重新選擇下一個zk
	 * server連接觸發的事件,此時watcher有效,也就是能正常感知
	 * Expired為客戶端重新連server時,服務端發現該session超過了設定的時長,返回給client
	 * Expired,此時watcher失效,也就是不能正常感知
	 */
	@Override
	public void process(WatchedEvent event) {

		Event.KeeperState state = event.getState();
		Event.EventType type = event.getType();

		if (Event.KeeperState.SyncConnected == state) {
			if (Event.EventType.None == type) {
				// 標識連接成功
				LOG.info("成功連接上ZK服務器");
				startFlag.countDown();
			}

			if (Event.EventType.NodeDeleted == type && event.getPath().equals(this.lockPath)) {
				LOG.info("node:" + this.lockPath + "的鎖已經被釋放");
				try {
					// 上一個節點釋放了,當前節點去獲取鎖
					getLock();
				} catch (KeeperException | InterruptedException e) {
					LOG.error(e.getMessage(), e);
				}
			}
		}

	}

	/**
	 * 連接到 ZK
	 *
	 * @throws IOException
	 */
	private void connection() throws IOException, InterruptedException {

		zk = new ZooKeeper("127.0.0.1:2181", 5000, this);

		// 等待連接成功後再執行下一步操作
		startFlag.await();
	}

	// 創建節點,並初始化當前路徑
	private void createNode() throws KeeperException, InterruptedException, UnsupportedEncodingException {
		this.currentPath = this.zk.create(LOCK_PATH, "".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
	}

	private void getLock() throws KeeperException, InterruptedException {
		if (minNode()) {
			doSomething();
			// 釋放鎖
			releaseLock();
		}
	}

	/**
	 * 當前是否為最小節點
	 *
	 * @return
	 */
	private boolean minNode() {

		// 當前序號
		try {
			initLockPath();
			// 判斷前一個節點存在不存在,如果存在,則表示當前節點不是最小節點
			// zk.getData(this.lockPath, this, new Stat());
			zk.getData(this.lockPath, true, new Stat());
			LOG.info(this.currentPath + " 不是最小值,沒有獲取鎖,等待 " + this.lockPath + " 釋放鎖");
			return false;
		} catch (KeeperException e) {
			LOG.info(this.currentPath + " 是最小值,獲得鎖");
			return true;
		} catch (InterruptedException e) {
			LOG.error(e.getMessage(), e);
		}
		return true;
	}

	private void doSomething() {
		LOG.info("處理業務邏輯...");
	}

	/**
	 * 釋放鎖並關閉連接
	 *
	 * @throws KeeperException
	 * @throws InterruptedException
	 */
	private void releaseLock() throws KeeperException, InterruptedException {
		Thread.sleep(2000);
		if (this.zk != null) {
			LOG.info(this.currentPath + " 業務處理完畢,釋放鎖...");
			zk.delete(this.currentPath, -1);
			this.zk.close();
			LOG.info(Thread.currentThread().getName() + "關閉 zookeeper 連接");
		}
		threadFlag.countDown();
	}

	/**
	 * 初始化 lockpath
	 */
	private void initLockPath() {

		int currentSeq = Integer.parseInt(this.currentPath.substring(LOCK_PATH.length()));

		// 上一個序號
		int preSeq = currentSeq - 1;

		String preSeqStr = String.valueOf(preSeq);
		while (preSeqStr.length() < 10) {
			preSeqStr = "0" + preSeqStr;
		}
		this.lockPath = LOCK_PATH + preSeqStr;
	}

}



參考代碼:

https://juejin.im/entry/596438bc6fb9a06bb47495f1


本文出自 “不積跬步無以至千裏” 博客,請務必保留此出處http://shangdc.blog.51cto.com/10093778/1958619

zookeeper 分布式鎖