1. 程式人生 > >JAVA Concurrent.util工具類的使用

JAVA Concurrent.util工具類的使用

一、CyclicBarrier的使用:

假設有隻有的一個場景:每個執行緒代表一個跑步運動員,當運動員都準備好後,才一起出發,只要有一個沒有準備好,大家都等待。

CyclicBarrier示例:

public class UseCyclicBarrier {
public static void main(String[] args) {
//引數表示3個執行緒一起準備OK繼續執行,少一個執行緒所有執行緒一直阻塞。多一個執行緒多出來的執行緒會一直阻塞不執行只有引數個數的執行緒會繼續執行。
CyclicBarrier barrier=new CyclicBarrier(3);
Runner zs=new Runner(barrier, “張三”);
Runner ls=new Runner(barrier, “李四”);
Runner ww=new Runner(barrier, “王五”);
Runner ll=new Runner(barrier, “麗麗”);
ExecutorService executor=Executors.newFixedThreadPool(3);
executor.execute(zs);
executor.execute(ls);
executor.execute(ww);
// executor.execute(ll);
executor.shutdown();
}
static class Runner implements Runnable{
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;

this.name = name;
}
@Override
public void run() {
try {
int time=1000*(new Random()).nextInt(5);
System.out.println(name+“睡眠時間:”+time);
Thread.sleep(time);
System.out.println(name+“準備OK!”);
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name+"–gogogogo");
}
}
}

輸出結果:

張三睡眠時間:4000
王五睡眠時間:3000
李四睡眠時間:1000
李四準備OK!
王五準備OK!
張三準備OK!
張三–gogogogo
李四–gogogogo
王五–gogogogo

注意:CyclicBarrier barrier=new CyclicBarrier(3);//引數表示3個執行緒一起準備OK繼續執行,少一個執行緒所有執行緒一直阻塞。多一個執行緒多出來的執行緒會一直阻塞不執行只有引數個數的執行緒會繼續執行。

二、CountDownLacth使用:

它經常用於監聽某些初始化操作,等初始化執行完畢後,通知主執行緒繼續工作。

CountDownLatch 示例

public static void main(String[] args) {
	//初始化引數2表示需要等待兩個子執行緒的countDown()方法的通知
	final CountDownLatch countDown=new CountDownLatch(2);
	Thread t1=new Thread(new Runnable() {			
		@Override
		public void run() {
			try {
				System.out.println("進入執行緒t1等待其他執行緒處理完成...");
				countDown.await();
				System.out.println("t1執行緒繼續執行...");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	},"t1");
	Thread t2=new Thread(new Runnable() {			
		@Override
		public void run() {
			try {
				System.out.println("t2執行緒進入初始化操作...");
				Thread.sleep(3000);
				System.out.println("t2執行緒初始化完畢,通知t1執行緒繼續執行...");
				countDown.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	},"t2");
	Thread t3=new Thread(new Runnable() {			
		@Override
		public void run() {
			try {
				System.out.println("t3執行緒進入初始化操作...");
				Thread.sleep(5000);
				System.out.println("t3執行緒初始化完畢,通知t1執行緒繼續執行...");
				countDown.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	},"t3");
	t1.start();
	t2.start();
	t3.start();
}

輸出結果

進入執行緒t1等待其他執行緒處理完成…
t3執行緒進入初始化操作…
t2執行緒進入初始化操作…
t2執行緒初始化完畢,通知t1執行緒繼續執行…
t3執行緒初始化完畢,通知t1執行緒繼續執行…
t1執行緒繼續執行…

初始化CountDownLatch的引數表示需要等待兩個子執行緒的countDown()方法的通知,當達到給定的2的時候其他的執行緒在呼叫countDown()也不起作用。

CyclicBarrier和CountDownLacth的區別

CyclicBarrier:多個執行緒等待完成準備然後在同時各自執行自己的執行緒。
CountDownLatch:一個執行緒等待多個執行緒執行完成通知。

三、Callable和Future使用

Future模式非常適合處理很耗時的業務邏輯時進行使用,可以有效的減小系統的響應時間,提高系統的吞吐量。

Future示例:

public class UseFuture implements Callable{

private String params;

public UseFuture(String params) {
	this.params=params;
}
@Override
public String call() throws Exception {
	Thread.sleep(3000);
	return "根據請求引數:"+params+"返回結果!";
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
	String params="我要大美女";
	FutureTask<String> futureTask1=new FutureTask<>(new UseFuture(params+"1"));
	FutureTask<String> futureTask2=new FutureTask<>(new UseFuture(params+"2"));
	ExecutorService es=Executors.newFixedThreadPool(2);
	//submit和execute的區別:submit有返回值,execute無返回值
	Future f=es.submit(futureTask1);
	es.execute(futureTask2);
	System.out.println("請求完畢!");
	//f.get()任務一處理完就返回一個NULL
	System.out.println(f.get());
	while(true){
		if(f.get()==null){
			System.out.println("任務執行完畢");
			break;
		}
	}	
	
	try {
		Thread.sleep(4000);			
		System.out.println("返回結果1:"+futureTask1.get());
		System.out.println("返回結果2:"+futureTask2.get());
	} catch (Exception e) {
		e.printStackTrace();
	}
	
	es.shutdown();
}

}

輸出結果:

請求完畢!
null
任務執行完畢
返回結果1:根據請求引數:我要大美女1返回結果!
返回結果2:根據請求引數:我要大美女2返回結果!

總結 :
ExecutorService es=Executors.newFixedThreadPool(2);
// ** submit和execute的區別 **:submit有返回值,execute無返回值
Future f=es.submit(futureTask1);
es.execute(futureTask2);
上面兩個返回結果同時打印表示futureTask1.get()非同步獲取結果。

三、訊號量

描述:在Semaphore訊號量非常適合高併發訪問,新系統在上線之前,要對系統的訪問量進行評估,當然這個值肯定不是隨便拍拍腦袋就能想出來的,是經過以往的經驗、資料、歷年的 訪問,以及推廣力度進行一個合理的評估,當然評估標準不能太大也不能太小,太大的話投入的資源達不到實際效果,純粹浪費資源,太小的話,某個時間點一個高峰值的訪問量上來直接可以壓垮系統。

相關概念:

PV(page view) 網站的總訪問量,頁面瀏覽量或點選量,使用者每重新整理一次就會 被記錄一次。
UV(unique Visitor)訪問網站的一臺電腦客戶端為一個訪客。一般來講,時間上以00:00-24:00之內相同IP的客戶端只記錄一次。
QPS(query per second)即每秒查詢數,qps很大程度上代表了系統業務上的繁忙程度,每次請求的背後,可能對應著多次磁碟I/O,多次網路請求,多個cpu時間片等。我們通過qps可以非常直觀的瞭解當前系統業務情況,一旦當前qps超過所設定的預警閾值,可以考慮增加機器對叢集擴容,以免壓力過大導致宕機,可以根據前期的壓力測試得到估值,在結合後期綜合運維情況,估算出閾值。
RT(response time)即請求的響應時間,這個指標非常關鍵,直接說明前端使用者的體驗,因此任何系統設計師都想降低rt時間。
當然還涉及cpu、記憶體、網路、磁碟等情況,更細節的問題很多,如select、update、delete/ps等資料庫層面的統計。

容量評估:一般來說通過開發、運維、測試、以及業務等相關人員,綜合出系統的一系列閾值,然後我們根據關鍵閾值如qps、rt等,對系統進行有效的變更。

一般來講,我們進行多輪壓力測試以後,可以對系統進行峰值評估,採用所謂的80/20原則,即80%的訪問請求將在20%的時間內達到。這樣我們可以根據系統對應的PV計算出峰值qps。

qps=(總PV * 80%)/(606024*20%)

然後在將總的峰值QPS除以單臺機器所能承受的最高的qps值,就是所需要機器的數量:機器數=總的峰值qps/壓測得出的單擊極限qps

當然不排除系統在上線前進行大型促銷活動,或者雙十一,雙十二熱點事件、遭受到DDos攻擊等情況,系統的開發和運維人員急需要了解當前系統執行的狀態和負載情況,一般都會有後臺系統去維護。

semaphore 訊號量示例:

public class UseSemaphore {
public static void main(String[] args) {
//執行緒池
ExecutorService es=Executors.newCachedThreadPool();
//只能5個執行緒同時訪問
final Semaphore semaphore=new Semaphore(5);
//模擬20個執行緒同時訪問
for (int i = 0; i < 20; i++) {
final int NO=i;
Runnable r=new Runnable() {
@Override
public void run() {
try {
//拿到許可
semaphore.acquire();
System.out.println("Accessing: "+NO);
//模擬實際業務邏輯
Thread.sleep((long) (Math.random()*10000));
//訪問完畢,釋放
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
es.submit®;
}
es.shutdown();
}

}

輸出結果:

Accessing: 1
Accessing: 3
Accessing: 5
Accessing: 9
Accessing: 13
Accessing: 17
Accessing: 7
Accessing: 11
Accessing: 15
Accessing: 8
Accessing: 19
Accessing: 12
Accessing: 16
Accessing: 2
Accessing: 0
Accessing: 6
Accessing: 4
Accessing: 10
Accessing: 14
Accessing: 18
總結:第一次輸出5個。後面保證每次只有5個活躍的執行緒。