分散式鎖(一)__基於Zookeeper實現可重入分散式鎖
阿新 • • 發佈:2018-11-10
1 重入的實現
對於鎖的重入,我們來想這樣一個場景。當一個遞迴方法被sychronized關鍵字修飾時,在呼叫方法時顯然沒有發生問題,執行執行緒獲取了鎖之後仍能連續多次地獲得該鎖,也就是說sychronized關鍵字支援鎖的重入。對於ReentrantLock,雖然沒有像sychronized那樣隱式地支援重入,但在呼叫lock()方法時,已經獲取到鎖的執行緒,能夠再次呼叫lock()方法獲取鎖而不被阻塞。
如果想要實現鎖的重入,至少要解決一下兩個問題
- 執行緒再次獲取鎖:鎖需要去識別獲取鎖的執行緒是否為當前佔據鎖的執行緒,如果是,則再次成功獲取。
- 鎖的最終釋放
2、zookeeper分散式鎖的實現:
ZooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要元件。它是一個為分散式應用提供一致性服務的軟體,提供的功能包括:配置維護、域名服務、分散式同步、組服務等。
ZooKeeper的架構通過冗餘服務實現高可用性。因此,如果第一次無應答,客戶端就可以詢問另一臺ZooKeeper主機。ZooKeeper節點將它們的資料儲存於一個分層的名稱空間,非常類似於一個檔案系統或一個字首樹結構。客戶端可以在節點讀寫,從而以這種方式擁有一個共享的配置服務。更新是全序的。
基於ZooKeeper分散式鎖的流程
- 在zookeeper指定節點(locks)下建立臨時順序節點node_n
- 獲取locks下所有子節點children
- 對子節點按節點自增序號從小到大排序
- 判斷本節點是不是第一個子節點,若是,則獲取鎖;若不是,則監聽比該節點小的那個節點的刪除事件
- 若監聽事件生效,則回到第二步重新進行判斷,直到獲取到鎖
引入依賴:
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
抽象的OrderService介面:
public interface OrderService {
void createOrder();
}
訂單號生成類(模擬公共資源ps:需要用鎖的地方):
package com.th.order;
import java.text.SimpleDateFormat;
import java.util.Date;
public class OrderCodeGenerator {
private static int i = 0;
public String getOrderCode() {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
return sdf.format(now) + ++i;
}
}
MyZkSerializer 繼承了ZkSerializer:
package com.th.order;
import java.io.UnsupportedEncodingException;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
public class MyZkSerializer implements ZkSerializer {
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
throw new ZkMarshallingError(e);
}
}
@Override
public byte[] serialize(Object obj) throws ZkMarshallingError {
try {
return String.valueOf(obj).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
throw new ZkMarshallingError(e);
}
}
}
自己實現的可重入鎖ZookeeperReAbleDisLock:
package com.th.order;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
public class ZookeeperReAbleDisLock implements Lock {
private ZkClient client;
private String lockPath;
private String currentPath;
private String beforePath;
// 執行緒獲取鎖的次數
private static volatile int state;
// 當前獲取鎖的執行緒
private static volatile Thread thread;
public ZookeeperReAbleDisLock(String lockPath) {
super();
this.lockPath = lockPath;
client = new ZkClient("192.168.1.117:2181");
client.setZkSerializer(new MyZkSerializer());
if (!client.exists(lockPath)) {
try {
client.createPersistent(lockPath);
} catch (Exception e) {
}
}
}
@Override
public boolean tryLock() {
return tryLock(1);
}
public boolean tryLock(int acquires) {
// 獲取當前執行緒
final Thread currntThread = Thread.currentThread();
// 獲取當前鎖的次數
int state = getState();
// state == 0 表示沒有執行緒獲取鎖 進來的執行緒肯定能獲取鎖
if (state == 0) {
if (compareAndSetState(0, acquires, currntThread)) {
return true;
}
// state != 0 表示執行緒被獲取了 判斷是否是當前執行緒 如果是則 state+1 ,否則返回false
} else if (currntThread == getThread()) {
int nextS = getState() + acquires;
if (nextS < 0)
throw new Error("Maximum lock count exceeded");
// 這裡不需要cas 原因不解釋
setState(nextS);
System.out.println(Thread.currentThread().getName() + ":獲取重入鎖成功,當前獲取鎖次數: " + getState());
return true;
}
// 獲取鎖的不是當前執行緒
return false;
}
public boolean compareAndSetState(int expect, int update, Thread t) {
if (this.currentPath == null) {
currentPath = this.client.createEphemeralSequential(lockPath + "/", "1");
}
// 獲取所有節點
List<String> children = this.client.getChildren(lockPath);
// 排序
Collections.sort(children);
// 判斷當前節點是否為最小節點
if (getState() == expect && currentPath.equals(lockPath + "/" + children.get(0))) {
setState(update);
thread = t;
System.out.println(Thread.currentThread().getName() + ":獲取鎖成功,當前獲取鎖次數: " + getState());
return true;
}
// 取得前一個
// 得到位元組的索引號
int curIndex = children.indexOf(currentPath.substring(lockPath.length() + 1));
beforePath = lockPath + "/" + children.get(curIndex - 1);
return false;
}
public final boolean tryRelease(int releases) {
// 可以判斷是否自己獲得鎖 自己獲得鎖才能刪除
final Thread currentThread = Thread.currentThread();
// 獲取鎖的不是當前執行緒
if (currentThread != getThread()) {
// throw new IllegalMonitorStateException();
return false;
}
// 釋放鎖的次數
int nextS = getState() - releases;
boolean free = false;
if (nextS == 0) {
free = true;
setThread(null);
// 刪除zk節點
client.delete(currentPath);
System.out.println(Thread.currentThread().getName() + ": 所有鎖釋放成功:刪除zk節點...");
}
setState(nextS);
if (!free)
System.out.println(Thread.currentThread().getName() + ": 釋放重入鎖成功: 剩餘鎖次數:" + getState());
return free;
}
@Override
public void lock() {
if (!tryLock()) {
// 沒有獲得鎖,阻塞自己
waitForLock();
// 再次嘗試加鎖
lock();
}
}
private void waitForLock() {
CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
}
@Override
public void handleDataDeleted(String arg0) throws Exception {
System.out.println("節點被刪除了,開始搶鎖...");
cdl.countDown();
}
};
// 完成watcher註冊
this.client.subscribeDataChanges(beforePath, listener);
// 阻塞自己
if (this.client.exists(beforePath)) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消註冊
this.client.unsubscribeDataChanges(beforePath, listener);
}
@Override
public void unlock() {
// 可以判斷是否自己獲得鎖 自己獲得鎖才能刪除
// client.delete(currentPath);
tryRelease(1);
}
public static int getState() {
return state;
}
public static void setState(int state) {
ZookeeperReAbleDisLock.state = state;
}
public static Thread getThread() {
return thread;
}
public static void setThread(Thread thread) {
ZookeeperReAbleDisLock.thread = thread;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
OrderServiceImplWithZkDis 實現了 OrderServiceI介面,並且測試方法也寫在了裡面:
package com.th.order;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.Lock;
public class OrderServiceImplWithZkDis implements OrderService {
private static OrderCodeGenerator org = new OrderCodeGenerator();
//private Lock lock = new ZookeeperDisLock("/LOCK_TEST");
private Lock lock = new ZookeeperReAbleDisLock("/LOCK_TEST");
@Override
public void createOrder() {
String orderCode = null;
try {
lock.lock();
orderCode = org.getOrderCode();
TestReLock();
System.out.println(Thread.currentThread().getName() + "生成訂單:" + orderCode);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void TestReLock() {
lock.lock();
System.out.println(Thread.currentThread().getName() + "測試重入鎖成功...");
lock.unlock();
}
public static void main(String[] args) {
int num = 20;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num);
for (int i = 0; i < num; i++) {
new Thread(new Runnable() {
@Override
public void run() {
OrderService orderService = new OrderServiceImplWithZkDis();
System.out.println(Thread.currentThread().getName() + ": 我準備好了");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
orderService.createOrder();
}
}).start();
}
}
}
測試結果: