1. 程式人生 > >JAVA執行緒間通訊的幾種方式

JAVA執行緒間通訊的幾種方式

先抄錄下來慢慢看吧
今天在群裡面看到一個很有意思的面試題:
“編寫兩個執行緒,一個執行緒列印125,另一個執行緒列印字母AZ,列印順序為12A34B56C……5152Z,要求使用執行緒間的通訊。”
這是一道非常好的面試題,非常能彰顯被面者關於多執行緒的功力,一下子就勾起了我的興趣。這裡拋磚引玉,給出7種想到的解法。

通用程式碼:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**

  • Created by Edison Xu on 2017/3/2.
    */
    public enum Helper {

    instance;

    private static final ExecutorService tPool = Executors.newFixedThreadPool(2);

    public static String[] buildNoArr(int max) {
    String[] noArr = new String[max];
    for(int i=0;i<max;i++){
    noArr[i] = Integer.toString(i+1);
    }
    return noArr;
    }

    public static String[] buildCharArr(int max) {
    String[] charArr = new String[max];
    int tmp = 65;
    for(int i=0;i<max;i++){
    charArr[i] = String.valueOf((char)(tmp+i));
    }
    return charArr;
    }

    public static void print(String… input){
    if(input==null)
    return;
    for(String each:input){
    System.out.print(each);
    }
    }

    public void run(Runnable r){
    tPool.submit®;
    }

    public void shutdown(){
    tPool.shutdown();
    }

}

  1. 第一種解法,包含多種小的不同實現方式,但一個共同點就是靠一個共享變數來做控制;
    a. 利用最基本的synchronized、notify、wait:
    public class MethodOne {
    private final ThreadToGo threadToGo = new ThreadToGo();
    public Runnable newThreadOne() {
    final String[] inputArr = Helper.buildNoArr(52);
    return new Runnable() {
    private String[] arr = inputArr;
    public void run() {
    try {
    for (int i = 0; i < arr.length; i=i+2) {
    synchronized (threadToGo) {
    while (threadToGo.value == 2)
    threadToGo.wait();
    Helper.print(arr[i], arr[i + 1]);
    threadToGo.value = 2;
    threadToGo.notify();
    }
    }
    } catch (InterruptedException e) {
    System.out.println(“Oops…”);
    }
    }
    };
    }
    public Runnable newThreadTwo() {
    final String[] inputArr = Helper.buildCharArr(26);
    return new Runnable() {
    private String[] arr = inputArr;
    public void run() {
    try {
    for (int i = 0; i < arr.length; i++) {
    synchronized (threadToGo) {
    while (threadToGo.value == 1)
    threadToGo.wait();
    Helper.print(arr[i]);
    threadToGo.value = 1;
    threadToGo.notify();
    }
    }
    } catch (InterruptedException e) {
    System.out.println(“Oops…”);
    }
    }
    };
    }
    class ThreadToGo {
    int value = 1;
    }
    public static void main(String args[]) throws InterruptedException {
    MethodOne one = new MethodOne();
    Helper.instance.run(one.newThreadOne());
    Helper.instance.run(one.newThreadTwo());
    Helper.instance.shutdown();
    }
    }
    b. 利用Lock和Condition:
    public class MethodTwo {
    private Lock lock = new ReentrantLock(true);
    private Condition condition = lock.newCondition();
    private final ThreadToGo threadToGo = new ThreadToGo();
    public Runnable newThreadOne() {
    final String[] inputArr = Helper.buildNoArr(52);
    return new Runnable() {
    private String[] arr = inputArr;
    public void run() {
    for (int i = 0; i < arr.length; i=i+2) {
    try {
    lock.lock();
    while(threadToGo.value == 2)
    condition.await();
    Helper.print(arr[i], arr[i + 1]);
    threadToGo.value = 2;
    condition.signal();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    lock.unlock();
    }
    }
    }
    };
    }
    public Runnable newThreadTwo() {
    final String[] inputArr = Helper.buildCharArr(26);
    return new Runnable() {
    private String[] arr = inputArr;
    public void run() {
    for (int i = 0; i < arr.length; i++) {
    try {
    lock.lock();
    while(threadToGo.value == 1)
    condition.await();
    Helper.print(arr[i]);
    threadToGo.value = 1;
    condition.signal();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    lock.unlock();
    }
    }
    }
    };
    }
    class ThreadToGo {
    int value = 1;
    }
    public static void main(String args[]) throws InterruptedException {
    MethodTwo two = new MethodTwo();
    Helper.instance.run(two.newThreadOne());
    Helper.instance.run(two.newThreadTwo());
    Helper.instance.shutdown();
    }
    }
    c. 利用volatile:
    volatile修飾的變數值直接存在main memory裡面,子執行緒對該變數的讀寫直接寫入main memory,而不是像其它變數一樣在local thread裡面產生一份copy。volatile能保證所修飾的變數對於多個執行緒可見性,即只要被修改,其它執行緒讀到的一定是最新的值。
    public class MethodThree {
    private volatile ThreadToGo threadToGo = new ThreadToGo();
    class ThreadToGo {
    int value = 1;
    }
    public Runnable newThreadOne() {
    final String[] inputArr = Helper.buildNoArr(52);
    return new Runnable() {
    private String[] arr = inputArr;
    public void run() {
    for (int i = 0; i < arr.length; i=i+2) {
    while(threadToGo.value2){}
    Helper.print(arr[i], arr[i + 1]);
    threadToGo.value=2;
    }
    }
    };
    }
    public Runnable newThreadTwo() {
    final String[] inputArr = Helper.buildCharArr(26);
    return new Runnable() {
    private String[] arr = inputArr;
    public void run() {
    for (int i = 0; i < arr.length; i++) {
    while(threadToGo.value
    1){}
    Helper.print(arr[i]);
    threadToGo.value=1;
    }
    }
    };
    }
    public static void main(String args[]) throws InterruptedException {
    MethodThree three = new MethodThree();
    Helper.instance.run(three.newThreadOne());
    Helper.instance.run(three.newThreadTwo());
    Helper.instance.shutdown();
    }
    }

d. 利用AtomicInteger:
public class MethodFive {
private AtomicInteger threadToGo = new AtomicInteger(1);
public Runnable newThreadOne() {
final String[] inputArr = Helper.buildNoArr(52);
return new Runnable() {
private String[] arr = inputArr;
public void run() {
for (int i = 0; i < arr.length; i=i+2) {
while(threadToGo.get()==2){}
Helper.print(arr[i], arr[i + 1]);
threadToGo.set(2);
}
}
};
}
public Runnable newThreadTwo() {
final String[] inputArr = Helper.buildCharArr(26);
return new Runnable() {
private String[] arr = inputArr;
public void run() {
for (int i = 0; i < arr.length; i++) {
while(threadToGo.get()==1){}
Helper.print(arr[i]);
threadToGo.set(1);
}
}
};
}
public static void main(String args[]) throws InterruptedException {
MethodFive five = new MethodFive();
Helper.instance.run(five.newThreadOne());
Helper.instance.run(five.newThreadTwo());
Helper.instance.shutdown();
}
}

  1. 第二種解法,是利用CyclicBarrierAPI;
    CyclicBarrier可以實現讓一組執行緒在全部到達Barrier時(執行await()),再一起同時執行,並且所有執行緒釋放後,還能複用它,即為Cyclic。
    CyclicBarrier類提供兩個構造器:

public CyclicBarrier(int parties, Runnable barrierAction) {
}
public CyclicBarrier(int parties) {
}
public class MethodFour{
private final CyclicBarrier barrier;
private final List list;
public MethodFour() {
list = Collections.synchronizedList(new ArrayList());
barrier = new CyclicBarrier(2,newBarrierAction());
}
public Runnable newThreadOne() {
final String[] inputArr = Helper.buildNoArr(52);
return new Runnable() {
private String[] arr = inputArr;
public void run() {
for (int i = 0, j=0; i < arr.length; i=i+2,j++) {
try {
list.add(arr[i]);
list.add(arr[i+1]);
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
};
}
public Runnable newThreadTwo() {
final String[] inputArr = Helper.buildCharArr(26);
return new Runnable() {
private String[] arr = inputArr;
public void run() {
for (int i = 0; i < arr.length; i++) {
try {
list.add(arr[i]);
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
};
}
private Runnable newBarrierAction(){
return new Runnable() {
@Override
public void run() {
Collections.sort(list);
list.forEach(c->System.out.print©);
list.clear();
}
};
}
public static void main(String args[]){
MethodFour four = new MethodFour();
Helper.instance.run(four.newThreadOne());
Helper.instance.run(four.newThreadTwo());
Helper.instance.shutdown();
}
}
這裡多說一點,這個API其實還是利用lock和condition,無非是多個執行緒去爭搶CyclicBarrier的instance的lock罷了,最終barrierAction執行時,是在搶到CyclicBarrierinstance的那個執行緒上執行的。
3. 第三種解法,是利用PipedInputStreamAPI;
這裡用流在兩個執行緒間通訊,但是Java中的Stream是單向的,所以在兩個執行緒中分別建了一個input和output。這顯然是一種很搓的方式,不過也算是一種通訊方式吧……-_-T,執行的時候那種速度簡直。。。請不要BS我。
public class MethodSix {
private final PipedInputStream inputStream1;
private final PipedOutputStream outputStream1;
private final PipedInputStream inputStream2;
private final PipedOutputStream outputStream2;
private final byte[] MSG;
public MethodSix() {
inputStream1 = new PipedInputStream();
outputStream1 = new PipedOutputStream();
inputStream2 = new PipedInputStream();
outputStream2 = new PipedOutputStream();
MSG = “Go”.getBytes();
try {
inputStream1.connect(outputStream2);
inputStream2.connect(outputStream1);
} catch (IOException e) {
e.printStackTrace();
}
}
public void shutdown() throws IOException {
inputStream1.close();
inputStream2.close();
outputStream1.close();
outputStream2.close();
}
public Runnable newThreadOne() {
final String[] inputArr = Helper.buildNoArr(52);
return new Runnable() {
private String[] arr = inputArr;
private PipedInputStream in = inputStream1;
private PipedOutputStream out = outputStream1;
public void run() {
for (int i = 0; i < arr.length; i=i+2) {
Helper.print(arr[i], arr[i + 1]);
try {
out.write(MSG);
byte[] inArr = new byte[2];
in.read(inArr);
while(true){
if(“Go”.equals(new String(inArr)))
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
}
public Runnable newThreadTwo() {
final String[] inputArr = Helper.buildCharArr(26);
return new Runnable() {
private String[] arr = inputArr;
private PipedInputStream in = inputStream2;
private PipedOutputStream out = outputStream2;
public void run() {
for (int i = 0; i < arr.length; i++) {
try {
byte[] inArr = new byte[2];
in.read(inArr);
while(true){
if(“Go”.equals(new String(inArr)))
break;
}
Helper.print(arr[i]);
out.write(MSG);
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
}
public static void main(String args[]) throws IOException {
MethodSix six = new MethodSix();
Helper.instance.run(six.newThreadOne());
Helper.instance.run(six.newThreadTwo());
Helper.instance.shutdown();
six.shutdown();
}
4. 第四種解法,是利用BlockingQueue;
順便總結下BlockingQueue的一些內容。
BlockingQueue定義的常用方法如下:

add(Object):把Object加到BlockingQueue裡,如果BlockingQueue可以容納,則返回true,否則丟擲異常。
offer(Object):表示如果可能的話,將Object加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false。
put(Object):把Object加到BlockingQueue裡,如果BlockingQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡有空間再繼續。
poll(time):獲取並刪除BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null。當不傳入time值時,立刻返回。
peek():立刻獲取BlockingQueue裡排在首位的物件,但不從佇列裡刪除,如果佇列為空,則返回null。
take():獲取並刪除BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的物件被加入為止。
BlockingQueue有四個具體的實現類:

ArrayBlockingQueue:規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小。其所含的物件是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的物件是以FIFO順序排序的。
PriorityBlockingQueue:類似於LinkedBlockingQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式所帶的Comparator決定的順序。
SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。
這裡我用了兩種玩法:

一種是共享一個queue,根據peek和poll的不同來實現;
第二種是兩個queue,利用take()會自動阻塞來實現。
public class MethodSeven {
private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>();
public Runnable newThreadOne() {
final String[] inputArr = Helper.buildNoArr(52);
return new Runnable() {
private String[] arr = inputArr;
public void run() {
for (int i = 0; i < arr.length; i=i+2) {
Helper.print(arr[i], arr[i + 1]);
queue.offer(“TwoToGo”);
while(!“OneToGo”.equals(queue.peek())){}
queue.poll();
}
}
};
}
public Runnable newThreadTwo() {
final String[] inputArr = Helper.buildCharArr(26);
return new Runnable() {
private String[] arr = inputArr;
public void run() {
for (int i = 0; i < arr.length; i++) {
while(!“TwoToGo”.equals(queue.peek())){}
queue.poll();
Helper.print(arr[i]);
queue.offer(“OneToGo”);
}
}
};
}
private final LinkedBlockingQueue queue1 = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue queue2 = new LinkedBlockingQueue<>();
public Runnable newThreadThree() {
final String[] inputArr = Helper.buildNoArr(52);
return new Runnable() {
private String[] arr = inputArr;
public void run() {
for (int i = 0; i < arr.length; i=i+2) {
Helper.print(arr[i], arr[i + 1]);
try {
queue2.put(“TwoToGo”);
queue1.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
}
public Runnable newThreadFour() {
final String[] inputArr = Helper.buildCharArr(26);
return new Runnable() {
private String[] arr = inputArr;
public void run() {
for (int i = 0; i < arr.length; i++) {
try {
queue2.take();
Helper.print(arr[i]);
queue1.put(“OneToGo”);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
}
public static void main(String args[]) throws InterruptedException {
MethodSeven seven = new MethodSeven();
Helper.instance.run(seven.newThreadOne());
Helper.instance.run(seven.newThreadTwo());
Thread.sleep(2000);
System.out.println("");
Helper.instance.run(seven.newThreadThree());
Helper.instance.run(seven.newThreadFour());
Helper.instance.shutdown();
}