1. 程式人生 > >實現執行緒同步的幾種方式總結

實現執行緒同步的幾種方式總結

在多執行緒中執行緒的執行順序是依靠哪個執行緒先獲得到CUP的執行權誰就先執行,雖然說可以通過執行緒的優先權進行設定,但是他只是獲取CUP執行權的概率高點,但是也不一定必須先執行。在這種情況下如何保證執行緒按照一定的順序進行執行,今天就來一個大總結,分別介紹一下幾種方式。

  1. 通過Object的wait和notify
  2. 通過Condition的awiat和signal
  3. 通過一個阻塞佇列
  4. 通過兩個阻塞佇列
  5. 通過SynchronousQueue 
  6. 通過執行緒池的Callback回撥
  7. 通過同步輔助類CountDownLatch
  8. 通過同步輔助類CyclicBarrier

一、通過Object的wait和notify

之前寫過一篇文章介紹生產者與消費者模式就是用這個機制實現的,現在來一個簡單的寫法。寫一個測試了Test,加上main方法,在寫一個內部類Man進行測試。main方法如下,他進行建立兩個執行緒,傳進去Runnable物件。

    public static boolean flag = false;

    public static int num = 0;

    public static void main(String[] args) {
        Man man = new Man();

        new Thread(() -> {
            man.getRunnable1();
        }).start();
        new Thread(() -> {
            man.getRunnable2();
        }).start();
    }

getRunnable1和getRunnable2分別表示兩個需要執行的任務,在兩個執行緒中進行,方法1用於資料的生產,方法二用於資料的獲取,資料的初始值為num = 0,為了保證生產和獲取平衡需要使用wait和notify方法,這兩個方法的使用必須是要加鎖的,因此使用synchronized進行加鎖使用,為了演示這個效果,我們加上一個sleep方法模擬處理時間,如下:

    public static class Man {
        
        public synchronized void getRunnable1() {
            for (int i = 0; i < 20; i++) {
                while (flag) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("生產出:" + (++num) + "個");
                flag = true;
                notify();
            }
        }
        
        public synchronized void getRunnable2() {
            for (int i = 0; i < 20; i++) {
                while (!flag) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                //模擬載入時間
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("取出出:" + (num--) + "個");
                System.out.println("------------------");

                flag = false;
                notify();
            }
        }
    }

分析它的載入流程,從方法1進行分析,由於flag的初始條件為false,所以方法1不進入等待,直接進行生產,生產完成成之後,更新flag的值為true,同時notify下一個方法2的wait方法,使其變為喚醒狀態。這時候由於方法1加鎖了,無法執行方法1其他部分,當方法1執行完畢,方法1才有可能執行,但是方法1的flag已經為true,進入到wait裡面又處於阻塞狀態,所以這時候只能執行方法2了。由於方法2被喚醒了,阻塞解除,接下來就獲取資料,當獲取完畢又再次讓flag變為false,notify方法1解除阻塞,再次執行方法1,就這樣不斷的迴圈,保證了不同執行緒的有序執行,直到程式終止。

執行效果如下:

二、通過Condition的awiat和signal

上面第一個的實現是一個阻塞,一個等待的方式保證執行緒有序的執行,但是不能進行兩個執行緒之間進行通訊,而接下來介紹的Condition就具備這樣的功能。要獲取Condition物件首先先得獲取Lock物件,他是在jdk1.5之後增加的,比synchronized效能更好的一種鎖機制。和上面的類似,拷貝一份程式碼,看看main方法:

    public static boolean flag = false;

    public static int num = 0;

    public static void main(String[] args) {
        Man man = new Man();

        new Thread(() -> {
            man.getRunnable1();
        }).start();
        new Thread(() -> {
            man.getRunnable2();
        }).start();
    }

情況和第一個實現方法分析一致,這裡不重複了。主要看內部類Man中的方法1和方法2。先手建立鎖物件,把synchronized改為使用Lock加鎖,其次通過Lock建立Condition物件,替換掉Object類的wait方法為Condition的await方法,最後換掉notify方法為signal方法即可,執行原理和上面分析一致,程式碼如下:

    public static class Man {
        public static ReentrantLock lock = new ReentrantLock();
        public static Condition condition = lock.newCondition();

        public void getRunnable1() {
            lock.lock();
            try {
                for (int i = 0; i < 20; i++) {
                    while (flag) {
                        try {
                            condition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("生產出:" + (++num) + "個");
                    flag = true;
                    condition.signal();
                }
            } finally {
                lock.lock();
            }
        }

        public void getRunnable2() {
            lock.lock();
            try {
                for (int i = 0; i < 20; i++) {
                    while (!flag) {
                        try {
                            condition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("取出出:" + (num--) + "個");
                    System.out.println("------------------");
                    flag = false;
                    condition.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    }

執行結果如下:

三、通過一個阻塞佇列

  上面的兩個方法實現起來程式碼比較繁瑣,如果通過阻塞佇列來實現會更加簡潔,這裡採用常用的容量為64的ArrayBlockingQueue來實現。main方法如下:

    public static void main(String[] args) {
        Man man = new Man();

        new Thread(() -> {
            man.getRunnable1();
        }).start();
        new Thread(() -> {
            man.getRunnable2();
        }).start();
    }

主要來看Man中的方法1和方法2,方法1中生產資料,這裡把生產的資料存進佇列裡面,同時方法2進行取資料,如果方法1放滿了或者方法2取完了就會被阻塞住,等待方法1生產好了或者方法2取出了,然後再進行。程式碼如下:

    public static class Man {

        ArrayBlockingQueue queue = new ArrayBlockingQueue<Integer>(64);

        public void getRunnable1() {
            for (int i = 0; i < 8; i++) {
                System.out.println("生產出:" + i + "個");
                try {
                    queue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("---------------生產完畢-----------------");
        }

        public void getRunnable2() {
            for (int i = 0; i < 8; i++) {
                try {
                    int num = (int) queue.take();
                    System.out.println("取出出:" + num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

很明顯使用阻塞佇列程式碼精煉了很多,在這還可以發現這個阻塞佇列是具有快取功能的,想很多Android中網路訪問框架內部就是使用這個進行快取的,例如Volley、Okhttp等等。

執行效果如下:

四、通過兩個阻塞佇列

使用一個阻塞佇列能夠實現執行緒同步的功能,兩個阻塞佇列也可以實現執行緒同步。原理是ArrayBlockingQueue他是具有容量的,如果把他的容量定位1則意味著他只能放進去一個元素,第二個方進行就會就會被阻塞。按照這個原理進行來實現,定義兩個容量為1的阻塞佇列ArrayBlockingQueue,一個存放資料,另一個用於控制次序。main方法和上面一致,主要來看看Man類中的兩個方法:

    static class Man {
        //資料的存放
        ArrayBlockingQueue queue1 = new ArrayBlockingQueue<Integer>(1);
        //用於控制程式的執行
        ArrayBlockingQueue queue2 = new ArrayBlockingQueue<Integer>(1);

        {
            try {
                //queue2放進去一個元素,getRunnable2阻塞
                queue2.put(22222);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void getRunnable1() {
            new Thread(() -> {
                for (int j = 0; j < 20; j++) {
                    try {
                        //queue1放進一個元素,getRunnable1阻塞

                        queue1.put(j);
                        System.out.println("存放   執行緒名稱:" + Thread.currentThread().getName() + "-資料為-" + j);

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    try {
                        //queue2取出元素,getRunnable2進入
                        queue2.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        public void getRunnable2() {
            new Thread(() -> {
                for (int j = 0; j < 20; j++) {
                    try {
                        //queue2放進一個元素,getRunnable2阻塞
                        queue2.put(22222);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    try {
                        //queue1放進一個元素,getRunnable1進入

                        int i = (int) queue1.take();
                        System.out.println("獲取   執行緒名稱:" + Thread.currentThread().getName() + "-資料為-" + i);

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

再次提醒queue2用於控制程式的執行次序,並無實際含義。最後看看執行效果,存一個、取一個很清晰,如下:

五、通過SynchronousQueue

SynchronousQueue不同於一般的資料等執行緒,而是執行緒等待資料,他是一個沒有資料緩衝的BlockingQueue,生產者執行緒對其的插入操作put必須等待消費者的移除操作take,反過來也一樣。通過這一特性來實現一個多執行緒同步問題的解決方案,程式碼如下:

    /**
     * 使用阻塞佇列SynchronousQueue
     * offer將資料插入隊尾
     * take取出資料,如果沒有則阻塞,直到有資料在獲取到
     */
    public static void test() {
        SynchronousQueue queue = new SynchronousQueue();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    queue.offer(9);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        try {
            int take = (int) queue.take();
            System.out.println(take);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

子執行緒中進行設定資料,而主執行緒獲取資料,如果子執行緒沒執行完畢,子執行緒沒有執行完畢主執行緒就會被阻塞住不能執行下一步。

六、通過執行緒池的Callback回撥

線上程的建立中,有一種建立方法可以返回執行緒結果,就是callback,他能返回執行緒的執行結果,通過子執行緒返回的結果進而在主執行緒中進行操作,也是一種同步方法,這種同步在Android中特別適用,例如Android中的AsyncTask原始碼中任務的建立部分。程式碼如下:

    private static void test() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<Boolean> submit = executorService.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return false;
            }
        });
        try {
            if (submit.get()) {
                System.out.println(true);
            } else {
                System.out.println(false);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

7、通過同步輔助類CountDownLatch

CountDownLatch是一個同步的輔助類,允許一個或多個執行緒,等待其他一組執行緒完成操作,再繼續執行。他類實際上是使用計數器的方式去控制的,在建立的時候傳入一個int數值每當我們呼叫countDownt()方法的時候就使得這個變數的值減1,而對於await()方法則去判斷這個int的變數的值是否為0,是則表示所有的操作都已經完成,否則繼續等待。可以理解成倒計時鎖。

public class Test7 {
    public static void main(String[] args) {
        //啟動兩個執行緒,分別執行完畢之後再執行主執行緒
        CountDownLatch countDownLatch = new CountDownLatch(2);

        //執行緒1執行
        Thread thread1 = new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "執行緒執行完畢");
            countDownLatch.countDown();
        });
        //執行緒2執行
        Thread thread2 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "執行緒執行完畢");
            countDownLatch.countDown();
        });


        thread1.start();
        thread2.start();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //執行主執行緒
        System.out.println("主執行緒執行完畢");
    }
}

結果如下:

8、通過同步輔助類CyclicBarrier

CyclicBarrier是一個同步的輔助類,和上面的CountDownLatch比較類似,不同的是他允許一組執行緒相互之間等待,達到一個共同點,再繼續執行。可看成是個障礙,所有的執行緒必須到齊後才能一起通過這個障礙。

public class Test8 {
    public static void main(String[] args) {
        //啟動兩個執行緒,分別執行完畢之後再執行主執行緒
        CyclicBarrier barrier  = new CyclicBarrier(2, () -> {
            //執行主執行緒
            System.out.println("主執行緒執行完畢");

        });

        //執行緒1執行
        Thread thread1 = new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + "執行緒執行完畢");

            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });

        //執行緒2執行
        Thread thread2 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "執行緒執行完畢");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });


        thread1.start();
        thread2.start();
    }
}

執行結果:

至此八大方法介紹完畢!