1. 程式人生 > >LockSupport實現執行緒掛起和喚醒——深入淺出原碼分析

LockSupport實現執行緒掛起和喚醒——深入淺出原碼分析

面試題

(1)LockSupport比Object的wait/notify有兩大優勢,分別是什麼?

(2)LockSupport原始碼是如何實現的,具體說說你的看法?

(1)LockSupport比Object的wait/notify有兩大優勢,分別是什麼?

LockSupport是Java6引入的一個工具類,它簡單靈活,應用廣泛。

1.簡單

俗話說,沒有比較就沒有傷害。這裡咱們還是通過對比來介紹LockSupport的簡單。

在沒有LockSupport之前,執行緒的掛起和喚醒咱們都是通過Object的wait和notify/notifyAll方法實現。

寫一段例子程式碼,執行緒A執行一段業務邏輯後呼叫wait阻塞住自己。主執行緒呼叫notify方法喚醒執行緒A,執行緒A然後列印自己執行的結果。

public class TestObjWait {

    public static void main(String[] args)throws Exception {
        final Object obj = new Object();
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i=0;i<10;i++){
                    sum+=i;
                }
                try {
                    obj.wait();
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println(sum);
            }
        });
        A.start();
        //睡眠一秒鐘,保證執行緒A已經計算完成,阻塞在wait方法
        Thread.sleep(1000);
        obj.notify();
    }
}

執行這段程式碼,不難發現這個錯誤

java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at com.aaa.TestObjWait$1.run(TestObjWait.java:15)
    at java.lang.Thread.run(Thread.java:748)
45
Exception in thread "main" java.lang.IllegalMonitorStateException
    at java.lang.Object.notify(Native Method)
    at com.aaa.TestObjWait.main(TestObjWait.java:25)

原因很簡單,wait和notify/notifyAll方法只能在同步程式碼塊裡用。所以將程式碼修改如下就可以正常執行

public class TestObjWait {

    public static void main(String[] args)throws Exception {
        final Object obj = new Object();
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i=0;i<10;i++){
                    sum+=i;
                }
                try {
                    synchronized (obj){
                        obj.wait();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println(sum);
            }
        });
        A.start();
        //睡眠一秒鐘,保證執行緒A已經計算完成,阻塞在wait方法
        Thread.sleep(1000);
        synchronized (obj){
            obj.notify();
        }
    }
}

那如果咱們換成LockSupport呢?簡單得很,看程式碼:

public class TestLo {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) { //——步驟二
                sum += i;
            }
            //——這裡會阻塞
            LockSupport.park(); //——步驟四
            System.out.println(sum); //——步驟五
        });
        thread.start(); //——步驟一
        Thread.sleep(1000);
        LockSupport.unpark(thread); //——步驟三
    }
}

直接呼叫即可,沒有說非得在同步程式碼塊裡才能用,非常easy

2.靈活性

如果只是LockSupport在使用起來比Object的wait/notify簡單,那還真沒必要專門講解下LockSupport。最主要的是靈活性。

上邊的例子程式碼中,主執行緒呼叫了Thread.sleep(1000)方法來等待執行緒A計算完成進入wait狀態。如果去掉Thread.sleep()呼叫,程式碼如下:

public class TestObjWait {

    public static void main(String[] args)throws Exception {
        final Object obj = new Object();
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i=0;i<10;i++){
                    sum+=i;
                }
                try {
                    synchronized (obj){
                        obj.wait();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println(sum);
            }
        });
        A.start();
        //睡眠一秒鐘,保證執行緒A已經計算完成,阻塞在wait方法
        //Thread.sleep(1000);
        synchronized (obj){
            obj.notify();
        }
    }
}

多執行幾次上面的程式碼,有時候能夠正常列印結果並退出程式,但有時候執行緒無法列印結果阻塞了。

原因在於:主執行緒呼叫完notify後,執行緒A才進入wait方法,導致執行緒A一直阻塞了。由於執行緒A不是後臺執行緒,所以整個程式無法退出。

那如果換成LockSupport呢?

LockSupport就支援主執行緒先呼叫unpark後,執行緒A再呼叫parl而不被阻塞嗎?是的,沒錯,程式碼如下

public class TestObjWait {

    public static void main(String[] args)throws Exception {
        final Object obj = new Object();
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i=0;i<10;i++){
                    sum+=i;
                }
                LockSupport.park();
                System.out.println(sum);
            }
        });
        A.start();
        //睡眠一秒鐘,保證執行緒A已經計算完成,阻塞在wait方法
        //Thread.sleep(1000);
        LockSupport.unpark(A);
    }
}

不管你執行多少次,這段程式碼都能正常列印結果並退出。這就是LockSupport最大的靈活所在。

總結一下,LockSupport比Object的wait/notify有兩大優勢

①LockSupport不需要在同步程式碼塊裡。所以執行緒間也不需要維護一個共享的同步物件了,實現了執行緒間的解耦。

②unpark函式可以優先於park呼叫,所以不需要擔心執行緒間的執行先後順序。

應用廣泛

LockSupport在Java的工具類用應用很廣泛,咱們這裡找幾個例子感受感受。以Java裡最常用的類ThreadPoolExecutor為例。先看如下程式碼:

public class Test001 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor
                (5, 5, 1000, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(1000));
        Future<String> future = poolExecutor.submit(new Callable<String>() {
            @Override
            public String call() throws InterruptedException {
                TimeUnit.SECONDS.sleep(5);
                return "hello";
            }
        });
        String result = future.get();
        System.out.println(result);
    }
}程式碼中,我們向執行緒池中

程式碼中我們向執行緒池中扔了一個任務,然後呼叫Future的get方法,同步阻塞等待執行緒池的執行結果。

這裡就要問了:get方法是如何組塞住當前執行緒?執行緒池執行完任務後又是如何喚醒執行緒的呢?

咱們跟著原始碼一步步分析,先看執行緒池的submit方法的實現:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

在submit方法裡,執行緒池將我們提交的基於Callable實現的任務,封裝為基於RunnableFuture實現的任務,然後將任務提交到執行緒池執行,並向當前執行緒返回RunnableFutrue。

進入newTaskFor方法,就一句話:return new FutureTask<T>(callable);

所以,咱們主執行緒呼叫future的get方法就是FutureTask的get方法,執行緒池執行的任務物件也是FutureTask的例項。

接下來看看FutureTask的get方法的實現:

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

比較簡單,就是判斷下當前任務是否執行完畢,如果執行完畢直接返回任務結果,否則進入awaitDone方法阻塞等待。

/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); //——使用CAS非阻塞演算法,將執行緒封裝為WaitNode,儲存下來,以供後續喚醒執行緒時使用
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);//——呼叫了LockSupport的park/parkNanos阻塞住當前執行緒。
        }
        else
            LockSupport.park(this);
    }
}

上邊已經說了阻塞等待任務結果的邏輯,接下來再看看執行緒池執行完任務,喚醒等待執行緒的邏輯實現。

前邊說了,咱們提交的基於Callable實現的任務,已經被封裝為FutureTask任務提交給了執行緒池執行,任務的執行就是FutureTask的run方法執行。如下是FutureTask的run方法:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); //——執行我們提交的任務
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result); //任務執行完後呼叫set方法,喚醒工作執行緒的工作應該就是在這裡了
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion(); //——進入這裡看如何實現的
    }
}
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //——先通過CAS非阻塞演算法將所有等待的執行緒拿出來
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); //——然後再使用LockSupport的unpark喚醒每個執行緒
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

在使用執行緒池的過程中,不知道你有沒有這麼一個疑問:執行緒池裡沒有任務時,執行緒池裡的執行緒在幹嘛呢?

通過這篇文章《執行緒池的工作原理與原始碼解讀》的讀者一定知道,執行緒會呼叫佇列的take方法阻塞等待新任務。那佇列的take方法是不是也跟Future的get方法實現一樣呢?咱們來看看原始碼實現。

以ArrayBlockingQueue為例,take方法實現如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); //——使用Lock的Condition的await方法實現執行緒阻塞,進入裡面看看
        return dequeue();
    } finally {
        lock.unlock();
    }
}
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); //——發現await方法裡還是使用LockSupport.park方法阻塞自己
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
public static void park(Object blocker) {
    Thread t = Thread.currentThread();//——獲取呼叫執行緒
    setBlocker(t, blocker); //——設定該執行緒的blocker變數
    UNSAFE.park(false, 0L); //——掛起執行緒
    setBlocker(t, null); //——執行緒被啟用後,清除blocker變數,因為一般都是線上程阻塞時才分析原因
}

Thread類裡面有個變數volatile Object parkBlocker,用來存放park方法傳遞的block物件,也就是把block變數存放到了呼叫park方法的執行緒的成員變數裡面。

(2)LockSupport原始碼是如何實現的,具體說說你的看法?

學習要知其然,還要知其所以然。接下來不妨看看LockSupport的實現。

進入LockSupport的park方法,可以發現它是呼叫了Unsafe的park方法,這是一個本地native方法,只能通過openjdk的原始碼看看其本地實現了。 

它呼叫了執行緒的Parker型別物件的park方法,如下是Parker類的定義:

類中定義了一個int型別的_counter變數,咱們上文中講靈活性的那一節說,可以先執行unpark後執行park,

就是通過這個變數實現,看park方法的實現程式碼(由於方法比較長就不整體截圖了):

park方法會呼叫Atomic::xchg方法,這個方法會原子性的將_counter賦值為0,並返回賦值前的值。如果呼叫park方法前,

_counter大於0,則說明之前呼叫過unpark方法,所以park方法直接返回。

接著往下看:

實際上Parker類用Posix的mutex,condition來實現的阻塞喚醒。如果對mutex和condition不熟,可以簡單理解

為mutex就是Java裡的synchronized,condition就是Object裡的wait/notify操作。

park方法裡呼叫pthread_mutex_trylock方法,就相當於Java執行緒進入Java的同步程式碼塊,然後再次判斷_counter

是否大於零,如果大於零則將_counter設定為零。最後呼叫pthread_mutex_unlock解鎖,相當於Java執行完退出

同步程式碼塊。如果_counter不大於零,則繼續往下執行pthread_cond_wait方法,實現當前執行緒的阻塞。

最後再看看unpark方法的實現吧,這塊就簡單多了,直接上程式碼:

圖中的1和4就相當於Java的進入synchronized和退出synchronized的加鎖解鎖操作,程式碼2將_counter設定為1,

同時判斷先前_counter的值是否小於1,即這段程式碼:if(s<1)。如果不小於1,則就不會有執行緒被park,所以方法

直接執行完畢,否則就會執行程式碼3,來喚醒被阻塞的執行緒。

總結

通過閱讀LockSupport的本地實現,我們不難發現這麼個問題:多次呼叫unpark方法和呼叫一次unpark方法效果一樣,

因為都是直接將_counter賦值為1,而不是加1。簡單說就是:執行緒A連續呼叫兩次LockSupport.unpark(B)方法喚醒執行緒B,

然後執行緒B呼叫兩次LockSupport.park()方法, 執行緒B依舊會被阻塞。因為兩次unpark呼叫效果跟一次呼叫一樣,

只能讓執行緒B的第一次呼叫park方法不被阻塞,第二次呼叫依舊會阻塞。

參考書籍

Java併發程式設計之美

參考連結

https://www.cnblogs.com/qingquanzi/p/8228422.ht