1. 程式人生 > >java之JUC系列-外部Tools-Executors|Semaphor|Exchanger|CyclicBarrier|CountDownLatch

java之JUC系列-外部Tools-Executors|Semaphor|Exchanger|CyclicBarrier|CountDownLatch

前面寫了兩篇JDBC原始碼的文章,自己都覺得有點枯燥,先插一段JUC系列的文章來換換胃口,前面有文章大概介紹過J U C包含的東西,JUC體系包含的內容也是非常的多,不是一兩句可以說清楚的,我這首先列出將會列舉的JUC相關的內容,然後介紹本文的版本:Tools部分

J.U.C體系的主要大板塊包含內容,如下圖所示:


注意這個裡面每個部分都包含很多的類和處理器,而且是相互包含,相互引用的,相互實現的。

說到J UC其實就是說java的多執行緒等和鎖,前面說過一些狀態轉換,中斷等,我們今天來用它的tools來實現一些有些小意思的東西,講到其他內容的時候,再來想想這寫tools是怎麼實現的。

tools是本文說要講到的重點,而tools主要包含哪些東西呢:


Tools也包含了5個部分的知識:ExecutorsSemaphor、Exchanger、CyclicBarrier、CountDownLatch,其實也就是五個工具類,這5個工具類有神馬用途呢,就是我們接下來要將的內容了。

Executors:

其實它主要用來建立執行緒池,代理了執行緒池的建立,使得你的建立入口引數變得簡單,通過方法名便知道了你要建立的執行緒池是什麼樣一個執行緒池,功能大概是什麼樣的,其實執行緒池內部都是統一的方法來實現,通過構造方法過載,使得實現不同的功能,但是往往這種方式很多時候不知道具體入口引數的改變有什麼意思,除非讀了原始碼才知道,此時builder模式的方式來完成,builder什麼樣的東西它告訴你就可以。

常見的方法有(都是靜態方法):

1、建立一個指定大小的執行緒池,如果超過大小,放入blocken佇列中,預設是LinkedBlockingQueue,預設的ThreadFactory為:Executors.defaultThreadFactory(),是一個Executors的一個內部類。

Executors.newFixedThreadPool(int) 

內部實現是:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
2、建立一個指定大小的執行緒池,如果超過大小,放入blocken佇列中,預設是LinkedBlockingQueue,自己指定ThreadFactory,自己寫的ThreadFactory,必須implements ThreadFactory,實現方法:newThread(Runnable)

Executors.newFixedThreadPool(int,ThreadFactory) 
內部實現是:

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

3、建立執行緒池長度為1的,也就是隻有一個長度的執行緒池,多餘的必須等待,它和呼叫Executors.newFixedThreadPool(1)得到的結果一樣:

 Executors.newSingleThreadExecutor() 

內部實現是:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
是不是蠻簡單的,就是在變引數,你自己也可以new的。

4、和方法3類似,可以自定義ThreadFactory,這裡就不多說了!

5、建立可以進行快取的執行緒池,預設快取60s,資料會放在一個SynchronousQueue上,而不會進入blocken佇列中,也就是隻要有執行緒進來就直接進入排程,這個不推薦使用,因為容易出問題,除非用來模擬一些併發的測試:

Executors.newCachedThreadPool();

內部實現為:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
6、和方法5類似,增加自定義ThreadFactory

7、新增一個Schedule的排程器的執行緒池,預設只有一個排程:

Executors.newSingleThreadScheduledExecutor();

內部實現為(這裡可以看到不是用ThreadPoolExector了,schedule換了一個類,內部實現通過ScheduledThreadPoolExecutor類裡面的內部類ScheduledFutureTask來實現的,這個內部類是private,預設是引用不到的哦):

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
8、和7一樣,增加自己定義的ThreadFactory

9、新增一個schedule的執行緒池排程器,和newFixedThreadPool有點類似:

Executors.newScheduledThreadPool();
內部程式碼為:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

其實內部Exectors裡面還有一些其他的方法,我們就不多說明了,另外通過這裡,大家先可以瞭解一個大概,知道Exectors其實是一個工具類,提供一系列的靜態方法,來完成對對應執行緒池的形象化建立,所以不用覺得很神奇,神奇的是內部是如何實現的,本文我們不闡述文章中各種執行緒池的實現,只是大概上有個認識,等到我們專門將Exector系列的時候,我們會詳細描述這些細節。

OK,我們繼續下一個話題:

Semaphor,這個鳥東西是敢毛吃的呢?

答:通過名字就看出來了,是訊號量。

訊號量可以幹什麼呢?

答:根據一些閥值做訪問控制。

OK,我們這裡模擬一個當多個執行緒併發一段程式碼的時候,如何控制其訪問速度:

import java.util.Random;
import java.util.concurrent.Semaphore;


  public class SemaphoreTest {
      private final static Semaphore MAX_SEMA_PHORE = new Semaphore(10);
      public static void main(String []args) {
           for(int i = 0 ; i < 100 ; i++) {
                final int num = i;
                final Random radom = new Random();
                new Thread() {
                     public void run() {
                         boolean acquired = false;
                         try {
                              MAX_SEMA_PHORE.acquire();
                              acquired = true;
                              System.out.println("我是執行緒:" + num + " 我獲得了使用權!" + DateTimeUtil.getDateTime());
                              long time = 1000 * Math.max(1, Math.abs(radom.nextInt() % 10));
                              Thread.sleep(time);
                              System.out.println("我是執行緒:" + num + " 我執行完了!" + DateTimeUtil.getDateTime());
                         }catch(Exception e) {
                              e.printStackTrace();
                         }finally {
                              if(acquired) {
                                 MAX_SEMA_PHORE.release();
                              }
                         }
                      }
                }.start();
           }
      }
  }
這裡是簡單模擬併發100個執行緒去訪問一段程式,此時要控制最多同時執行的是10個,用到了這個訊號量,執行程式用了一個執行緒睡眠一個隨機的時間來代替,你可以看到後面有執行緒說自己釋放了,就有執行緒獲得了,沒釋放是獲取不到的,內部實現方面,我們暫時不管,暫時知道這樣用就OK。

接下來:

Exchanger十個神馬鬼東西呢?

答:執行緒之間互動資料,且在併發時候使用,兩兩交換,交換中不會因為執行緒多而混亂,傳送出去沒接收到會一直等,由互動器完成互動過程。

啥時候用,沒想到案例?

答:的確很少用,而且案例很少,不過的確有這種案例,Exchanger

import java.util.concurrent.Exchanger;

public class ExchangerTest {
	
	public static void main(String []args) {
		final Exchanger <Integer>exchanger = new Exchanger<Integer>();
		for(int i = 0 ; i < 10 ; i++) {
			final Integer num = i;
			new Thread() {
				public void run() {
					System.out.println("我是執行緒:Thread_" + this.getName() + "我的資料是:" + num);
					try {
						Integer exchangeNum = exchanger.exchange(num);
						Thread.sleep(1000);
						System.out.println("我是執行緒:Thread_" + this.getName() + "我原先的資料為:" + num + " , 交換後的資料為:" + exchangeNum);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}.start();
		}
	}
}


這裡執行你可以看到,如果某個執行緒和另一個執行緒傳送了資料,它接受到的資料必然是另一個執行緒傳遞給他的,中間步驟由Exchanger去控制,其實你可以說,我自己隨機取選擇,不過中間的演算法邏輯就要複雜一些了。

接下來:

CyclicBarrier,關卡模式,搞啥玩意的呢?

答:當你在很多環節需要卡住,要多個執行緒同時在這裡都達到後,再向下走,很有用途。

能否舉個例子,有點抽象?

答:團隊出去旅行,大家一起先達到酒店住宿,然後一起達到遊樂的地方遊玩,然後一起坐車回家,每次需要點名後確認相關人員均達到,然後LZ一聲令下,觸發,大夥就瘋子般的出發了。

下面的例子也是以旅遊的方式來呈現給大家:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierTest {
	
	private static final int THREAD_COUNT = 10;
	
	private final static CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(THREAD_COUNT  ,
		new Runnable() {
			public void run() {
				System.out.println("======>我是導遊,本次點名結束,準備走下一個環節!");
			}
		}
	);
	
	public static void main(String []args) 
			throws InterruptedException, BrokenBarrierException {
		for(int i = 0 ; i < 10 ; i++) {
			new Thread(String.valueOf(i)) {
				public void run() {
					try {
						System.out.println("我是執行緒:" + this.getName() + " 我們達到旅遊地點!");
						CYCLIC_BARRIER.await();
						System.out.println("我是執行緒:" + this.getName() + " 我開始騎車!");
						CYCLIC_BARRIER.await();
						System.out.println("我是執行緒:" + this.getName() + " 我們開始爬山!");
						CYCLIC_BARRIER.await();
						System.out.println("我是執行緒:" + this.getName() + " 我們回賓館休息!");
						CYCLIC_BARRIER.await();
						System.out.println("我是執行緒:" + this.getName() + " 我們開始乘車回家!");
						CYCLIC_BARRIER.await();
						System.out.println("我是執行緒:" + this.getName() + " 我們到家了!");
					} catch (InterruptedException e) {
						e.printStackTrace();
					} catch (BrokenBarrierException e) {
						e.printStackTrace();
					}
				}
			}.start();
		}
	}
}

測試結果中可以發現,大家一起走到某個步驟後,導遊說:“我是導遊,本次點名結束,準備走下一個環節!”,然後才會進入下一個步驟,OK,這個有點意思吧,其實賽馬也是這個道理,只是賽馬通常只有一個步驟,所以我們還有一個方式是:

CountDownLatch的方式來完成賽馬操作,CountDownLatch是用計數器來做的,所以它不可以被複用,如果要多次使用,就要從新new一個出來才可以。我們下面的程式碼中,用兩組賽馬,每組5個參與者來,做一個簡單測試:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
	
	private final static int GROUP_SIZE = 5;
	
	public static void main(String []args) {
		processOneGroup("分組1");
		processOneGroup("分組2");
	}
	
	private static void processOneGroup(final String groupName) {
		final CountDownLatch start_count_down = new CountDownLatch(1);
		final CountDownLatch end_count_down = new CountDownLatch(GROUP_SIZE);
		System.out.println("==========================>\n分組:" + groupName + "比賽開始:");
		for(int i = 0 ; i < GROUP_SIZE ; i++) {
			new Thread(String.valueOf(i)) {
				public void run() {
					System.out.println("我是執行緒組:【" + groupName + "】,第:" + this.getName() + " 號執行緒,我已經準備就緒!");
					try {
						start_count_down.await();//等待開始指令發出即:start_count_down.countDown();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("我是執行緒組:【" + groupName + "】,第:" + this.getName() + " 號執行緒,我已執行完成!");
					end_count_down.countDown();
				}
			}.start();
		}
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("各就各位,預備!");
		start_count_down.countDown();//開始賽跑
		try {
			end_count_down.await();//等待多個賽跑者逐個結束
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("分組:" + groupName + "比賽結束!");
	}
}
有點意思哈,如果你自己用多執行緒實現是不是有點麻煩,不過你可以用Thread的join方法來實現,也就是執行緒的發生join的時候,當前執行緒(一般是主執行緒)要等到對應執行緒執行完run方法後才會進入下一步,為了模擬下,我們也來玩玩:
public class ThreadJoinTest {
	
	private final static int GROUP_SIZE = 5;

	public static void main(String []args) throws InterruptedException {
		Thread []threadGroup1 = new Thread[5];
		Thread []threadGroup2 = new Thread[5];
		for(int i = 0 ; i < GROUP_SIZE ; i++) {
			final int num = i;
			threadGroup1[i] = new Thread() {
				public void run() {
					int j = 0;
					while(j++ < 10) {
						System.out.println("我是1號組執行緒:" + num + " 這個是我第:" + j + " 次執行!");
					}
				}
			};
			threadGroup2[i] = new Thread() {
				public void run() {
					int j = 0;
					while(j++ < 10) {
						System.out.println("我是2號組執行緒:" + num + " 這個是我第:" + j + " 次執行!");
					}
				}
			};
			threadGroup1[i].start();
		}
		for(int i = 0 ; i < GROUP_SIZE ; i++) {
			threadGroup1[i].join();
		}
		System.out.println("-==================>執行緒組1執行完了,該輪到俺了!");
		for(int i = 0 ; i < GROUP_SIZE ; i++) {
			threadGroup2[i].start();
		}
		for(int i = 0 ; i < GROUP_SIZE ; i++) {
			threadGroup2[i].join();
		}
		System.out.println("全部結束啦!哈哈,回家喝稀飯!");
	}
}


程式碼是不是繁雜了不少,呵呵,我們再看看上面的訊號量,如果不用工具,自己寫會咋寫,我們模擬CAS鎖,使用Atomic配合完成咋來做呢。也來玩玩,呵呵:
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadWaitNotify {
	
	private final static int THREAD_COUNT = 100;
	
	private final static int QUERY_MAX_LENGTH = 2;
	
	private final static AtomicInteger NOW_CALL_COUNT = new AtomicInteger(0);

	public static void main(String []args) throws InterruptedException {
		Thread []threads = new Thread[THREAD_COUNT];
		for(int i = 0 ; i < THREAD_COUNT ; i++) {
			threads[i] = new Thread(String.valueOf(i)) {
				synchronized public void run() {
					int nowValue = NOW_CALL_COUNT.get();
					while(true) {
						if(nowValue < QUERY_MAX_LENGTH && NOW_CALL_COUNT.compareAndSet(nowValue, nowValue + 1)) {
							break;//獲取到了
						}
						try {
							this.wait(1000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						nowValue = NOW_CALL_COUNT.get();//獲取一個數據,用於對比
					}
					System.out.println(this.getName() + "======我開始做操作了!");
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(this.getName() + "======操作結束了!");
					NOW_CALL_COUNT.getAndDecrement();
					this.notify();
				}
			};
		}
		for(int i = 0 ; i < THREAD_COUNT ; i++) {
			threads[i].start();
		}
	}
}

還是有點意思哈,這樣寫就是大部分人對while迴圈那部分會寫暈掉,主要是要不斷去判定和嘗試,wait()預設是長期等待,但是我們不想讓他長期等待,就等1s然後再嘗試,其例項子還可以改成wait一個隨機的時間範圍,這樣模擬的效果會更加好一些;另外實際的程式碼中,如果獲取到鎖後,notify方法應當放在finally中,才能保證他肯定會執行notify這個方法。

OK,本文就是用,玩,希望玩得有點爽,我們後面會逐步介紹它的實現機制以及一寫執行緒裡頭很好用,但是大家又不是經常用的東西。