1. 程式人生 > >多線程——工具類之Semaphore

多線程——工具類之Semaphore

nes sha 阻塞 bool eth 創建 執行 pri 工具類

一、Semaphore功能介紹

Semaphore類相當於線程計數器,在獲取Semaphore對象時設定可以產生的線程總數(線程並不是Semaphore類生成的,它只是統計線程的數量),創建Semaphore類對象如下方法所示:

//創建一個Semaphore對象,Sync sync對象賦值為NonfairSync對象
Semaphore sp = new Semaphore(1);


//創建一個Semaphore對象,Sync sync對象賦值為FairSync對象
Semaphore sp = new Semaphore(1,true);

在創建線程以前調用Semaphore類的acquire()方法來判斷是否還可以創建線程,acquire()方法每調用一次當前可創建的線程總數減一,並且這個方法是一個阻塞式的方法,如果當前線程數量已經達到上限線程會被阻塞,當滿足創建線程的條件時程序就會繼續,在線程運行結束以後調用Semaphore類release()方法來釋放占用的可創建線程的數量。

結論:Semaphore類可以控制並發情況下創建的線程總數

二、Semaphore類方法分解

如下是Semaphore類的構造方法:

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

如下是對NonfairSync類和FairSync類的源碼,從代碼看似乎兩個類對tryAcquireShared(int acquires)方法的實現完全不同,其實它們的實現基本相同,NonfairSync類調用的父類的nonfairTryAcquireShared(acquires)方法,此方法的實現如下所示,對比來看區別在於FairSync類在方法入口調用了hasQueuedPredecessors()方法添加了if判斷,hasQueuedPredecessors代碼如下所示。

 /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            
return nonfairTryAcquireShared(acquires); } } /** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

1、acquire()/acquire(int)方法介紹

如下所示,acquire()方法調用的是父類的acquireSharedInterruptibly(int arg)方法,這個方法調用子類的tryAcquireShared(int arg)如果沒有線程數達到上限時則執行doAcquireSharedInterruptibly(arg),如下所示這個方法裏面有一個死循環,當可創建的線程數量滿足參數arg時,跳出死循環,創建線程的代碼繼續。

結論:acquire()是一個阻塞式的方法,從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者當前線程中斷時拋出InterruptedException異常,中斷阻塞。

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2、acquireUninterruptibly()/acquireUninterruptibly()方法介紹

這兩個方法和acquire()的兩個方法基本是一樣的,唯一不同是,這兩個調用的方法acquireShared(int)沒有了當前線程是否中斷的if判斷並且當前這個方法不拋InterruptedException異常,所以在當前線程被中斷時當前阻塞的方法不會中斷。

結論:acquireUninterruptibly是一個阻塞式的方法,從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

三、樣例演示

如下代碼是一個簡單的樣例,運行下面代碼,從打印信息的順序就可以驗證獲取信號量的方法是一個阻塞時的,其它方法的功能驗證,網友自己完成吧!

public class ThreadTest {

    public static void main(String[] args) throws Exception {
        semaphoreTest();
    }

    public static void semaphoreTest() throws InterruptedException {
        final Semaphore semaphore = new Semaphore(1);
        System.out.println("1");
        semaphore.acquire();
        Thread t1 = new Thread() {
            @Override
            public void run() {
                try {
                    sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("釋放");
                semaphore.release();
            }
        };
        t1.start();
        semaphore.acquire();
        System.out.println("2");
    }
}

多線程——工具類之Semaphore