1. 程式人生 > >Semaphore 的介紹與實現生產者消費者模式案例

Semaphore 的介紹與實現生產者消費者模式案例

一、介紹 

   Semaphore是一種基於計數的訊號量。它可以設定一個閾值,基於此,多個執行緒競爭獲取許可訊號,做完自己的申請後歸還,超過閾值後,執行緒申請許可訊號將會被阻塞。Semaphore可以用來構建一些物件池,資源池之類的,比如資料庫連線池,我們也可以建立計數為1的Semaphore,將其作為一種類似互斥鎖的機制,這也叫二元訊號量,表示兩種互斥狀態。它的用法如下:
 


public class Service {
	private Semaphore semaphore = new Semaphore(1);//只能通過一個執行緒
	
	public void testMethod() {
		try {
			semaphore.acquire();//獲取許可 permit
		
		System.out.println(Thread.currentThread().getName()+
				"begin timer:"+System.currentTimeMillis());
		Thread.sleep(5000);
		System.out.println(Thread.currentThread().getName()+
				"end timer="+System.currentTimeMillis());
		semaphore.release();//釋放持有許可
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
    • 二、方法摘要

      所有方法  介面方法  具體的方法 
      Modifier and Type Method and Description
      void acquire()

      從該訊號量獲取許可證,阻止直到可用,或執行緒為 interrupted

      void
      acquire
      (int permits)

      從該訊號量獲取給定數量的許可證,阻止直到所有可用,否則執行緒為 interrupted

      void acquireUninterruptibly()

      從這個訊號燈獲取許可證,阻止一個可用的。

      void acquireUninterruptibly(int permits)

      從該訊號量獲取給定數量的許可證,阻止直到所有可用。

      int
      availablePermits
      ()

      返回此訊號量中當前可用的許可數。

      int drainPermits()

      獲取並返回所有可立即獲得的許可證。

      protected Collection<Thread> getQueuedThreads()

      返回一個包含可能正在等待獲取的執行緒的集合。

      int getQueueLength()

      返回等待獲取的執行緒數的估計。

      boolean hasQueuedThreads()

      查詢任何執行緒是否等待獲取。

      boolean isFair()

      如果此訊號量的公平設定為真,則返回 true

      protected void reducePermits(int reduction)

      縮小可用許可證的數量。

      void release()

      釋放許可證,將其返回到訊號量。

      void release(int permits)

      釋放給定數量的許可證,將其返回到訊號量。

      String toString()

      返回一個標識此訊號量的字串及其狀態。

      boolean tryAcquire()

      從這個訊號量獲得許可證,只有在呼叫時可以使用該許可證。

      boolean tryAcquire(int permits)

      從這個訊號量獲取給定數量的許可證,只有在呼叫時全部可用。

      boolean tryAcquire(int permits, long timeout, TimeUnit unit)

      從該訊號量獲取給定數量的許可證,如果在給定的等待時間內全部可用,並且當前執行緒尚未 interrupted

      boolean tryAcquire(long timeout, TimeUnit unit)

      如果在給定的等待時間內可用,並且當前執行緒尚未 到達 interrupted,則從該訊號量獲取許可。

    • 三、構造方法詳細資訊

      • Semaphore

        public Semaphore(int permits)

        建立一個 Semaphore與給定數量的許可證和非公平公平設定。

        引數

        permits - permits的初始許可證。 該值可能為負數,在這種情況下,必須在任何獲取被授予之前釋出釋放。

      • Semaphore

        public Semaphore(int permits,
                         boolean fair)

        建立一個 Semaphore與給定數量的許可證和給定的公平設定。

        引數

        permits - permits的初始許可證。 該值可能為負數,在這種情況下,必須在任何獲取被授予之前釋出釋放。

        fair - true如果這個訊號量將保證首先在競爭中首先授予許可證,否則 false

四、用Semaphore實現生產者消費者模式

Service 處理業務邏輯:


import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class RepastService {
	volatile private Semaphore setSemaphore = new Semaphore(10);//廚師
	volatile private Semaphore getSemaphore = new Semaphore(10);//顧客
	volatile private ReentrantLock lock = new ReentrantLock(); //鎖
	volatile private Condition setCondition = lock.newCondition();
	volatile private Condition getCondition = lock.newCondition();
	volatile private Object[] producePosition = new Object[4];//只有四個餐盤
	
	private boolean isEmpty() {
		boolean isEmpty = true;
		for (int i = 0; i< producePosition.length;i++) {
			    if (producePosition[i]!=null) {
				isEmpty=false;
				break;
			    }
		     }
			if(isEmpty==true) {
					return true;
			}else {
				return false;
			}			
				
	}
		

	private boolean isFull() {
		boolean isFull = true;
		for (int i = 0; i < producePosition.length; i++) {
			if(producePosition[i]==null) {
				isFull = false;
				break;
			}
		}
		return isFull;
	}
	
	public void set() {
	
			try {
				setSemaphore.acquire();
				
				lock.lock();
				while(isFull()) {
					//生產者在等待
					setCondition.await();
				}
				for (int i = 0; i < producePosition.length; i++) {
					if(producePosition[i]==null) {
						producePosition[i] = "資料";
						System.out.println(Thread.currentThread().getName()+" 生產了 "+producePosition[i]);
						break;
						
					}
				}
				getCondition.signalAll();
				lock.unlock();
			 
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally {
				setSemaphore.release();
			}
	}
	public void get() {
		  try {
				getSemaphore.acquire();
			
				lock.lock();
				while (isEmpty()) {
					getCondition.await();
				}
				for (int i = 0; i < producePosition.length; i++) {
					if(producePosition[i]!=null) {
						System.out.println(Thread.currentThread().getName()+" 消費了 "+ producePosition[i]);
						producePosition[i]=null;
						break;
					}
			    } 
				setCondition.signalAll();
				lock.unlock();
		  }catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
		 } finally {
			 getSemaphore.release();
		}
	}
}

ThreadCus 消費者:


public class ThreadCus extends Thread {
	private RepastService service ;

	public ThreadCus(RepastService service) {
		super();
		this.service = service;
	}
	
	public void run() {
		service.get();
	}
}

ThreadPro 生產者:


public class ThreadPro extends Thread {
	private RepastService service ;

	public ThreadPro(RepastService service) {
		super();
		this.service = service;
	}
	
	public void run() {
		service.set();
	}
}

測試類:


public class Runtest {
	public static void main(String[] args) throws InterruptedException {
		RepastService service = new RepastService();
		ThreadPro[] arrayP = new ThreadPro[60];
		ThreadCus[] arrayC = new ThreadCus[60];
		for (int i = 0; i < arrayP.length; i++) {
			arrayP[i] = new ThreadPro(service);
			arrayC[i] = new ThreadCus(service);
		}
		Thread.sleep(2000);
		for (int i = 0; i < arrayP.length; i++) {
			arrayP[i].start();
			arrayC[i].start();
		}
	}
}