1. 程式人生 > >C# 基於ZooKeeper實現分散式鎖

C# 基於ZooKeeper實現分散式鎖

主體思路

1. 在locks節點下建立臨時順序節點node_n
2. 判斷當前建立的節點是否為locks節點下所有子節點中最小的子節點
3. 是則獲取鎖,進行業務處理,否則將節點從小到大排序,監聽當前節點上一個節點的刪除事件
4. 事件觸發後回到步驟2進行判斷,直至拿到鎖

程式碼塊分析

建構函式中建立Zookeeper物件

1. 注意建立完物件之後不一定和伺服器建立了連線,中間非同步存在時間差,故增加了while迴圈等待

2. 建立連線之後建立鎖的根節點,注意該節點為持久化節點

public ZooKeeperLock(string server, string lockName)
 {
    this.lockName = lockName;
    this.zooKeeper = new ZooKeeper(server, new TimeSpan(0, 0, 0, SessionTimeout), this);
    while (!this.connected)
    {
      //保證和zookeeper建立連線後再進行節點操作
    }
    var stat = this.zooKeeper.Exists(LockRootName, false);
    if (stat == null)
    {
      this.zooKeeper.Create(LockRootName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
    }
}

事件監聽

實現IWatch介面,主要負責對節點變化進行事件監聽,在改例中監聽了兩個事件(有對ZooKeeper監聽流程不熟悉的請自行百度學習)

1. 監聽伺服器建立連線事件,建立連線後需要告知ZooKeeper物件連線完畢,設定connected變數為true,該變數在上面建構函式中使用到了

2. 監聽節點刪除事件,上一個節點刪除之後重新嘗試獲取鎖操作

public void Process(WatchedEvent @event)
        {
            if (KeeperState.SyncConnected == @event.State)
            {
                this.connected = true;
            }

            if (@event.Type == EventType.NodeDeleted)
            {
                this.GetLock(false);
            }
        }

 

獲取鎖方法

1. 節點不存在的時候需要建立一個鎖節點,該節點為臨時順序節點

2. 獲取鎖根節點下的所有子節點並進行由小到大排序,如果步驟1建立的節點是第一個節點則獲得鎖,返回true

3. 如果不是則找到上一個節點,監聽其刪除事件 (這樣所有鎖節點形成了一個鏈式的關係,避免了羊群效應)

public bool GetLock(bool create = true)
        {
            if (this.currentId == null && create)
            {
                this.currentId = this.zooKeeper.Create(LockRootName + "/" + this.lockName + "_", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential);
            }
            var childrens = this.zooKeeper.GetChildren(LockRootName, false);
            if (childrens == null || childrens.Count() == 1)
            {
                return true;
            }
            var orderChildrens = childrens.OrderBy(p => p).ToList();
            var index = orderChildrens.FindIndex(p => p.Equals(this.currentId.Replace(LockRootName + "/", "")));
            if (index == 0)
            {
                return true;
            }
            this.waitId = LockRootName + "/" + orderChildrens[index - 1];
            var stat = this.zooKeeper.Exists(this.waitId, this);
            if (stat == null)
            {
                this.GetLock(false);
            }
            return false;
        }

釋放鎖

鎖使用完成之後則將當前鎖節點進行刪除,釋放當前鎖

public void UnLock()
        {
            if (this.currentId == null)
            {
                return;
            }
            this.zooKeeper.Delete(this.currentId, -1);
            this.currentId = null;
        }

測試程式碼

for (int i = 0; i < 10; i++)
            {
                Task.Factory.StartNew(
                    () =>
                    {
                        var lockObj = new ZooKeeperLock("127.0.0.1:2181", "testlock");
                        bool outPut = false;
                        while (true)
                        {
                            if (lockObj.GetLock())
                            {
                                Console.WriteLine(lockObj.GetCurrentId() + "獲得鎖正在執行操作");
                                Thread.Sleep(5 * 1000);
                                Console.WriteLine(lockObj.GetCurrentId() + "執行操作完成,即將釋放鎖");
                                lockObj.UnLock();
                                lockObj.Dispose();
                                break;
                            }
                            else
                            {
                                if (!outPut)
                                {
                                    Console.WriteLine(lockObj.GetCurrentId() + "在等待鎖");
                                    outPut = true;
                                }

                            }
                        }
                    });

            }

執行結果 

完整程式碼:https://github.com/One-One-Code/ZooKeeper-