1. 程式人生 > >分散式鎖(一)__基於Zookeeper實現可重入分散式鎖

分散式鎖(一)__基於Zookeeper實現可重入分散式鎖

1  重入的實現

對於鎖的重入,我們來想這樣一個場景。當一個遞迴方法被sychronized關鍵字修飾時,在呼叫方法時顯然沒有發生問題,執行執行緒獲取了鎖之後仍能連續多次地獲得該鎖,也就是說sychronized關鍵字支援鎖的重入。對於ReentrantLock,雖然沒有像sychronized那樣隱式地支援重入,但在呼叫lock()方法時,已經獲取到鎖的執行緒,能夠再次呼叫lock()方法獲取鎖而不被阻塞。

如果想要實現鎖的重入,至少要解決一下兩個問題

  • 執行緒再次獲取鎖:鎖需要去識別獲取鎖的執行緒是否為當前佔據鎖的執行緒,如果是,則再次成功獲取。
  • 鎖的最終釋放
    :執行緒重複n次獲取了鎖,隨後在n次釋放該鎖後,其他執行緒能夠獲取該鎖。鎖的最終釋放要求鎖對於獲取進行計數自增,計數表示當前鎖被重複獲取的次數,而鎖被釋放時,計數自減,當計數等於0時表示鎖已經釋放

2、zookeeper分散式鎖的實現:

ZooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要元件。它是一個為分散式應用提供一致性服務的軟體,提供的功能包括:配置維護、域名服務、分散式同步、組服務等。

ZooKeeper的架構通過冗餘服務實現高可用性。因此,如果第一次無應答,客戶端就可以詢問另一臺ZooKeeper主機。ZooKeeper節點將它們的資料儲存於一個分層的名稱空間,非常類似於一個檔案系統或一個字首樹結構。客戶端可以在節點讀寫,從而以這種方式擁有一個共享的配置服務。更新是全序的。

基於ZooKeeper分散式鎖的流程

  • 在zookeeper指定節點(locks)下建立臨時順序節點node_n
  • 獲取locks下所有子節點children
  • 對子節點按節點自增序號從小到大排序
  • 判斷本節點是不是第一個子節點,若是,則獲取鎖;若不是,則監聽比該節點小的那個節點的刪除事件
  • 若監聽事件生效,則回到第二步重新進行判斷,直到獲取到鎖

引入依賴:

<dependency>
	<groupId>com.101tec</groupId>
	<artifactId>zkclient</artifactId>
	<version>0.10</version>
</dependency>

抽象的OrderService介面:

public interface OrderService {
    void createOrder();
}

訂單號生成類(模擬公共資源ps:需要用鎖的地方):

package com.th.order;
import java.text.SimpleDateFormat;
import java.util.Date;

public class OrderCodeGenerator {
    private static int i = 0;

	public String getOrderCode() {
		Date now = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
		return sdf.format(now) + ++i;
	}

}

MyZkSerializer 繼承了ZkSerializer:

package com.th.order;

import java.io.UnsupportedEncodingException;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

public class MyZkSerializer implements ZkSerializer {

	@Override
	public Object deserialize(byte[] bytes) throws ZkMarshallingError {
		try {
			return new String(bytes, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
			throw new ZkMarshallingError(e);
		}
	}

	@Override
	public byte[] serialize(Object obj) throws ZkMarshallingError {
		try {
			return String.valueOf(obj).getBytes("UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
			throw new ZkMarshallingError(e);
		}
	}

}

自己實現的可重入鎖ZookeeperReAbleDisLock:

package com.th.order;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class ZookeeperReAbleDisLock implements Lock {

	private ZkClient client;

	private String lockPath;

	private String currentPath;

	private String beforePath;

	// 執行緒獲取鎖的次數
	private static volatile int state;
	// 當前獲取鎖的執行緒
	private static volatile Thread thread;
	
	public ZookeeperReAbleDisLock(String lockPath) {
		super();
		this.lockPath = lockPath;

		client = new ZkClient("192.168.1.117:2181");
		client.setZkSerializer(new MyZkSerializer());

		if (!client.exists(lockPath)) {
			try {
				client.createPersistent(lockPath);
			} catch (Exception e) {
			}
		}

	}

	@Override
	public boolean tryLock() {
		return tryLock(1);
	}

	public boolean tryLock(int acquires) {
		// 獲取當前執行緒
		final Thread currntThread = Thread.currentThread();
		// 獲取當前鎖的次數
		int state = getState();

		// state == 0 表示沒有執行緒獲取鎖 進來的執行緒肯定能獲取鎖
		if (state == 0) {
			if (compareAndSetState(0, acquires, currntThread)) {
				return true;
			}

			// state != 0 表示執行緒被獲取了 判斷是否是當前執行緒 如果是則 state+1 ,否則返回false
		} else if (currntThread == getThread()) {
			int nextS = getState() + acquires;

			if (nextS < 0)
				throw new Error("Maximum lock count exceeded");
			// 這裡不需要cas 原因不解釋
			setState(nextS);
			System.out.println(Thread.currentThread().getName() + ":獲取重入鎖成功,當前獲取鎖次數: " + getState());
			return true;
		}

		// 獲取鎖的不是當前執行緒
		return false;
	}

	public boolean compareAndSetState(int expect, int update, Thread t) {

		if (this.currentPath == null) {
			currentPath = this.client.createEphemeralSequential(lockPath + "/", "1");
		}

		// 獲取所有節點
		List<String> children = this.client.getChildren(lockPath);

		// 排序
		Collections.sort(children);

		// 判斷當前節點是否為最小節點
		if (getState() == expect && currentPath.equals(lockPath + "/" + children.get(0))) {
			setState(update);
			thread = t;
			System.out.println(Thread.currentThread().getName() + ":獲取鎖成功,當前獲取鎖次數: " + getState());
			return true;
		}

		// 取得前一個
		// 得到位元組的索引號
		int curIndex = children.indexOf(currentPath.substring(lockPath.length() + 1));
		beforePath = lockPath + "/" + children.get(curIndex - 1);

		return false;
	}

	public final boolean tryRelease(int releases) {
		// 可以判斷是否自己獲得鎖 自己獲得鎖才能刪除
		final Thread currentThread = Thread.currentThread();
		// 獲取鎖的不是當前執行緒
		if (currentThread != getThread()) {
			// throw new IllegalMonitorStateException();
			return false;
		}

		// 釋放鎖的次數
		int nextS = getState() - releases;
		boolean free = false;
		if (nextS == 0) {
			free = true;
			setThread(null);
			// 刪除zk節點
			client.delete(currentPath);
			System.out.println(Thread.currentThread().getName() + ": 所有鎖釋放成功:刪除zk節點...");
		}

		setState(nextS);

		if (!free)
			System.out.println(Thread.currentThread().getName() + ": 釋放重入鎖成功: 剩餘鎖次數:" + getState());

		return free;
	}

	@Override
	public void lock() {
		if (!tryLock()) {
			// 沒有獲得鎖,阻塞自己
			waitForLock();
			// 再次嘗試加鎖
			lock();
		}
	}

	private void waitForLock() {
		CountDownLatch cdl = new CountDownLatch(1);

		IZkDataListener listener = new IZkDataListener() {

			@Override
			public void handleDataChange(String arg0, Object arg1) throws Exception {

			}

			@Override
			public void handleDataDeleted(String arg0) throws Exception {
				System.out.println("節點被刪除了,開始搶鎖...");
				cdl.countDown();
			}

		};
		// 完成watcher註冊
		this.client.subscribeDataChanges(beforePath, listener);

		// 阻塞自己
		if (this.client.exists(beforePath)) {
			try {
				cdl.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		// 取消註冊
		this.client.unsubscribeDataChanges(beforePath, listener);
	}

	@Override
	public void unlock() {
		// 可以判斷是否自己獲得鎖 自己獲得鎖才能刪除
		// client.delete(currentPath);
		tryRelease(1);

	}

	public static int getState() {
		return state;
	}

	public static void setState(int state) {
		ZookeeperReAbleDisLock.state = state;
	}

	public static Thread getThread() {
		return thread;
	}

	public static void setThread(Thread thread) {
		ZookeeperReAbleDisLock.thread = thread;
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public Condition newCondition() {
		// TODO Auto-generated method stub
		return null;
	}

}

OrderServiceImplWithZkDis 實現了 OrderServiceI介面,並且測試方法也寫在了裡面:

package com.th.order;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.Lock;

public class OrderServiceImplWithZkDis implements OrderService {

	private static OrderCodeGenerator org = new OrderCodeGenerator();

	//private Lock lock = new ZookeeperDisLock("/LOCK_TEST");

	private Lock lock = new ZookeeperReAbleDisLock("/LOCK_TEST");

	@Override
	public void createOrder() {
		String orderCode = null;

		try {
			lock.lock();

			orderCode = org.getOrderCode();
			TestReLock();

			System.out.println(Thread.currentThread().getName() + "生成訂單:" + orderCode);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}

	}

	public void TestReLock() {
		lock.lock();
		System.out.println(Thread.currentThread().getName() + "測試重入鎖成功...");
		lock.unlock();

	}

	public static void main(String[] args) {
		int num = 20;
		CyclicBarrier cyclicBarrier = new CyclicBarrier(num);

		for (int i = 0; i < num; i++) {
			new Thread(new Runnable() {

				@Override
				public void run() {
					OrderService orderService = new OrderServiceImplWithZkDis();
					System.out.println(Thread.currentThread().getName() + ": 我準備好了");

					try {
						cyclicBarrier.await();
					} catch (Exception e) {
						e.printStackTrace();
					}

					orderService.createOrder();
				}
			}).start();
		}

	}

}

測試結果: