1. 程式人生 > >AND型訊號量與訊號量集-----程序的同步與互斥面向物件的解決方案(二)

AND型訊號量與訊號量集-----程序的同步與互斥面向物件的解決方案(二)

AND型訊號量

上述的程序互斥問題,是針對各程序之間只共享一個臨界資源而言的。在有些應用場合,是一個程序需要先獲得兩個或更多的共享資源後方能執行其任務。假定現有兩個程序AB,他們都要求訪問共享資料DE。當然,共享資料都應作為臨界資源。為此,可為這兩個資料分別設定用於互斥的訊號量DmutexEmutex,並令它們的初值都是1;

AND同步機制的基本思想是:將程序在整個執行過程中需要的所有資源,一次性全部地分配給程序,待程序使用完後再一起釋放。只要尚有一個資源未能分配給程序,其它所有可能為之分配的資源也不分配給它。亦即,對若干個臨界資源的分配,採取原子操作方式:要麼把它所請求的資源全部分配到程序,要麼一個也不分配。由死鎖理論可知,這樣就可避免上述死鎖情況的發生。為此,在

wait操作中,增加了一個“AND”條件,故稱為AND同步,或稱為同時wait操作,

虛擬碼:如下:

Swait(S1S2,…,Sn)

  if Si>=1 and … and Sn>=1 then

   for i:=1 to n do

   Si:=Si-1

   endfor

  else

  place the process in the waiting queue associated with the first Si found with Si<1and set the program count of this process to the beginning of Swait operation 

  endif

Ssignal(S1S2,…,Sn)

for i:=1 to n do

  Si:=Si+1

Remove all the process waiting in the queue associated with Si into the ready queue.

endfor; 

Java程式碼:

package org.hao.andpv;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

public class AndPV {

private int[] semaphores;//訊號量陣列

private List<BlockingQueue<Thread>> queueList=new ArrayList<BlockingQueue<Thread>>();

    //每個訊號量對應的阻塞佇列

public AndPV(int[] semaphores) {//設定訊號量初值,初始化

this.semaphores=semaphores;

for(int i=0;i<semaphores.length;i++){

queueList.add(new LinkedBlockingQueue<Thread>());

}

}

public synchronized void p(Thread t) {//p原語

int semaphoreIndex=0;

for(;semaphoreIndex<semaphores.length;semaphoreIndex++){//判斷每個條件是否都滿足

if(semaphores[semaphoreIndex]<1){//semaphoreIndex個條件不滿足

break;

}

}

if(semaphoreIndex<semaphores.length-1){//條件不滿足時

queueList.get(semaphoreIndex).add(t);//新增到阻塞佇列

try {

synchronized(t){       

   t.wait();//執行緒阻塞

}

} catch (Exception e) {

}

}else{

for(semaphoreIndex=0;semaphoreIndex<semaphores.length;semaphoreIndex++){

     semaphores[semaphoreIndex]--;//條件滿足時,可用資源都減一

}

}

}

public synchronized void v(){ //v原語

for(int semaphoreIndex=0;semaphoreIndex<semaphores.length;semaphoreIndex++){

semaphores[semaphoreIndex]++;//程序執行完,可用資源都增一

if(semaphores[semaphoreIndex]>=0){//semaphoreIndex類有可用資源

Thread t=queueList.get(semaphoreIndex).poll();

synchronized(t){

   t.notify();

   p(t);//判斷其他條件是否滿足

}

}

}

}

}

訊號量集

  在記錄型訊號量機制中,wait(S)signal(S)操作僅能對訊號量施以加1或減1操作,意味著每次只能獲得或釋放一個單位的臨界資源。而當一次需要N個某類臨界資源時,便要進行Nwait(S)操作,顯然這是低效的。此外,在有些情況下,當資源數量低於某一下限值時,便不予以分配。因而,在每次分配之前,都必須測試該資源的數量,看其是否大於其下限值。基於上述兩點,可以對AND訊號量機制加以擴充,形成一般化的“訊號量集”機制。Swait操作可描述如下,其中S為訊號量,d為需求值,而t為下限值。 

package org.hao.andpv;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

public class CollectPV {

private int[] semaphores;

private List<BlockingQueue<Thread>> queueList=new ArrayList<BlockingQueue<Thread>>();

    private Map<Thread,Integer[]> map=new HashMap<Thread,Integer[]>();//儲存第執行緒的請求

public CollectPV(int[] semaphores) {

this.semaphores=semaphores;

for(int i=0;i<semaphores.length;i++){

queueList.add(new LinkedBlockingQueue<Thread>());

}

}

public synchronized void p(Thread t,Integer[] semas) {

try {

int semaphoreIndex=0;

for(;semaphoreIndex<semaphores.length;semaphoreIndex++){

if(semaphores[semaphoreIndex]<semas[semaphoreIndex]){

break;

}

}

if(semaphoreIndex<semaphores.length){

BlockingQueue<Thread> blockingQueue=queueList.get(semaphoreIndex);

blockingQueue.add(t);

queueList.add(semaphoreIndex, blockingQueue);

map.put(t, semas);

try {

synchronized(t){       

   t.wait();

}

} catch (Exception e) {

}

}else{

for(int semaphoresIndex=0;semaphoresIndex<semaphores.length;semaphoresIndex++){

     semaphores[semaphoresIndex]-=semas[semaphoresIndex];

}

}

} catch (Exception e) {

}

}

public synchronized void v(Integer[] semas){

try {

for(int semaphoreIndex=0;semaphoreIndex<semaphores.length;semaphoreIndex++){

semaphores[semaphoreIndex]+=semas[semaphoreIndex];

if(semaphores[semaphoreIndex]>=0){

Thread t=queueList.get(semaphoreIndex).poll();

synchronized(t){

   t.notify();

   p(t, map.get(t));

}

}

}

} catch (Exception e) {

}

}

}