程式人生 > zookeeper 實現分散式鎖

zookeeper 實現分散式鎖

 實現互斥鎖

package com.zookeeper.lock;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException; public class BaseDistributedLock { private final ZkClient client; private final String path; // zookeeper中locker節點的路徑 private final String basePath; private final String lockName; private static final Integer MAX_RETRY_COUNT = 10;
public BaseDistributedLock(ZkClient client, String path, String lockName) { this.client = client; this.basePath = path; this.path = path.concat("/").concat(lockName); this.lockName = lockName; } private void deleteOurPath(String ourPath) throws Exception { client.delete(ourPath); }
private String createLockNode(ZkClient client, String path) throws Exception { return client.createEphemeralSequential(path, null); } private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { while (!haveTheLock) { // 獲取lock節點下的所有節點 List children = getSortedChildren(); String sequenceNodeName = ourPath .substring(basePath.length() + 1); // 獲取當前節點的在所有節點列表中的位置 int ourIndex = children.indexOf(sequenceNodeName); // 節點位置小於0,說明沒有找到節點 if (ourIndex < 0) { throw new ZkNoNodeException("節點沒有找到: " + sequenceNodeName); } // 節點位置大於0說明還有其他節點在當前的節點前面,就需要等待其他的節點都釋放 boolean isGetTheLock = ourIndex == 0; String pathToWatch = (String) (isGetTheLock ? null : children .get(ourIndex - 1)); if (isGetTheLock) { haveTheLock = true; } else { String previousSequencePath = basePath.concat("/").concat( pathToWatch); final CountDownLatch latch = new CountDownLatch(1); final IZkDataListener previousListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { latch.countDown(); } public void handleDataChange(String dataPath, Object data) throws Exception { // ignore } }; try { // 如果節點不存在會出現異常 client.subscribeDataChanges(previousSequencePath, previousListener); if (millisToWait != null) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if (millisToWait <= 0) { doDelete = true; // timed out - delete our node break; } latch.await(millisToWait, TimeUnit.MICROSECONDS); } else { latch.await(); } } catch (ZkNoNodeException e) { // ignore } finally { client.unsubscribeDataChanges(previousSequencePath, previousListener); } } } } catch (Exception e) { // 發生異常需要刪除節點 doDelete = true; throw e; } finally { // 如果需要刪除節點 if (doDelete) { deleteOurPath(ourPath); } } return haveTheLock; } private String getLockNodeNumber(String str, String lockName) { int index = str.lastIndexOf(lockName); if (index >= 0) { index += 
lockName.length(); return index <= str.length() ? str.substring(index) : ""; } return str; } List<String> getSortedChildren() throws Exception { try { List<String> children = client.getChildren(basePath); Collections.sort(children, new Comparator<String>() { public int compare(String lhs, String rhs) { return getLockNodeNumber(lhs, lockName).compareTo( getLockNodeNumber(rhs, lockName)); } }); return children; } catch (ZkNoNodeException e) { client.createPersistent(basePath, true); return getSortedChildren(); } } protected void releaseLock(String lockPath) throws Exception { deleteOurPath(lockPath); } protected String attemptLock(long time, TimeUnit unit) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; int retryCount = 0; // 網路閃斷需要重試一試 while (!isDone) { isDone = true; try { ourPath = createLockNode(client, path); hasTheLock = waitToLock(startMillis, millisToWait, ourPath); } catch (ZkNoNodeException e) { if (retryCount++ < MAX_RETRY_COUNT) { isDone = false; } else { throw e; } } } if (hasTheLock) { return ourPath; } return null; } }
View Code

介面類

package com.zookeeper.lock;

import java.util.concurrent.TimeUnit;

public interface DistributedLock {

    /**
     * Acquires the lock, waiting if it cannot be obtained right away.
     *
     * @throws Exception if acquiring the lock fails
     */
    void acquire() throws Exception;

    /**
     * Acquires the lock, giving up once the timeout has elapsed.
     *
     * @param time the timeout
     * @param unit the unit of the {@code time} argument
     * @return whether the lock was obtained
     * @throws Exception if acquiring the lock fails
     */
    boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Releases the lock.
     *
     * @throws Exception if releasing the lock fails
     */
    void release() throws Exception;

}
View Code

測試類

package com.zookeeper.lock;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

import coom.zookeeperdemo.lock.SimpleDistributedLockMutex;

/**
 * Manual demo of the distributed mutex: client 1 takes the lock, client 2 blocks
 * on it from a second thread, and client 2 proceeds once client 1 releases.
 */
public class TestDistributedLock {

    /** ZooKeeper connect string used when none is supplied on the command line. */
    private static final String DEFAULT_ZK_SERVERS = "192.168.1.105:2181";

    /**
     * Runs the demo.
     *
     * @param args optional; {@code args[0]} overrides the ZooKeeper connect string
     */
    public static void main(String[] args) {
        final String zkServers = (args != null && args.length > 0) ? args[0] : DEFAULT_ZK_SERVERS;

        ZkClient zkClientExt1 = null;
        ZkClient zkClientExt2 = null;
        try {
            zkClientExt1 = new ZkClient(zkServers, 5000, 5000, new BytesPushThroughSerializer());
            zkClientExt2 = new ZkClient(zkServers, 5000, 5000, new BytesPushThroughSerializer());
            runDemo(zkClientExt1, zkClientExt2);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever inspects it next.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always close both sessions so they are not left open on the server.
            closeQuietly(zkClientExt1);
            closeQuietly(zkClientExt2);
        }
    }

    /** Performs the lock hand-over between the two clients. */
    private static void runDemo(ZkClient client1, ZkClient client2) throws Exception {
        final SimpleDistributedLockMutex mutex1 = new SimpleDistributedLockMutex(client1, "/Mutex");
        final SimpleDistributedLockMutex mutex2 = new SimpleDistributedLockMutex(client2, "/Mutex");

        mutex1.acquire();
        System.out.println("Client1 locked");

        Thread client2Thd = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    mutex2.acquire();
                    System.out.println("Client2 locked");
                    mutex2.release();
                    System.out.println("Client2 released lock");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            client2Thd.start();
            // Hold the lock for a while so client 2 visibly has to wait.
            Thread.sleep(5000);
        } finally {
            // Release even if the sleep is interrupted, otherwise client 2 stays blocked.
            mutex1.release();
        }
        System.out.println("Client1 released lock");

        client2Thd.join();
    }

    /** Closes {@code client} if it was created, reporting but not propagating close failures. */
    private static void closeQuietly(ZkClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
    }

}
View Code

 

原文不知道地址了