1. 程式人生 > >java多執行緒,多執行緒加鎖以及Condition類的使用

java多執行緒,多執行緒加鎖以及Condition類的使用

看了網上非常多的執行程式碼,很多都是重複的再說一件事,可能對於java老鳥來說,理解java的多執行緒是非常容易的事情,但是對於我這樣的菜鳥來說,這個實在有點難,可能是我太菜了,網上重複的陳述對於我理解這個問題一點幫助都沒有.所以這裡我寫下我對於這個問題的理解,目的是為了防止我忘記.

還是從程式碼例項開始講起:

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue<Integer> queue = new MyBlockingQueue<>(1);
        for (int i = 0; i < 10; i++) {
            int data = i;
            new Thread(() -> {
                try {
                    queue.enqueue(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        System.out.println("1111111");
        for(int i=0;i<10;i++){
            new Thread(() -> {
                try {
                    queue.dequeue();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }).start();
        }
    }
    public static class MyBlockingQueue<E> {
        int size;//阻塞佇列最大容量
        ReentrantLock lock = new ReentrantLock(true);
        LinkedList<E> list=new LinkedList<>();//佇列底層實現
        Condition notFull = lock.newCondition();//佇列滿時的等待條件
        Condition notEmpty = lock.newCondition();//佇列空時的等待條件
        public MyBlockingQueue(int size) {
            this.size = size;
        }
        public void enqueue(E e) throws InterruptedException {
            lock.lock();
            try {
                while(list.size() ==size)//佇列已滿,在notFull條件上等待
                    notFull.await();

                list.add(e);//入隊:加入連結串列末尾
                System.out.println("入隊:" +e);
                notEmpty.signal(); //通知在notEmpty條件上等待的執行緒
            } finally {
                lock.unlock();
            }
        }
        public E dequeue() throws InterruptedException {
            E e;
            lock.lock();
            try {
                while(list.size() == 0)
                    notEmpty.await();
                e = list.removeFirst();//出隊:移除連結串列首元素
                System.out.println("出隊:"+e);
                notFull.signal();//通知在notFull條件上等待的執行緒
                return e;
            } finally {
                lock.unlock();
            }
        }
    }
}
程式碼來自菜鳥教程

主函式啟動了20個執行緒,前10個是入隊的後10個是出隊的,我們可以看啊可能輸出結果,

new Thread(() -> {
try {
queue.enqueue(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start(); 注意到執行緒實現,這個是lambda表示式實現Runable介面.
入隊:0
1111111
出隊:0
入隊:2
出隊:2
入隊:1
出隊:1
入隊:3
出隊:3
入隊:4
出隊:4
入隊:5
出隊:5
入隊:6
出隊:6
入隊:7
出隊:7
入隊:8
出隊:8
入隊:9
出隊:9

可以看到1111111在第一個出隊之前,佇列容量為1,也就是說頭10個入隊程序只有第一個成功了,其他均被阻塞.

並且出隊入隊順序是按照迴圈順序的,說明鎖是按照請求順序來獲取的,先到先得,這個說的就是公平鎖的意思,其實ReentrantLock既可以是公平鎖也可以是非公平鎖,其初始化的時候,往建構函式裡面傳入true則為公平鎖,false則為非公平鎖.

至於什麼是可重入鎖,可以看看這篇部落格https://blog.csdn.net/qq_38737992/article/details/90613821,這個是可重入鎖的例項.

其中有一行程式碼很奇怪,lock.lock();對於我這樣的萌新很好奇這個一行程式碼到底發生了什麼,網上很多都是說獲得鎖,但是"獲得"這個實在難以太不具體,所以我自己想象了一下,感覺大致上就是這樣的一張圖:

 

結合我粗淺的經驗猜測:jvm只有一個就緒佇列,就緒佇列裡面的執行緒按照佇列順序使用cpu資源,若不加鎖,那麼所有執行緒都可以按序取得資源,但是由於面向物件了,所以不好直接控制就緒佇列裡面執行緒的入隊順序,這個時候就需要加鎖來控制執行緒的執行順序來保證處理邏輯正確.(其實也不不是那麼嚴格的佇列,就緒狀態的執行緒如果是非公平鎖一般會隨機先後的執行,說是佇列而已,其實就是表達就緒狀態)

結合程式碼的lock()方法,如果有執行緒進入cpu並且呼叫lock(),如果該鎖沒有被其他執行緒獲取過,那麼這個執行緒可以使用cpu時間,如果該鎖已經被其他執行緒獲取了,那麼該執行緒會給阻塞,進入阻塞佇列, 這樣來說的話,其實"獲取"這個詞也沒什麼難以理解的,其實就是一個標記而已,然後lock()方法其實就只是判斷當前執行緒是使用cpu時間,還是進入阻塞佇列而已..

在看看unlock()方法,由上面的圖幫助,其實這個也很好理解,其實就是把阻塞佇列的隊首的執行緒出隊,然後進入就緒佇列而已.

可以猜測,如果執行過程中有多個鎖例項,那麼就會有多少個可能阻塞的執行緒,那麼除了使用用多個鎖,其實還有別的方法來增加阻塞執行緒,就是使用Condition類,需要指出的是condition類的await()方法,會阻塞當前執行緒,然後自動解除當前執行緒獲取的鎖(這點尤其重要),切換執行緒,如果其他執行緒中有喚醒,那麼這個在被喚醒後執行緒會從await()的位置繼續往下執行,所以一般要配合while迴圈使用,如果某執行緒被喚醒,那麼它對於它之前獲取的鎖,也將重新獲取,如果此時該鎖已經被另外一個執行緒獲取,且還沒有解鎖,此時的喚醒就會出錯,會出現莫名其妙的錯誤,所以需要設定一個volatile變數來檢測執行緒的執行狀態,所以await()方法前後都要檢測.

這裡提出一道題,來自leetcode,要求使用condition類來寫,

題意:

編寫一個可以從 1 到 n 輸出代表這個數字的字串的程式,但是:

如果這個數字可以被 3 整除,輸出 "fizz"。
如果這個數字可以被 5 整除,輸出 "buzz"。
如果這個數字可以同時被 3 和 5 整除,輸出 "fizzbuzz"。
例如,當 n = 15,輸出: 1, 2, fizz, 4, buzz, fizz, 7, 8, fizz, buzz, 11, fizz, 13, 14, fizzbuzz。

來源:力扣(LeetCode)
連結:https://leetcode-cn.com/problems/fizz-buzz-multithreaded

 解法:

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    static void printFizz(int x){
        System.out.printf("%d:Fizz,\n",x);
    }
    static void printBuzz(int x){
        System.out.printf("%d:Buzz,\n",x);
    }
    static void printFizzBuzz(int x){
        System.out.printf("%d:FizzBuzz,\n",x);
    }
    static void printaccpt(int x){
        System.out.printf("%d,\n",x);
    }
    static volatile int now=1;
    static ReentrantLock lock=new ReentrantLock();
    static Condition k1=lock.newCondition();
    public static void test(int n) {

        new Thread(()->{
            while(now<=n){
                lock.lock();
                try{
                    while(now%5==0||now%3!=0){
                        if(now>n) throw new InterruptedException();
                        k1.await();
                        if(now>n) throw new InterruptedException();
                    }
                    printFizz(now);
                    now++;
                    k1.signalAll();
                } catch (InterruptedException e) {
                    break;
                    //e.printStackTrace();
                } finally{
                    lock.unlock();
                }
            }
            System.out.println("Thread 1 is over");
        }).start();

        new Thread(()->{
            while(now<=n){
                lock.lock();
                try{

                    while(now%5!=0||now%3==0) {
                        if(now>n) throw new InterruptedException();
                        k1.await();
                        if(now>n) throw new InterruptedException();
                    }
                    printBuzz(now);
                    now++;
                    k1.signalAll();
                } catch (InterruptedException e) {
                    break;
                   // e.printStackTrace();
                } finally{
                    lock.unlock();
                }
            }
            System.out.println("Thread 2 is over");
        }).start();


        new Thread(()->{
            while(now<=n){
                lock.lock();
                try{
                    while(now%5!=0||now%3!=0) {
                        if(now>n) throw new InterruptedException();
                        k1.await();
                        if(now>n) throw new InterruptedException();
                    }
                    printFizzBuzz(now);
                    now++;
                    k1.signalAll();
                } catch (InterruptedException e) {
                    break;
                    //Thread.interrupted();
                    //e.printStackTrace();
                } finally{
                    lock.unlock();
                }
            }
            System.out.println("Thread 3 is over");
        }).start();


        new Thread(()->{
            while(now<=n){
                lock.lock();
                try{
                    while(now%5==0||now%3==0) {
                        if(now>n) throw new InterruptedException();
                        k1.await();
                        if(now>n) throw new InterruptedException();
                    }
                    printaccpt(now);
                    now++;
                    k1.signalAll();
                }catch (InterruptedException e){
                    break;
                    //Thread.interrupted();
                    //e.printStackTrace();
                }
                finally{
                    lock.unlock();
                }
            }
            System.out.println("Thread 4 is over");
        }).start();
    }

    public static void main(String[] args) throws InterruptedException {
        test(30);
    }
}
View Co