1. 程式人生 > >跟著例項學習ZooKeeper的用法: 分散式鎖

跟著例項學習ZooKeeper的用法: 分散式鎖

分散式的鎖全域性同步, 這意味著任何一個時間點不會有兩個客戶端都擁有相同的鎖。

可重入鎖Shared Reentrant Lock

首先我們先看一個全域性可重入的鎖。 Shared意味著鎖是全域性可見的, 客戶端都可以請求鎖。 Reentrant和JDK的ReentrantLock類似, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。 它是由類InterProcessMutex來實現。 它的建構函式為:

public InterProcessMutex(CuratorFramework client, String path)

通過acquire獲得鎖,並提供超時機制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,
                       TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can
call acquire re-entrantly. Each call to acquire that returns true
must be balanced by a call to release() Parameters: time - time to wait unit - time unit Returns: true if the mutex was acquired, false if not

通過release()方法釋放鎖。 InterProcessMutex 例項可以重用。

Revoking ZooKeeper recipes wiki定義了可協商的撤銷機制。 為了撤銷mutex, 呼叫下面的方法:

public void makeRevocable(RevocationListener<T> listener)
將鎖設為可撤銷的. 當別的程序或執行緒想讓你釋放鎖是Listener會被呼叫。 Parameters: listener - the listener

如果你請求撤銷當前的鎖, 呼叫Revoker方法。

public static void attemptRevoke(CuratorFramework client,
                                 String path)
                         throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

錯誤處理 還是強烈推薦你使用ConnectionStateListener處理連線狀態的改變。 當連線LOST時你不再擁有鎖。

首先讓我們建立一個模擬的共享資源, 這個資源期望只能單執行緒的訪問,否則會有併發問題。

package com.colobu.zkrecipe.lock;

import java.util.concurrent.atomic.AtomicBoolean;

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真實環境中我們會在這裡訪問/維護一個共享的資源
        //這個例子在使用鎖的情況下不會非法併發異常IllegalStateException
        //但是在無鎖的情況由於sleep了一段時間,很容易丟擲異常
        if (!inUse.compareAndSet(false, true)) { 
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

然後建立一個ExampleClientThatLocks類, 它負責請求鎖, 使用資源,釋放鎖這樣一個完整的訪問過程。

package com.colobu.zkrecipe.lock;

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class ExampleClientThatLocks {
    private final InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " has the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }
}

最後建立主程式來測試。

package com.colobu.zkrecipe.lock;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class InterProcessMutexExample {
    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

程式碼也很簡單,生成10個client, 每個client重複執行10次 請求鎖–訪問資源–釋放鎖的過程。每個client都在獨立的執行緒中。 結果可以看到,鎖是隨機的被每個例項排他性的使用。

既然是可重用的,你可以在一個執行緒中多次呼叫acquire,線上程擁有鎖時它總是返回true。

你不應該在多個執行緒中用同一個InterProcessMutex, 你可以在每個執行緒中都生成一個InterProcessMutex例項,它們的path都一樣,這樣它們可以共享同一個鎖。

不可重入鎖Shared Lock

這個鎖和上面的相比,就是少了Reentrant的功能,也就意味著它不能在同一個執行緒中重入。 這個類是InterProcessSemaphoreMutex。 使用方法和上面的類類似。

首先我們將上面的例子修改一下,測試一下它的重入。 修改ExampleClientThatLocks.doWork,連續兩次acquire:

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        System.out.println(clientName + " has the lock");
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        System.out.println(clientName + " has the lock again");

        try {            
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // always release the lock in a finally block
        }
    }

注意我們也需要呼叫release兩次。這和JDK的ReentrantLock用法一致。如果少呼叫一次release,則此執行緒依然擁有鎖。 上面的程式碼沒有問題,我們可以多次呼叫acquire,後續的acquire也不會阻塞。 將上面的InterProcessMutex換成不可重入鎖InterProcessSemaphoreMutex,如果再執行上面的程式碼,結果就會發現執行緒被阻塞再第二個acquire上。 也就是此鎖不是可重入的。

可重入讀寫鎖Shared Reentrant Read Write Lock

類似JDK的ReentrantReadWriteLock. 一個讀寫鎖管理一對相關的鎖。 一個負責讀操作,另外一個負責寫操作。 讀操作在寫鎖沒被使用時可同時由多個程序使用,而寫鎖使用時不允許讀 (阻塞)。 此鎖是可重入的。一個擁有寫鎖的執行緒可重入讀鎖,但是讀鎖卻不能進入寫鎖。 這也意味著寫鎖可以降級成讀鎖, 比如請求寫鎖 —>讀鎖 —->釋放寫鎖。 從讀鎖升級成寫鎖是不成的。

主要由兩個類實現:

  • InterProcessReadWriteLock
  • InterProcessLock

使用時首先建立一個InterProcessReadWriteLock例項,然後再根據你的需求得到讀鎖或者寫鎖, 讀寫鎖的型別是InterProcessLock

public InterProcessLock readLock()
public InterProcessLock writeLock()

例子和上面的類似。

package com.colobu.zkrecipe.lock;

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;

public class ExampleClientReadWriteLocks {
    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ExampleClientReadWriteLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the writeLock");
        }
        System.out.println(clientName + " has the writeLock");

        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the readLock");
        }
        System.out.println(clientName + " has the readLock too");

        try {            
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            readLock.release(); // always release the lock in a finally block
            writeLock.release(); // always release the lock in a finally block
        }
    }
}

在這個類中我們首先請求了一個寫鎖, 然後降級成讀鎖。 執行業務處理,然後釋放讀寫鎖。

訊號量Shared Semaphore

一個計數的訊號量類似JDK的Semaphore。 JDK中Semaphore維護的一組許可(permits),而Cubator中稱之為租約(Lease)。 有兩種方式可以決定semaphore的最大租約數。第一種方式是有使用者給定的path決定。第二種方式使用SharedCountReader類。 如果不使用SharedCountReader, 沒有內部程式碼檢查程序是否假定有10個租約而程序B假定有20個租約。 所以所有的例項必須使用相同的numberOfLeases值.

這次呼叫acquire會返回一個租約物件。 客戶端必須在finally中close這些租約物件,否則這些租約會丟失掉。 但是, 但是,如果客戶端session由於某種原因比如crash丟掉, 那麼這些客戶端持有的租約會自動close, 這樣其它客戶端可以繼續使用這些租約。 租約還可以通過下面的方式返還:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

注意一次你可以請求多個租約,如果Semaphore當前的租約不夠,則請求執行緒會被阻塞。 同時還提供了超時的過載方法。

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

主要類有:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

下面是使用的例子:

package com.colobu.zkrecipe.lock;

import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class InterProcessSemaphoreExample {
    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }

}

首先我們先獲得了5個租約, 最後我們把它還給了semaphore。 接著請求了一個租約,因為semaphore還有5個租約,所以請求可以滿足,返回一個租約,還剩4個租約。 然後再請求一個租約,因為租約不夠,阻塞到超時,還是沒能滿足,返回結果為null。

上面說講的鎖都是公平鎖(fair)。 總ZooKeeper的角度看, 每個客戶端都按照請求的順序獲得鎖。 相當公平。

多鎖物件 Multi Shared Lock

Multi Shared Lock是一個鎖的容器。 當呼叫acquire, 所有的鎖都會被acquire,如果請求失敗,所有的鎖都會被release。 同樣呼叫release時所有的鎖都被release(失敗被忽略)。 基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。

主要涉及兩個類:

  • InterProcessMultiLock
  • InterProcessLock

它的建構函式需要包含的鎖的集合,或者一組ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

用法和Shared Lock相同。

例子如下:

package com.colobu.zkrecipe.lock;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class InterProcessMultiLockExample {
    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has the lock");

            System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());

            try {            
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());
        }
    }

}

新建一個InterProcessMultiLock, 包含一個重入鎖和一個非重入鎖。 呼叫acquire後可以看到執行緒同時擁有了這兩個鎖。 呼叫release看到這兩個鎖都被釋放了。

再重申以便, 強烈推薦使用ConnectionStateListener監控連線的狀態。