1. 程式人生 > >Java 執行緒之間通訊

Java 執行緒之間通訊

目錄

 

概念

原理

實現

1. 第一種解法,包含多種小的不同實現方式,但一個共同點就是靠一個共享變數來做控制;

a. 利用最基本的synchronized、notify、wait:

b. 利用Lock和Condition:

c. 利用volatile:

d. 利用AtomicInteger: 

2. 第二種解法,是利用CyclicBarrierAPI;

3. 第三種解法,是利用PipedInputStreamAPI;

4. 第四種解法,是利用BlockingQueue;


概念

用多執行緒的目的:更好的利用CPU的資源。因為所有的多執行緒程式碼都可以用單執行緒來實現。

多執行緒:指的是這個程式(一個程序)執行時產生了不止一個執行緒。

並行:多個CPU例項或者多臺機器同時執行一段處理邏輯,是真正的同時。

併發:通過CPU排程演算法,讓使用者看上去同時執行,實際上從CPU操作層面不是真正的同時。

執行緒安全:經常用來描繪一段程式碼。指在併發的情況下,該程式碼經過多執行緒使用,執行緒的排程順序不影響任何結果。這個時候使用多執行緒,我們只需要關注系統的記憶體,CPU是不是夠用即可。

執行緒不安全:執行緒的排程順序會影響最終結果,如不加事務的轉賬程式碼。

同步:Java中的同步指的是通過人為的控制和排程,保證共享資源的多執行緒訪問稱為執行緒安全,來保證結果的準確。在保證結果準確的同時,提高效能,才是優秀的程式。執行緒安全的優先順序高於效能。

 

原理

                          

        Java的併發採用的是共享變數模型,Java執行緒之間的通訊由Java記憶體模型控制,JMM決定一個執行緒對共享變數的寫入何時對另一個執行緒可見。執行緒之間的共享變數儲存在主記憶體中(Main Memory),每一個執行緒都有自己的本地記憶體(Local Memory),本地記憶體中儲存著讀/寫共享變數的副本。從上圖中可以看出,如果執行緒A與執行緒B之間要通訊的話,必須要經歷下面兩個步驟:

1、執行緒A把本地記憶體A中更新過的共享變數重新整理到主記憶體中去。

2、執行緒B到記憶體中去讀取執行緒A之前已更新過的共享變數。

                            

       

        如上圖所示,本地記憶體A和本地記憶體B由主記憶體中共享變數x的副本。假設在最開始時,這3個記憶體中的x值都為0。執行緒A在執行時,把更新後的x值(假設值為1)臨時存放在自己的本地記憶體A中。當執行緒A和執行緒B需要通訊時,執行緒A首先會把自己本地記憶體中修改後的x值重新整理到主記憶體中,此時主記憶體中的x值變為1.隨後,執行緒B到主記憶體中去讀取執行緒A更新後的x值,此時執行緒B的本地記憶體的x值也變為1。

       從整體上看,這就是執行緒A在向執行緒B傳送訊息,而且這個訊息必須經過主記憶體。JMM通過控制主記憶體與每個執行緒的本地記憶體之間的互動,來為Java程式設計師提供記憶體可見性保證。

實現

1. 第一種解法,包含多種小的不同實現方式,但一個共同點就是靠一個共享變數來做控制;

a. 利用最基本的synchronizednotifywait

public class MethodOne {
    private final ThreadToGo threadToGo = new ThreadToGo();
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                try {
                    for (int i = 0; i < arr.length; i=i+2) {
                        synchronized (threadToGo) {
                            while (threadToGo.value == 2)
                                threadToGo.wait();
                            Helper.print(arr[i], arr[i + 1]);
                            threadToGo.value = 2;
                            threadToGo.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Oops...");
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                try {
                    for (int i = 0; i < arr.length; i++) {
                        synchronized (threadToGo) {
                            while (threadToGo.value == 1)
                                threadToGo.wait();
                            Helper.print(arr[i]);
                            threadToGo.value = 1;
                            threadToGo.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Oops...");
                }
            }
        };
    }
    class ThreadToGo {
        int value = 1;
    }
    public static void main(String args[]) throws InterruptedException {
        MethodOne one = new MethodOne();
        Helper.instance.run(one.newThreadOne());
        Helper.instance.run(one.newThreadTwo());
        Helper.instance.shutdown();
    }
}

b. 利用LockCondition

public class MethodTwo {
    private Lock lock = new ReentrantLock(true);
    private Condition condition = lock.newCondition();
    private final ThreadToGo threadToGo = new ThreadToGo();
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    try {
                        lock.lock();
                        while(threadToGo.value == 2)
                            condition.await();
                        Helper.print(arr[i], arr[i + 1]);
                        threadToGo.value = 2;
                        condition.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    try {
                        lock.lock();
                        while(threadToGo.value == 1)
                            condition.await();
                        Helper.print(arr[i]);
                        threadToGo.value = 1;
                        condition.signal();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        };
    }
    class ThreadToGo {
        int value = 1;
    }
    public static void main(String args[]) throws InterruptedException {
        MethodTwo two = new MethodTwo();
        Helper.instance.run(two.newThreadOne());
        Helper.instance.run(two.newThreadTwo());
        Helper.instance.shutdown();
    }
}

c. 利用volatile:

volatile修飾的變數值直接存在main memory裡面,子執行緒對該變數的讀寫直接寫入main memory,而不是像其它變數一樣在local thread裡面產生一份copy。volatile能保證所修飾的變數對於多個執行緒可見性,即只要被修改,其它執行緒讀到的一定是最新的值。 

public class MethodThree {
    private volatile ThreadToGo threadToGo = new ThreadToGo();
    class ThreadToGo {
        int value = 1;
    }
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    while(threadToGo.value==2){}
                    Helper.print(arr[i], arr[i + 1]);
                    threadToGo.value=2;
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    while(threadToGo.value==1){}
                    Helper.print(arr[i]);
                    threadToGo.value=1;
                }
            }
        };
    }
    public static void main(String args[]) throws InterruptedException {
        MethodThree three = new MethodThree();
        Helper.instance.run(three.newThreadOne());
        Helper.instance.run(three.newThreadTwo());
        Helper.instance.shutdown();
    }
}

d. 利用AtomicInteger: 

public class MethodFive {
    private AtomicInteger threadToGo = new AtomicInteger(1);
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    while(threadToGo.get()==2){}
                    Helper.print(arr[i], arr[i + 1]);
                    threadToGo.set(2);
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    while(threadToGo.get()==1){}
                    Helper.print(arr[i]);
                    threadToGo.set(1);
                }
            }
        };
    }
    public static void main(String args[]) throws InterruptedException {
        MethodFive five = new MethodFive();
        Helper.instance.run(five.newThreadOne());
        Helper.instance.run(five.newThreadTwo());
        Helper.instance.shutdown();
    }
}

2. 第二種解法,是利用CyclicBarrierAPI;

CyclicBarrier可以實現讓一組執行緒在全部到達Barrier時(執行await()),再一起同時執行,並且所有執行緒釋放後,還能複用它,即為Cyclic。
CyclicBarrier類提供兩個構造器:

 

public CyclicBarrier(int parties, Runnable barrierAction) {
}
public CyclicBarrier(int parties) {
}
public class MethodFour{
      private final CyclicBarrier barrier;
      private final List<String> list;
      public MethodFour() {
          list = Collections.synchronizedList(new ArrayList<String>());
          barrier = new CyclicBarrier(2,newBarrierAction());
      }
      public Runnable newThreadOne() {
          final String[] inputArr = Helper.buildNoArr(52);
          return new Runnable() {
              private String[] arr = inputArr;
              public void run() {
                  for (int i = 0, j=0; i < arr.length; i=i+2,j++) {
                      try {
                          list.add(arr[i]);
                          list.add(arr[i+1]);
                          barrier.await();
                      } catch (InterruptedException | BrokenBarrierException e) {
                          e.printStackTrace();
                      }
                  }
              }
          };
      }
      public Runnable newThreadTwo() {
          final String[] inputArr = Helper.buildCharArr(26);
          return new Runnable() {
              private String[] arr = inputArr;
              public void run() {
                  for (int i = 0; i < arr.length; i++) {
                      try {
                          list.add(arr[i]);
                          barrier.await();
                      } catch (InterruptedException | BrokenBarrierException e) {
                          e.printStackTrace();
                      }
                  }
              }
          };
      }
      private Runnable newBarrierAction(){
          return new Runnable() {
              @Override
              public void run() {
                  Collections.sort(list);
                  list.forEach(c->System.out.print(c));
                  list.clear();
              }
          };
      }
      public static void main(String args[]){
          MethodFour four = new MethodFour();
          Helper.instance.run(four.newThreadOne());
          Helper.instance.run(four.newThreadTwo());
          Helper.instance.shutdown();
      }
}

 這裡多說一點,這個API其實還是利用lockcondition,無非是多個執行緒去爭搶CyclicBarrier的instance的lock罷了,最終barrierAction執行時,是在搶到CyclicBarrierinstance的那個執行緒上執行的。

 

3. 第三種解法,是利用PipedInputStreamAPI;

這裡用流在兩個執行緒間通訊,但是Java中的Stream是單向的,所以在兩個執行緒中分別建了一個input和output。這顯然是一種很搓的方式,不過也算是一種通訊方式吧……-_-T,執行的時候那種速度簡直。。。請不要BS我。

public class MethodSix {
    private final PipedInputStream inputStream1;
    private final PipedOutputStream outputStream1;
    private final PipedInputStream inputStream2;
    private final PipedOutputStream outputStream2;
    private final byte[] MSG;
    public MethodSix() {
        inputStream1 = new PipedInputStream();
        outputStream1 = new PipedOutputStream();
        inputStream2 = new PipedInputStream();
        outputStream2 = new PipedOutputStream();
        MSG = "Go".getBytes();
        try {
            inputStream1.connect(outputStream2);
            inputStream2.connect(outputStream1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void shutdown() throws IOException {
        inputStream1.close();
        inputStream2.close();
        outputStream1.close();
        outputStream2.close();
    }
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            private PipedInputStream in = inputStream1;
            private PipedOutputStream out = outputStream1;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    Helper.print(arr[i], arr[i + 1]);
                    try {
                        out.write(MSG);
                        byte[] inArr = new byte[2];
                        in.read(inArr);
                        while(true){
                            if("Go".equals(new String(inArr)))
                                break;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            private PipedInputStream in = inputStream2;
            private PipedOutputStream out = outputStream2;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    try {
                        byte[] inArr = new byte[2];
                        in.read(inArr);
                        while(true){
                            if("Go".equals(new String(inArr)))
                                break;
                        }
                        Helper.print(arr[i]);
                        out.write(MSG);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }
    public static void main(String args[]) throws IOException {
        MethodSix six = new MethodSix();
        Helper.instance.run(six.newThreadOne());
        Helper.instance.run(six.newThreadTwo());
        Helper.instance.shutdown();
        six.shutdown();
    }

4. 第四種解法,是利用BlockingQueue

 
BlockingQueue定義的常用方法如下:

  • add(Object):把Object加到BlockingQueue裡,如果BlockingQueue可以容納,則返回true,否則丟擲異常。
  • offer(Object):表示如果可能的話,將Object加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false。
  • put(Object):把Object加到BlockingQueue裡,如果BlockingQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡有空間再繼續。
  • poll(time):獲取並刪除BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null。當不傳入time值時,立刻返回。
  • peek():立刻獲取BlockingQueue裡排在首位的物件,但不從佇列裡刪除,如果佇列為空,則返回null。
  • take():獲取並刪除BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的物件被加入為止。

BlockingQueue有四個具體的實現類:

  • ArrayBlockingQueue:規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小。其所含的物件是以FIFO(先入先出)順序排序的。
  • LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的物件是以FIFO順序排序的。
  • PriorityBlockingQueue:類似於LinkedBlockingQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式所帶的Comparator決定的順序。
  • SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。

這裡我用了兩種玩法:

  • 一種是共享一個queue,根據peekpoll的不同來實現;
  • 第二種是兩個queue,利用take()會自動阻塞來實現。
public class MethodSeven {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    Helper.print(arr[i], arr[i + 1]);
                    queue.offer("TwoToGo");
                    while(!"OneToGo".equals(queue.peek())){}
                    queue.poll();
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    while(!"TwoToGo".equals(queue.peek())){}
                    queue.poll();
                    Helper.print(arr[i]);
                    queue.offer("OneToGo");
                }
            }
        };
    }
    private final LinkedBlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
    public Runnable newThreadThree() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    Helper.print(arr[i], arr[i + 1]);
                    try {
                        queue2.put("TwoToGo");
                        queue1.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }
    public Runnable newThreadFour() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    try {
                        queue2.take();
                        Helper.print(arr[i]);
                        queue1.put("OneToGo");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }
    public static void main(String args[]) throws InterruptedException {
        MethodSeven seven = new MethodSeven();
        Helper.instance.run(seven.newThreadOne());
        Helper.instance.run(seven.newThreadTwo());
        Thread.sleep(2000);
        System.out.println("");
        Helper.instance.run(seven.newThreadThree());
        Helper.instance.run(seven.newThreadFour());
        Helper.instance.shutdown();
    }

參考:

https://blog.csdn.net/zlts000/article/details/52107426

https://blog.csdn.net/u011514810/article/details/77131296