1. 程式人生 > >Java執行緒間通訊與訊號量

Java執行緒間通訊與訊號量

1. 訊號量Semaphore

先說說Semaphore,Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。一般用於控制併發執行緒數,及執行緒間互斥。另外重入鎖 ReentrantLock 也可以實現該功能,但實現上要複雜些。
功能就類似廁所有5個坑,假如有10個人要上廁所,那麼同時只能有多少個人去上廁所呢?同時只能有5個人能夠佔用,當5個人中 的任何一個人讓開後,其中等待的另外5個人中又有一個人可以佔用了。另外等待的5個人中可以是隨機獲得優先機會,也可以是按照先來後到的順序獲得機會。
單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”,這可應用於死鎖恢復的一些場合。

例子:

/**
 * @Description:
 * @param @param args
 * @return void 返回型別
 */
public static void main(String[] args) {
    // 執行緒池
    ExecutorService exec = Executors.newCachedThreadPool();
    // 只能5個執行緒同時訪問
    final Semaphore semp = new Semaphore(5);
    // 模擬20個客戶端訪問
    for (int index = 0; index < 20; index++) {
        final int NO = index;
        Runnable run = new Runnable() {
            public void run() {
                try {
                    // 獲取許可
                    semp.acquire();
                    System.out.println("獲得Accessing: " + NO);
                    Thread.sleep((long) (Math.random() * 10000));
                    // 訪問完後,釋放
                    semp.release();
                    System.out.println("剩餘可用訊號-----------------"
                            + semp.availablePermits());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        exec.execute(run);
    }
    // 退出執行緒池
    exec.shutdown();
}

輸出結果(可以想想為什麼會這樣輸出):

獲得Accessing: 1
獲得Accessing: 5
獲得Accessing: 2
獲得Accessing: 3
獲得Accessing: 0
剩餘可用訊號-----------------1
獲得Accessing: 4
剩餘可用訊號-----------------1
獲得Accessing: 9
剩餘可用訊號-----------------1
獲得Accessing: 8
剩餘可用訊號-----------------1
獲得Accessing: 6
剩餘可用訊號-----------------1
獲得Accessing: 10
剩餘可用訊號-----------------1
獲得Accessing: 11
剩餘可用訊號-----------------1
獲得Accessing: 12
剩餘可用訊號-----------------1
獲得Accessing: 13
剩餘可用訊號-----------------1
獲得Accessing: 7
剩餘可用訊號-----------------1
獲得Accessing: 15
剩餘可用訊號-----------------1
獲得Accessing: 16
剩餘可用訊號-----------------1
獲得Accessing: 17
剩餘可用訊號-----------------1
獲得Accessing: 14
剩餘可用訊號-----------------1
獲得Accessing: 18
剩餘可用訊號-----------------1
獲得Accessing: 19
剩餘可用訊號-----------------1
剩餘可用訊號-----------------2
剩餘可用訊號-----------------3
剩餘可用訊號-----------------4
剩餘可用訊號-----------------5

2. 使用PIPE作為執行緒間通訊橋樑

Pipe有一個source通道和一個sink通道。資料會被寫到sink通道,從source通道讀取。一進一出。先作為初步瞭解怎麼使用。
值得注意的是該類在java.nio.channels下,說明該類屬於nio方式的資料通訊方式,那就使用Buffer來緩衝資料。

Pipe原理的圖示:
Pipe原理圖

  • Pipe就是個空管子,這個空管子一頭可以從管子裡往外讀,一頭可以往管子裡寫
  • 操作流程:
    • 1.首先要有一個物件往這個空管子裡面寫。寫到哪裡呢?這個空管子是有一點空間的,就在這個管子裡。
      寫的時候就是寫到管子本身包含的這段空間裡的。這段空間大小是1024個位元組。
    • 2.然後另一個物件才能將這個裝滿了的管子裡的內容讀出來。

上程式碼

package com.jx.test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class testPipe {

    /**
     * @Description:
     * @param @param args
     * @return void 返回型別
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        // 建立一個管道
        Pipe pipe = Pipe.open();
        final Pipe.SinkChannel psic = pipe.sink();// 要向管道寫資料,需要訪問sink通道
        final Pipe.SourceChannel psoc = pipe.source();// 從讀取管道的資料,需要訪問source通道

        Thread tPwriter = new Thread() {

            public void run() {
                try {
                    System.out.println("send.....");
                    // 建立一個執行緒,利用管道的寫入口Pipe.SinkChannel型別的psic往管道里寫入指定ByteBuffer的內容
                    int res = psic.write(ByteBuffer
                            .wrap("Hello,Pipe!測試通訊.....".getBytes("utf-16BE")));
                    System.out.println("send size:" + res);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Thread tPreader = new Thread() {
            public void run() {
                int bbufferSize = 1024 * 2;
                ByteBuffer bbuffer = ByteBuffer.allocate(bbufferSize);
                try {
                    System.out.println("recive.....");
                    // 建立一個執行緒,利用管道的讀入口Pipe.SourceChannel型別的psoc將管道里內容讀到指定的ByteBuffer中                   
                    int res = psoc.read(bbuffer);//資料未
                     System.out.println("recive size:"+res+" Content:" + ByteBufferToString(bbuffer));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        tPwriter.start();
        tPreader.start();
    }

    /**
     *ByteBuffer--> String的轉換函式
     */
    public static String ByteBufferToString(ByteBuffer content) {
        if (content == null || content.limit() <= 0
                || (content.limit() == content.remaining())) {
            System.out.println("不存在或內容為空!");
            return null;
        }
        int contentSize = content.limit() - content.remaining();
        StringBuffer resultStr = new StringBuffer();
        for (int i = 0; i < contentSize; i += 2) {
            resultStr.append(content.getChar(i));
        }
        return resultStr.toString();
    }

}