1. 程式人生 > >java多執行緒之同步計數器

java多執行緒之同步計數器

同步計數器之--CountDownLatch

    使用場景:當用戶上傳檔案,我們可以多開幾個執行緒來上傳檔案,當所有的執行緒上傳完成後,檔案才算上傳完成,在此場景下可以使用CountDownLatch,當每個執行緒上傳完成,呼叫CountDownLatch.countDown()使計數器減1,當計數器為0時表示檔案上傳完成。

    CountDownLatch的同步器是通過AQS來實現同步的,在CountDownLatch內部定義了一個Sycn類,此類繼承AQS來實現同步,我們來看下CountDownLatch的幾個主要方法的執行過程

    1).當CountDownLatch呼叫countDown()時

//CountDownLatch內部呼叫靜態內部類Sync的方法,但是Sync並沒有releaseShared(int i)的實現,所以我們檢視它的父類AQS
public void countDown() {
        sync.releaseShared(1);
    }

AbstractQueuedSynchronizer類

//在AQS中呼叫子類的tryReleaseShared(int i)的方法,如果子類的方法返回true,則 doReleaseShared(),並且自身返回true
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();//釋放訊號量
            return true;
        }
        return false;
    }

Sync類

//
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();//獲取當前計數器
                if (c == 0)//如果計數器為0,則直接返回,因為計數器為0,則釋放所有執行緒,CountDownLatch工作已經結束了
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))//原子性的將計數器減1,返回計數器是否==0
                    return nextc == 0;
            }
        }

2).當CountDownLatch呼叫await()時

//CountDownLatch呼叫Sync類中的acquireSharedInterruptibly(int i)方法,Sync中沒有實現,所以是呼叫AQS的方法()
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

AQS類

//呼叫子類的tryAcquireShared(int i)方法
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//判斷執行緒是否中斷
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//結合下面的程式碼可以看出,當計數器不等於0時,往下執行,
            doAcquireSharedInterruptibly(arg);
    }
private void doAcquireSharedInterruptibly(int arg)//這段有點看不懂啊
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//獲取當前訊號量是否為0,如果為0這個方法返回1,否則返回-1
                    if (r >= 0) {//當訊號量為0的時候就退出死迴圈,否則一直在這裡面出不去,所以實現了一直等待
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
Sync類
//當計數器等於0時返回1,否則返回-1;
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

CountDownLatch示例

package com.synchronize;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 同步計數器
 * @author lijh
 *
 */
public class MyCountDownLatch {
	 
	public static void main(String[] args) {
	     CountDownLatch doneSignal = new CountDownLatch(5);
	     //定長執行緒池
	     ExecutorService pool = Executors.newFixedThreadPool(1);
	     for (int i = 0; i < 5; ++i){
	    	 pool.execute(new WorkerRunnable(doneSignal, i)); 
	     } 
	    	
	     try {
	    	 //在計數器為0之前會一直等待
			 doneSignal.await();
		 } catch (InterruptedException e) {
			 e.printStackTrace();
		 } 
	     //關閉執行緒池
	     pool.shutdown();
	     
	     System.out.println("main 執行緒結束");
	}
}

class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;
    WorkerRunnable(CountDownLatch doneSignal, int i) {
       this.doneSignal = doneSignal;
       this.i = i;
    }
    public void run() {
    	//每個執行緒做完自己的事情後,計數器減1
        doWork(i);
        doneSignal.countDown();
        Thread.currentThread().interrupt();
    }
    
    void doWork(int i) {
	    try {
		    TimeUnit.SECONDS.sleep(1);
		    System.out.println(doneSignal.getCount());
	    } catch (InterruptedException e) {
		    e.printStackTrace();
	    }
	}
}

同步計數器之--Semaphore

semaphore通常用於限制訪問一些資源的執行緒數目,比如有一個伺服器,同時最多隻能三個人訪問,此時可以使用semaphore實現

Semaphore方法比較多,此處展示一些常用的


Semaphore示例

package com.synchronize;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * Semaphore同步器會規定一個閥值來限制對資源的訪問,使用資源時,執行緒最多不能超出這個閥值
 * 場景:當前駕校有2輛車,有5個學員練車,每次只有2個學員可以練車,
 * 其他學員需要等待,當有學員練完車了,可以讓給下一個學員練車
 * @author lijh
 *
 */
public class MySemaphore {
	
	public static void main(String[] args) {
		//最多兩個執行緒訪問資源
		Semaphore sp = new Semaphore(2);
		//建立一個容納5個執行緒的執行緒池
		ExecutorService pool = Executors.newFixedThreadPool(5);
		Driver d = new Driver(sp);
		Car c1 = new Car(d);
		Car c2 = new Car(d);
		Car c3 = new Car(d);
		Car c4 = new Car(d);
		Car c5 = new Car(d);
		pool.execute(c1);
		pool.execute(c2);
		pool.execute(c3);
		pool.execute(c4);
		pool.execute(c5);
		
		pool.shutdown();//關閉執行緒池
	}

}
//車
class Car implements Runnable{
	
	private Driver d;
	
	public Car(Driver d){
		this.d = d;
	}

	@Override
	public void run() {
		d.driverCar();
	}
}
//司機
class Driver{
	//同步計數器
	private Semaphore sp;
	
	public Driver(Semaphore sp){
		this.sp = sp;
	}
	//學員練車
	public void driverCar(){
		try {
			sp.acquire();//從此訊號量獲取一個許可,在提供一個許可前一直將執行緒阻塞,否則執行緒被中斷。
			System.out.println(Thread.currentThread().getName()+" start");
			TimeUnit.SECONDS.sleep(2);
			System.out.println(Thread.currentThread().getName()+" end");
			sp.release();////釋放一個許可,將其返回給訊號量。
			//Thread.currentThread().interrupt();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
}

同步計數器之--CyclicBarrier

先看一個示例

package com.synchronize;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * CyclicBarrier 允許一組執行緒互相等待,直到到達某個公共屏障點,然後所有執行緒繼續往後執行。
 * CyclicBarrier 與 CountDownLatch 區別在於 CyclicBarrier 是多個執行緒互相等待,
 * 			CountDownLatch 是一個執行緒等待多個執行緒完成工作,等待的物件不同了
 * 場景:公司有4個人,每天早上開晨會時,大家一起去會議室,當所有人都到達會議室,會議開始。
 * 
 * @author lijh
 *
 */
public class MyCyclicBarrier {

	public static void main(String[] args) throws InterruptedException {
		
		CyclicBarrier cb = new CyclicBarrier(4,new Thread(new Meeting()));
		
		ExecutorService pool = Executors.newFixedThreadPool(4);
		Member s1 = new Member(cb);
		Member s2 = new Member(cb);
		Member s3 = new Member(cb);
		Member s4 = new Member(cb);
		
		pool.execute(s1);
		pool.execute(s2);
		pool.execute(s3);
		//pool.execute(s4);
		TimeUnit.SECONDS.sleep(5);
		//當CyclicBarrier.await超時時,會丟擲異常
		pool.execute(s4);
		pool.shutdown();
	}
}
/**
 * 成員
 * @author lijh
 *
 */
class Member implements Runnable{

	private CyclicBarrier cb;
	
	public Member(CyclicBarrier cb){
		this.cb = cb;
	}
	
	@Override
	public void run() {
			try {
				Thread.sleep(1000);
				System.out.println("成員 "+Thread.currentThread().getName()+" 已經到達會議室!");
				cb.await(3,TimeUnit.SECONDS);
				//cb.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (BrokenBarrierException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
	}
	
}
/**
 * 會議
 * @author lijh
 *
 */
class Meeting implements Runnable{

	@Override
	public void run() {
		System.out.println("會議開始!");
	}
	
}

CyclicBarrier有兩個構造方法

    CyclicBarrier(int i) 和CyclicBarrier(int i , Runnable r),第二個構造,是在所有參與者都在此屏障上後,會優先執行r這個執行緒,上面的示例也演示出了這一點。