1. 程式人生 > >同步計數器CountDownLatch 和CyclicBarrier

同步計數器CountDownLatch 和CyclicBarrier

CountDownLatch ,把一個工作分給5個人,5個執行緒都執行完了,呼叫countDown,給計數器減數,而主執行緒await,等數為零,主執行緒繼續往下執行,即5條執行緒都完成才算工作完成。

內部很簡單,還是繼承AQS,把設定的數量賦值給state,countDown就減state,await想必掛起執行緒和解放執行緒。

CyclicBarrier 這麼打比方,5個執行緒,每個人的工作分成兩部分,完成第一部分後,需要確保5個人都完成了第一部分,大家才能各自進行第二部分。同時CyclicBarrier  可以設定一個主任務,在5個人的第一部分都執行完之後,這個主任務就會執行

CountDownLatch 是一個執行緒(例子中是主執行緒)等待其他執行緒執行完畢後,才能執行。

CyclicBarrier 是多個執行緒互相等待對方執行到指定的那一步,然後這些執行緒才能繼續執行

一.CountDownLatch

1.例子

public class TestCountDown {

	public static void main(String[] args) throws InterruptedException {
		CountDownLatch latch = new CountDownLatch(4);
		Work w1 = new Work(latch);
		Work w2 = new Work(latch);
		Work w3 = new Work(latch);
		w1.start();
		w2.start();
		w3.start();
		latch.await();
		System.out.println("latch要等計數器為0,await方法才能釋放");
	}
	
	static class Work extends Thread{
		
		private CountDownLatch latch;
		
		@Override
		public void run() {
			System.out.println(Thread.currentThread().getName()+"正在工作");
			latch.countDown();
		}

		public Work(CountDownLatch latch) {
			super();
			this.latch = latch;
		}
		
	}
}

主執行緒只有等三個執行緒都執行完,計數器為零後,await方法才釋放,才執行

如果改為CountDownLatch latch = new CountDownLatch(4);

會發現主執行緒一直堵塞,因為計數器永遠無法到0

2.應用場景

開五個執行緒去上傳或下載,只有五個執行緒都成功才算成功

3.原始碼

new CountDownLatch(4);數量其實是Sync的state

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

countDown其實就是在對state減一,到0時就不能減了

   public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

await應該是計算state是否=0,不等於就堵塞

   public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

  private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

二.CyclicBarrier

1.例子

public class TestCyclicBarrier {

	public static void main(String[] args) {
		CyclicBarrier barrier = new CyclicBarrier(3,new TotalTask());
		Work w1 = new Work(barrier);
		Work w2 = new Work(barrier);
		Work w3 = new Work(barrier);
		w1.start();w2.start();w3.start();
		System.out.println("z主執行緒");
	}
	
	static class TotalTask extends Thread{
		@Override
		public void run() {
			System.out.println("所有任務的第一部分都執行完,總任務就會執行");
		}
	}
	
	static class Work extends Thread{
		
		private CyclicBarrier barrier;
		public Work(CyclicBarrier barrier) {
			this.barrier = barrier;
		}

		@Override
		public void run() {
			System.out.println(Thread.currentThread().getName()+"執行工作的第一部分");
			try {
				barrier.await();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (BrokenBarrierException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName()+"執行工作的第二部分");
		}
	}
}