1. 程式人生 > >《實戰Java高併發程式設計》學習總結(2)

《實戰Java高併發程式設計》學習總結(2)

第3章  JDK併發包

1 synchronized的功能擴充套件:重入鎖。使用java.util.concurrent.locks.ReentrantLock類來實現。

import java.util.concurrent.locks.ReentrantLock;

public class ReenterLock implements Runnable {

	public static ReentrantLock lock = new ReentrantLock();
	public static int i = 0;
	@Override
	public void run() {
		// TODO Auto-generated method stub
		for(int j=0;j<100000000;j++){
			lock.lock();
			try{
				i++;
			}finally{
				lock.unlock();
			}
		}
	}
	public static void main(String[] args) throws InterruptedException {
		ReenterLock tl = new ReenterLock();
		Thread t1 = new Thread(tl);
		Thread t2 = new Thread(tl);
		t1.start();t2.start();
		t1.join();t2.join();
		System.out.println(i);
	}

}

重入鎖比synchronized靈活外,還提供一些高階功能

  • 中斷響應。synchronized必須要等到鎖才能繼續執行,而重入鎖是支援執行緒可以被中斷。
import java.util.concurrent.locks.ReentrantLock;

public class IntLock implements Runnable{

	public static ReentrantLock lock1 = new ReentrantLock();
	public static ReentrantLock lock2 = new ReentrantLock();
	int lock;
	
	public IntLock(int lock){
		this.lock = lock;
	}
	
	@Override
	public void run() {
		// TODO Auto-generated method stub
		try{
			if(lock == 1){
				lock1.lockInterruptibly();
				try{
					Thread.sleep(500);
				}catch(InterruptedException e){
					
				}
				lock2.lockInterruptibly();
			}else{
				lock2.lockInterruptibly();
				try{
					Thread.sleep(500);
				}catch(InterruptedException e){
					
				}
				lock1.lockInterruptibly();
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}finally{
			if(lock1.isHeldByCurrentThread())
				lock1.unlock();
			if(lock2.isHeldByCurrentThread())
				lock2.unlock();
			System.out.println(Thread.currentThread().getId()+":執行緒退出");
		}
	}
	public static void main(String[] args) throws InterruptedException{
		IntLock r1 = new IntLock(1);
		IntLock r2 = new IntLock(2);
		Thread t1 = new Thread(r1);
		Thread t2 = new Thread(r2);
		t1.start();t2.start();
		Thread.sleep(1000);
		t2.interrupt();
	}
	
}
  • 鎖申請等待限時,使用tryLock()
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TimeLock implements Runnable {

	public static ReentrantLock lock = new ReentrantLock();
	@Override
	public void run() {
		// TODO Auto-generated method stub
		try{
			if(lock.tryLock(5,TimeUnit.SECONDS)){  //5秒內無法申請到鎖則失敗
				Thread.sleep(6000);
			}else{
				System.out.println("get lock failed");
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}finally{
			if(lock.isHeldByCurrentThread())
				lock.unlock();
		}
	}
	public static void main(String[] args){
		TimeLock tl = new TimeLock();
		Thread t1 = new Thread(tl);
		Thread t2 = new Thread(tl);
		t1.start();
		t2.start();
	}

}
  • 公平鎖,當引數fair為true時表示鎖為公平。但公平鎖的實現成本高效能相對不佳,預設鎖是非公平的

                  public ReentrantLock( boolean fair )

import java.util.concurrent.locks.ReentrantLock;

public class FairLock implements Runnable{

	public static ReentrantLock fairLock = new ReentrantLock(true);
	@Override
	public void run() {
		// TODO Auto-generated method stub
		while(true){
			try{
				fairLock.lock();
				System.out.println(Thread.currentThread().getName()+" 獲得鎖");
			}finally{
				fairLock.unlock();
			}
		}
	}
	public static void main(String[] args) throws InterruptedException{
		FairLock fl = new FairLock();
		Thread t1 = new Thread(fl,"Thread_t1");
		Thread t2 = new Thread(fl,"Thread_t2");
		t1.start();t2.start();
	}

}

輸出 (交替獲取)

Thread_t1 獲得鎖

Thread_t2 獲得鎖

Thread_t1 獲得鎖

Thread_t2 獲得鎖

Thread_t1 獲得鎖

Thread_t2 獲得鎖

Thread_t1 獲得鎖

Thread_t2 獲得鎖

Thread_t1 獲得鎖

Thread_t2 獲得鎖

Thread_t1 獲得鎖

Thread_t2 獲得鎖

Thread_t1 獲得鎖

Thread_t2 獲得鎖

Thread_t1 獲得鎖

Thread_t2 獲得鎖

Thread_t1 獲得鎖

2  重入鎖ReentrantLock的幾個重要方法:

  • lock( ) :獲得鎖,如果鎖已經被佔用則等待。
  • lockInterruptibly( ):獲得鎖,但優先響應中斷
  • tryLock( ):嘗試獲得鎖,如果成功返回true,失敗返回false。不等待立即返回
  • tryLock( long time , TimeUnit unit ):在給定時間內嘗試獲得鎖 

3 重入鎖的好搭檔:Condition條件。通過Lock介面的Condition newCondition( )方法可以生成一個與當前重入鎖繫結的Condition例項。利用Condition物件,可以讓執行緒在合適的時間等待或得到通知繼續執行。介面基本方法如下

  • void await( )  throws InterruptedException    // 當前執行緒等待,釋放鎖,其他執行緒用signal()或signalAll()時,執行緒重新獲取鎖繼續執行。或當執行緒被中斷時也能跳出等待
  • void awaitUninterruptibly( ) // 跟await()基本相同,只是它不會在等待過程中響應中斷
  • void awaitNanos(long nanosTimeout) throws InterruptedException 
  • boolean await(long time , TimeUnit unit) throws InterruptedException
  • boolean awaitUntil(Date deadline) throws InterruptedException
  • void signal( )   // 用於喚醒一個在等待中的執行緒
  • void signalAll( )
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockCondition implements Runnable {

	public static ReentrantLock lock = new ReentrantLock();
	public static Condition condition = lock.newCondition();
	@Override
	public void run() {
		// TODO Auto-generated method stub
		try{
			lock.lock();
			condition.await();    // 等待通知
			System.out.println("Thread is going on");
		}catch(InterruptedException e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	public static void main(String[] args) throws InterruptedException{
		ReentrantLockCondition tl = new ReentrantLockCondition();
		Thread t1 = new Thread(tl);
		t1.start();
		Thread.sleep(2000);
		lock.lock();    // 呼叫signal()前要求當前執行緒先獲取相關的鎖
		condition.signal();
		lock.unlock();  // 呼叫signal()後要求釋放當前鎖謙讓給喚醒的執行緒
	}

}

注:呼叫signal()前要求當前執行緒先獲得相關的鎖,呼叫後也要釋放鎖給喚醒的執行緒。

4 允許多個執行緒同時訪問:訊號量(Semaphore) ,可以指定多個執行緒同時訪問某一個資源。函式如下

  • public Semaphore(int permits)  // 必須指定訊號量的准入數,每個執行緒只能申請一個許可。
  • public Semaphore(int permits , boolean fair)    // 第二個引數指定是否公平
  • public void acquire( )   // 嘗試獲得一個准入許可,無法獲得則等待,直到有執行緒釋放一個許可或當前執行緒被中斷
  • public void acquireUninterruptibly( )  // 跟acquire()基本一樣,只是不響應中斷
  • public boolean tryAcquire( )  // 嘗試獲得一個准入許可,獲得則返回true,失敗則返回false。不會等待立即返回
  • public boolean tryAcquire(long timeout , TimeUnit unit)  
  • public void release( ) // 用於線上程訪問資源結束後釋放一個許可

5 ReadWriteLock 讀寫鎖,要比重入鎖或內部鎖更針對

6 倒計時器:CountDownLatch  ,是一個非常實用的多執行緒控制工具類。通常用來控制執行緒等待,讓一個某個執行緒等待直到倒計時結束再開始執行。建構函式如下

        public  CountDownLatch(int count)    // count為計數器的計數個數

public class CountDownLatchDemo implements Runnable {

    // 設定計數10的倒計時器
	static final CountDownLatch end = new CountDownLatch(10);
	static final CountDownLatchDemo demo = new CountDownLatchDemo();
	@Override
	public void run() {
		// TODO Auto-generated method stub
		try{
			Thread.sleep(new Random().nextInt(10)*1000);
			System.out.println("check complete");
            // 倒計時器倒數一次
			end.countDown();
		}catch(InterruptedException e){
			e.printStackTrace();
		}
	}
	public static void main(String[] args) throws InterruptedException{
		ExecutorService exec = Executors.newFixedThreadPool(10);
		for(int i=0;i<10;i++){
			exec.submit(demo);
		}
        // 倒計時開始,一直等待,只有倒計時器倒數完成後才會往下執行
		end.await();
		System.out.println("Fire!");
		exec.shutdown();
	}

}

7 迴圈柵欄:CyclicBarrier,是一種多執行緒併發控制實用工具。跟CountDownLatch類似,但更復雜強大。建構函式如下

         public CyclicBarrier( int parties , Runnable barrierAction )  // parties為計數總數,barrierAction為計數完成後要去執行的任務

public class CyclicBarrierDemo {

	public static class Soldier implements Runnable {
		private String soldier;
		private final CyclicBarrier cyclic;
		Soldier(CyclicBarrier cyclic,String soldierName){
			this.soldier = soldierName;
			this.cyclic = cyclic;
		}
		@Override
		public void run() {
			// TODO Auto-generated method stub
			try{
                // 呼叫一次倒計時一次,夠計數總數後,cyclic會自動呼叫BarrierRun,繼續執行下面的操作,即doWork()
				cyclic.await();
				doWork();
                // 又重新倒計時,呼叫一次倒計時一次,夠計數總數後,cyclic會自動呼叫BarrierRun,繼續執行下面的操作,即doWork()
				cyclic.await();
			}catch(InterruptedException e){
				e.printStackTrace();
			}catch(BrokenBarrierException e){
				e.printStackTrace();
			}
		}
		void doWork(){
			try{
				Thread.sleep(Math.abs(new Random().nextInt()%10000));
			}catch(InterruptedException e){
				e.printStackTrace();
			}
			System.out.println(soldier+": 任務完成");
		}
		
	}
	public static class BarrierRun implements Runnable{

		boolean flag;
		int N;
		public BarrierRun(boolean flag,int N){
			this.flag = flag;
			this.N = N;
		}
		@Override
		public void run() {
			// TODO Auto-generated method stub
			if(flag){
				System.out.println("司令: [士兵"+N+"個,任務完成!]");
			}else{
				System.out.println("司令: [士兵"+N+"個,集合完畢!]");
				flag = true;
			}
		}
		
	}
	public static void main(String[] args) throws InterruptedException {
		final int N = 10;
		Thread[] allSoldier = new Thread[N];
		boolean flag = false;
		CyclicBarrier cyclic = new CyclicBarrier(10,new BarrierRun(flag,N));
		System.out.println("集合隊伍!");
		for(int i=0;i<N;i++){
			System.out.println("士兵"+i+" 報道!");
			allSoldier[i] = new Thread(new Soldier(cyclic,"士兵i"));
			allSoldier[i].start();
		}
	}
}

8 執行緒阻塞工具類:LockSupport,是一種非常方便實用的執行緒阻塞工具。可以線上程內任意位置讓執行緒阻塞。

public class LockSupportDemo {

	public static Object u = new Object();
	static ChangeObjectThread t1 = new ChangeObjectThread("t1");
	static ChangeObjectThread t2 = new ChangeObjectThread("t2");
	
	public static class ChangeObjectThread extends Thread{
		public ChangeObjectThread(String name){
			super.setName(name);
		}

		@Override
		public void run() {
			// TODO Auto-generated method stub
			synchronized(u){
				System.out.println("in "+getName());
				LockSupport.park();  // 阻塞當前執行緒
			}
		}
	}
	public static void main(String[] args) throws InterruptedException{
		t1.start();
		Thread.sleep(100);
		t2.start();
		LockSupport.unpark(t1);  // 繼續執行
		LockSupport.unpark(t2);  // 繼續執行
		t1.join();
		t2.join();
	}
}

LockSupport類為每個執行緒準備了一個許可,如果許可可用,park()函式會立即返回並消費這個許可,即標記該許可為不可用;如果不可用則會阻塞。unpack()則是將許可變成可用。這樣保證unpack()和pack()的順序總是相對穩定的。

pack(Object)函式還可以為當前執行緒設定一個阻塞物件。

LockSupport.pack()支援中斷,而且還不會丟擲InterruptedException異常。如下

public static class ChangeObjectThread extends Thread{
		public ChangeObjectThread(String name){
			super.setName(name);
		}

		@Override
		public void run() {
			// TODO Auto-generated method stub
			synchronized(u){
				System.out.println("in "+getName());
				LockSupport.park();
				if(Thread.interrupted()){
					System.out.println(getName()+" 被中斷了");
				}
			}
			System.out.println(getName()+" 執行結束");
		}
	}
	public static void main(String[] args) throws InterruptedException{
		t1.start();
		Thread.sleep(100);
		t2.start();
		t1.interrupt();
		LockSupport.unpark(t2);
	}
}

9 執行緒池為了避免系統頻繁地建立和銷燬執行緒,並且複用建立地執行緒。線上程池中,有幾個活躍地執行緒,當需要執行緒時從中取一個空閒的執行緒。完成任務後再退回到執行緒池中JDK提供了一套Executor框架幫助執行緒控制,其本質就是一個執行緒池

以上成員均在java.util.concurrent包中,是JDK併發包的核心類。其中ThreadPoolExecutor表示一個執行緒池。Executors類則扮演執行緒池工廠的角色。通過Executors可以取得一個擁有特定功能的執行緒池。

Executor框架提供了各種型別的執行緒池。主要有以下工廠方法

  • public static ExecutorService newFixedThreadPool( int nThreads)  //返回一個固定執行緒數量的執行緒池,該執行緒池中的執行緒數量固定不變。有新任務時,池中若有空閒執行緒則立即執行。否則存在任務佇列中排隊等候
  • public static ExecutorService newSingleThreadExecutor()  //該方法返回一個只有一個執行緒的執行緒池。若多餘一個任務被提交到該執行緒池,任務會儲存到任務佇列排隊執行
  • public static ExecutorService newCachedThreadPool()  // 該方法返回一個可根據實際情況調整執行緒數量的執行緒池。執行緒數量不確定。如有空閒執行緒可複用優先使用。如沒有空閒則建立新的執行緒執行任務。
  • public static ScheduledExecutorService newSingleThreadScheduledThreadPool()  // 該方法返回一個ScheduledExecutorService物件,執行緒池大小為1。
  • public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize )  // 該方法返回一個ScheduledExecutorService物件,但該執行緒池可以指定執行緒數量

例子

public class ThreadPoolDemo {

	public static class MyTask implements Runnable{

		@Override
		public void run() {
			// TODO Auto-generated method stub
			System.out.println(System.currentTimeMillis()+": Thread ID: "+Thread.currentThread().getId());
			try{
				Thread.sleep(1000);
			}catch(InterruptedException e){
				e.printStackTrace();
			}
		}
		
	}
	public static void main(String[] args){
		MyTask task = new MyTask();
		ExecutorService es = Executors.newFixedThreadPool(5);
		//ExecutorService es = Executors.newSingleThreadExecutor();
		//ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<10;i++){
			es.submit(task);
		}
	}
}

// 當使用newFixedThreadPool(5),輸出如下,5個執行緒複用
1535423404418: Thread ID: 9
1535423404418: Thread ID: 12
1535423404418: Thread ID: 11
1535423404418: Thread ID: 10
1535423404418: Thread ID: 8
1535423405420: Thread ID: 8
1535423405420: Thread ID: 9
1535423405420: Thread ID: 10
1535423405420: Thread ID: 11
1535423405420: Thread ID: 12

// 當使用newSingleThreadExecutor(),只有一個執行緒複用,輸出如下
1535423548649: Thread ID: 8
1535423549655: Thread ID: 8
1535423550656: Thread ID: 8
1535423551660: Thread ID: 8
1535423552665: Thread ID: 8
1535423553667: Thread ID: 8
1535423554669: Thread ID: 8
1535423555673: Thread ID: 8
1535423556678: Thread ID: 8
1535423557683: Thread ID: 8

// 當使用newCachedThreadPool(),執行緒不夠會建立新的,輸出如下
1535423614559: Thread ID: 8
1535423614559: Thread ID: 12
1535423614559: Thread ID: 9
1535423614559: Thread ID: 10
1535423614560: Thread ID: 14
1535423614559: Thread ID: 11
1535423614560: Thread ID: 13
1535423614560: Thread ID: 15
1535423614560: Thread ID: 16
1535423614560: Thread ID: 17

10 計劃任務,newScheduledThreadPool(),它返回一個ScheduledExecutorService物件,可以根據時間需要對執行緒進行排程,主要方法如下

public ScheduledFuture<?> schedule(Runnable command , long delay , TimeUnit unit )   // 會在給定時間對任務進行一次排程

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command , long initialDelay , long period , TimeUnit unit )  // 會對任務進行週期性排程排程的頻率是一定的。是以上一個任務開始執行時間為起點,之後的period時間排程下一次任務

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command , long initialDelay , long delay , TimeUnit unit ) // 會對任務進行週期性排程,在上一個任務結束後再經過delay時間進行任務排程

跟其他執行緒池不用,ScheduledExecutorService並不一定會立即執行任務,它其實是起計劃任務的作用。它會在指定的時間,對任務進行排程

  • 如下程式碼,任務會每隔2秒執行,但因為scheduleAtFixedRate()是以上一個任務執行開始時間來算,如果任務執行時間長於排程時間,週期會無效。
public class ScheduledExecutorServiceDemo {

	public static void main(String[] args){
		ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
		ses.scheduleAtFixedRate(new Runnable(){

			@Override
			public void run() {
				// TODO Auto-generated method stub
				try{
					Thread.sleep(1000);
					System.out.println(System.currentTimeMillis()/1000);
				}catch(InterruptedException e){
					e.printStackTrace();
				}
			}
			
		}, 0, 2, TimeUnit.SECONDS);
	}
}
  • 如下程式碼,scheduleWithFixedDelay()是以上一個任務的結束開始算,所以任務間都間隔2秒
public class ScheduledExecutorServiceDemo {

	public static void main(String[] args){
		ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
		ses.scheduleWithFixedDelay(new Runnable(){

			@Override
			public void run() {
				// TODO Auto-generated method stub
				try{
					Thread.sleep(1000);
					System.out.println(System.currentTimeMillis()/1000);
				}catch(InterruptedException e){
					e.printStackTrace();
				}
			}
			
		}, 0, 2, TimeUnit.SECONDS);
	}
}

11 無論newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool()都只是ThreadPoolExecutor類的封裝。ThreadPoolExecutor的建構函式如下

public ThreadPoolExecutor( int corePoolSize , int maximumPoolSize , long keepAliveTime , TimeUnit unit , BlockingQueue<Runnable> workQueue , ThreadFactory threadFactory , RejectedExecutionHandler handler )

  • corePoolSize 指定了執行緒池中的執行緒數量
  • maximumPoolSize 指定了執行緒池中的最大執行緒數量
  • keepAliveTime 當執行緒池執行緒數量超過了corePoolSize時,多餘的空閒執行緒的存活時間
  • unit 是keepAliveTime的單位
  • workQueue 任務佇列,被提交但尚未被執行的任務,是一個BlockingQueue介面物件,僅用於存放Runnable物件。
  • threadFactory 執行緒工廠,用於建立執行緒
  • handler 拒絕策略,當任務太多來不及處理,如何拒絕任務

在ThreadPoolExecutor的建構函式中可使用如下幾種BlockingQueue

  • 直接提交的佇列,該功能由SynchronousQueue物件提供。SynchronousQueue是一個特殊的BlockingQueue。SynchronousQueue沒有容量,每個插入操作都要等待一個相應的刪除操作。如果使用SynchronousQueue,提交的任務不會被真實的儲存,而總是將新任務提交給執行緒執行,沒有空閒的執行緒則嘗試建立新的程序。當程序資料達到最大值則執行拒絕策略。所以使用SynchronousQueue佇列,通常要設定很大的maximumPoolSize值否則很容易執行拒絕策略。
  • 有界的任務佇列,可以用ArrayBlockingQueue,它必須帶有一個表示佇列最大容量的引數。有新任務時,執行緒池的實際執行緒數小於corePoolSize,則優先建立新的執行緒;若大於則將新任務加入等待佇列。佇列若已滿,則在匯流排程數不大於maximumPoolSize的前提下建立新的執行緒執行任務。若大於則執行拒絕策略。
  • 無界的任務佇列,可以用LinkedBlockingQueue類實現。除非系統資源耗盡,否則無界的任務佇列不會存在任務入列失敗。有新任務時,執行緒池的實際執行緒數小於corePoolSize,則優先建立新的執行緒;若大於則將新任務加入等待佇列。
  • 優先任務佇列,可以用PriorityBlockingQueue類實現。它是一個無界佇列,可以根據任務自身的優先順序順序先後執行。

JDK內建提供了四種拒絕策略,這些策略都實現了RejectedExecutionHandler介面

  • AbortPolicy策略:該策略直接丟擲異常,阻止系統正常工作。
  • CallerRunsPolicy策略:只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。
  • DiscardOledestPolicy策略:丟棄即將被執行的任務,嘗試再次提交當前任務
  • DiscardPolicy策略:丟棄無法處理的任務

12 ThreadPoolExecutor也是一個可以擴充套件的執行緒池,提供了beforeExecute(),afterExecute()和terminated()三個介面對執行緒池進行控制。

13 優化執行緒池執行緒數量

14 線上程池中尋找堆疊

public class DivTask implements Runnable {
	int a , b;
	public DivTask(int a, int b){
		this.a = a;
		this.b = b;
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
		double re = a/b;
		System.out.println(re);
	}
	public static void main(String[] args) throws InterruptedException,ExecutionException{
		ThreadPoolExecutor pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,
				0L,TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>());
		for(int i=0;i<5;i++){
            pools.submit(new DivTask(100,i));    // 第一種,出現異常不會列印異常錯誤
			//pools.execute(new DivTask(100,i));  // 第二種,出現異常會列印部分堆疊資訊
            /*  第三種,出現異常會列印部分堆疊資訊
			Future re = pools.submit(new DivTask(100,i));
			re.get();
            */
		}
	}
}

15 分而治之:Fork / Join框架。在JDK中,有一個ForkJoinPool執行緒池,對於fork()方法並不急於開啟執行緒,而是提交給ForkJoinPool執行緒池進行處理。

ForkJoinPool有一個重要的介面

     public  <T>  ForkJoinTask<T>  submit(ForkJoinTask<T> task)

可以向ForkJoinPool執行緒池提交一個ForkJoinTask任務,該任務就是支援fork()分解以及join()等待的任務。ForkJoinTask有兩個重要的子類RecursiveAction和RecursiveTask,分別表示沒有返回值的任務和可以攜帶返回值的任務。

16 JDK的併發容器,大部分在java.util.concurrent包中

  • ConcurrentHashMap:高效的併發HashMap,可以理解為執行緒安全的HashMap
  • CopyOnWriteArrayList:在讀多寫少的場合,其效能要遠比Vector好
  • ConcurrentLinkedQueue:高效的併發佇列,使用連結串列實現。可以看作一個執行緒安全的LinkedList
  • BlockingQueue:這是一個介面,JDK內部通過連結串列,陣列等方式實現這個介面。表示阻塞佇列,非常適合用於資料共享的通道
  • ConcurrentSkipListMap:跳錶的實現。這是一個Map,使用跳錶資料結構進行快速查詢。

java.util下的Vector是執行緒安全,另外Collections工具類可以幫助我們將任意集合包裝成執行緒安全的集合。

17 如果需要一個執行緒安全的HashMap,一種可行的方法是使用Collections.synchronizedMap()方法包裝我們的HashMap。如下

   public static Map m = Collections.synchronizedMap(new HashMap()) ;

另一種更專業的併發HashMap是ConcurrentHashMap,它更適合多執行緒的場合。

18 ArrayList和Vector都是使用陣列作為內部實現,而Vector是執行緒安全,ArrayList不是

 

 

 

 

第4章  鎖的優化及注意事項

1 有助於提高“鎖”效能的幾點建議

  • 減少鎖持有時間,有助於降低鎖衝突的可能性
  • 減小鎖粒度,即縮小鎖定物件的範圍
  • 讀寫分離鎖來替換獨佔鎖
  • 鎖分離,如讀寫分離
  • 鎖粗化,即將所有的鎖操作整合為鎖的一次請求,減少對鎖的請求同步次數。

2 JDK內部的幾種”鎖“優化策略

  • 鎖偏向,一個執行緒獲得了鎖,那麼鎖就進入偏向模式。當這個執行緒再次請求鎖時無須再做任何同步操作。
  • 輕量級鎖,只是簡單地將物件頭部作為指標,指向持有鎖的執行緒堆疊的內部,來判斷一個執行緒是否持有物件鎖
  • 自旋鎖,讓當前執行緒做幾個空迴圈(即自旋),如果可以得到鎖則進入臨界區,否則掛起
  • 鎖清除,去掉不可能存在共享資源競爭的鎖

3 ThreadLocal,可以包裝非執行緒安全的物件

public class Test1 {

	static ThreadLocal<SimpleDateFormat> tl = new ThreadLocal<SimpleDateFormat>();
	public static class ParseDate implements Runnable{

		int i = 0;
		public ParseDate(int i){
			this.i = i;
		}
		@Override
		public void run() {
			// TODO Auto-generated method stub
			try{
				if(tl.get() == null){
					tl.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
				}
				Date t = tl.get().parse("2018-08-30 02:38:"+i%60);
				System.out.println(i+": "+t);
			}catch(ParseException e){
				e.printStackTrace();
			}
		}
		
	}
	public static void main(String[] args){
		ExecutorService es = Executors.newFixedThreadPool(10);
		for(int i=0;i<1000;i++){
			es.execute(new ParseDate(i));
		}
	}
}

4 ThreadLocal模式下會比多執行緒共享一個物件要快很多很多,如下例子快了30多倍

public class Test2 {

	public static final int GEN_COUNT =  10000000;
	public static final int THREAD_COUNT = 4;
	static ExecutorService es = Executors.newFixedThreadPool(THREAD_COUNT);
	public static Random rnd = new Random(123);
	
	public static ThreadLocal<Random> tRnd = new ThreadLocal<Random>(){
		
		protected Random initialValue(){
			return new Random(123);
		}
	};
	public static class RndTask implements Callable<Long>{

		private int mode = 0;
		public RndTask(int mode){
			this.mode = mode;
		}
		public Random getRandom(){
			if(mode == 0)
				return rnd;
			else if(mode == 1)
				return tRnd.get();
			else
				return null;
		}
		@Override
		public Long call() throws Exception {
			// TODO Auto-generated method stub
			long b = System.currentTimeMillis();
			for(long i=0;i<GEN_COUNT;i++)
				getRandom().nextInt();
			long e = System.currentTimeMillis();
			System.out.println(Thread.currentThread().getName()+" spend "+(e-b)+" ms");
			return e-b;
		}
		
	}
	public static void main(String[] args) throws InterruptedException,ExecutionException{
		Future<Long>[] futs = new Future[THREAD_COUNT];
		for(int i =0 ;i<THREAD_COUNT;i++){
			futs[i] = es.submit(new RndTask(0));
		}
		long totalTime = 0;
		for(int i=0;i<THREAD_COUNT;i++){
			totalTime += futs[i].get();
		}
		System.out.println("多執行緒訪問同一個Random例項:"+totalTime+" ms");
		for(int i=0;i<THREAD_COUNT;i++){
			futs[i] = es.submit(new RndTask(1));
		}
		totalTime = 0;
		for(int i=0;i<THREAD_COUNT;i++){
			totalTime += futs[i].get();
		}
		System.out.println("使用ThreadLocal包裝Random例項:"+totalTime+" ms");
		es.shutdown();		
	}
}

輸出結果

pool-1-thread-1 spend 4911 ms

pool-1-thread-4 spend 4959 ms

pool-1-thread-3 spend 4961 ms

pool-1-thread-2 spend 4962 ms

多執行緒訪問同一個Random例項:19793 ms

pool-1-thread-2 spend 130 ms

pool-1-thread-4 spend 131 ms

pool-1-thread-3 spend 133 ms

pool-1-thread-1 spend 134 ms

使用ThreadLocal包裝Random例項:528 ms

5 一種無鎖的策略:比較交換的技術(CAS  Compare And Swap) 來鑑別執行緒衝突,一旦檢測到衝突產生就重試當前操作直到沒有衝突為止。

CAS演算法的過程:共三個引數CAS(V , E , N)。V表示要更新的變數,E表示預期值,N表示新值。僅當V值等於E值時,才會將V值設為N。如果V值和E值不同則說明已經有其他執行緒做了更新,而當前執行緒什麼都不做。最後CAS返回當前V的真實值。

6 JDK併發包有一個atomic包,有個最常用的類AtomicInteger,它是可變且執行緒安全。還有AtomicLong,AtomicBoolean,AtomicReference。

public class AtomicIntegerDemo {

	static AtomicInteger i = new AtomicInteger();
	public static class AddThread implements Runnable{

		@Override
		public void run() {
			// TODO Auto-generated method stub
			for(int k=0;k<10000;k++)
				i.incrementAndGet();
		}
	}
	public static void main(String[] args) throws InterruptedException{
		Thread[] ts = new Thread[10];
		for(int k=0;k<10;k++){
			ts[k] = new Thread(new AddThread());
		}
		for(int k=0;k<10;k++)
			ts[k].start();
		for(int k=0;k<10;k++)
			ts[k].join();
		System.out.println(i);
	}
}

AtomicInteger.incrementAndGet()採用CAS操作給自己加1,同時返回當前值。

7 AtomicReference是指物件引用,它可以保證在修改物件引用時的執行緒安全性

8 原子陣列:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

9 可以讓普通變數也享受原子操作:AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater

public class AtomicIntegerFieldUpdaterDemo {

	public static class Candidate{
		int id;
		volatile int score;
	}
	public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
	public static AtomicInteger allScore = new AtomicInteger(0);
	public static void main(String[] args) throws InterruptedException{
		final Candidate stu = new Candidate();
		Thread[] t = new Thread[10000];
		for(int i=0;i<10000;i++){
			t[i] = new Thread(){
				public void run(){
					if(Math.random() > 0.4){
						scoreUpdater.incrementAndGet(stu);
						allScore.incrementAndGet();
					}
				}
			};
			t[i].start();
		}
		for(int i=0;i<10000;i++)
			t[i].join();
		System.out.println("score="+stu.score);
		System.out.println("allScore = "+allScore);
	}
}

注意事項:

  • Updater只能修改可見氛圍的變數,如score為private則無法修改
  • 為了確保變數被正確的讀取,必須是volatile型別
  • 由於CAS操作會通過物件例項的偏移量直接進行賦值,所以它不支援static欄位

 

 

 

 

第5章 並行模式與演算法

1 單例模式是一種物件建立模式,用於產生一個物件的具體例項,確保系統中一個類只產生一個例項。

public class Singleton{
    private Singleton(){
        System.out.println("Singleton is create");
    }
    private static Singleton instance = new Singleton();
    public static Singleton getInstance(){
        return instance;
    }
}

注意:

  • 建構函式被定義為private,無法隨便建立這個例項。
  • instance物件必須是private並且static
  • getInstance()定義為靜態函式

上面有一個明顯不足,Singleton建構函式或者是Singleton例項在什麼時候建立是不受控制。如下是改善後的程式碼

public class Singleton {

	public static int k = 0;
	private Singleton(){
		System.out.println("Singleton is created");
	}
	private static class SingletonHolder{
		private static Singleton instance = new Singleton();
	}
	
	public static Singleton getInstance(){
		return SingletonHolder.instance;
	}
	
	public static void main(String[] args){
		//Singleton t = Singleton.getInstance();
		System.out.println(Singleton.k);
	}
}

2 不變模式天生就是多執行緒友好的,它的核心思想是一個物件一旦被建立,則它的內部狀態將永遠不會發生改變。不變模式的主要使用場景需要滿足以下2個條件:

  • 當物件建立後,其內部狀態和資料不再發生任何變化
  • 物件需要被分享,被多執行緒頻繁訪問

不變模式的實現很簡單,只需要注意以下4點:

  • 去除setter方法以及所有修改自身屬性的方法
  • 將所有屬性設定為私有,並用final標記確保其不可修改
  • 確保沒有子類可以過載修改它的行為
  • 有一個可以建立完整物件的建構函式
public final class Product {    // 確保無子類

	private final String no;
	private final String name;
	private final double price;
	
	public Product(String no,String name,double price){
		this.no = no;
		this.name = name;
		this.price = price;
	}
	public String getNo(){
		return no;
	}
	public String getName(){
		return name;
	}
	public double getPrice(){
		return price;
	}
}

不變模式應用很廣泛,最典型的就是java.lang.String類。此外所有的元資料類包裝類都是不變模式實現的。如java.lang.String,java.lang.Boolean,java.lang.Byte,java.lang.Character,java.lang.Double,java.lang.Float,java.lang.Integer,java.lang.Long,java.lang.Short

3 生產者-消費者模式是一個經典的多執行緒設計模式。生產者執行緒負責提交使用者請求,消費者執行緒則負責具體處理生產者提交的任務。生產者和消費者之間則通過共享記憶體緩衝區進行通訊。生產者-消費者模式的核心元件是共享記憶體緩衝區

PCData物件表示一個生產任務,或者相關任務資料。生產者物件和消費者物件均引用同一個BlockingQueue例項。生產者負責建立PCData物件,並將它加入BlockingQueue中,消費者則從BlockingQueue佇列中獲取PCData。

public final class PCData {

	private final int intData;
	public PCData(int d){
		intData = d;
	}
	public PCData(String d){
		intData = Integer.valueOf(d);
	}
	public int getData(){
		return intData;
	}
	public String toString(){
		return "data:"+intData;
	}
}


//生產者
public class Producer implements Runnable {

	private volatile boolean isRunning = true;
	private BlockingQueue<PCData> queue;
	private static AtomicInteger count = new AtomicInteger();
	private static final int SLEEPTIME = 1000;
	
	public Producer(BlockingQueue<PCData> queue){
		this.queue = queue;
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
		PCData data = null;
		Random r = new Random();
		System.out.println("start producer id="+Thread.currentThread().getId());
		try{
			while(isRunning){
				Thread.sleep(r.nextInt(SLEEPTIME));
				data = new PCData(count.incrementAndGet());
				System.out.println(data+" is put into queue");
				if(!queue.offer(data,2,TimeUnit.SECONDS)){
					System.out.println("failed to put data: " +data);
				}
			}
		}catch(InterruptedException e){
			e.printStackTrace();
			Thread.currentThread().interrupt();
		}
	}
	public void stop(){
		isRunning = false;
	}

}


//消費者
public class Consumer implements Runnable {

	private BlockingQueue<PCData> queue;
	private static final int SLEEPTIME = 1000;
	
	public Consumer(BlockingQueue<PCData> queue){
		this.queue = queue;
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println("start Consumer id="+Thread.currentThread().getId());
		Random r = new Random();
		try{
			while(true){
				PCData data = queue.take();
				if(null != data){
					int re = data.getData() * data.getData();
					System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re));
					Thread.sleep(r.nextInt(SLEEPTIME));
				}
			}
		}catch(InterruptedException e){
			e.printStackTrace();
			Thread.currentThread().interrupt();
		}
	}

}


public class Main {

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
		Producer p1 = new Producer(queue);
		Producer p2 = new Producer(queue);
		Producer p3 = new Producer(queue);
		Consumer c1 = new Consumer(queue);
		Consumer c2 = new Consumer(queue);
		Consumer c3 = new Consumer(queue);
		ExecutorService es = Executors.newCachedThreadPool();
		es.execute(p1);
		es.execute(p2);
		es.execute(p3);
		es.execute(c1);
		es.execute(c2);
		es.execute(c3);
		Thread.sleep(10*1000);
		p1.stop();
		p2.stop();
		p3.stop();
		Thread.sleep(3000);
		es.shutdown();
	}
}

4 BlockingQueue是使用鎖和阻塞等待來實現執行緒間的同步。對於高併發場合,它的效能並不是最優越。而ConcurrentLinkedQueue是一個高效能的佇列,它使用了無鎖的CAS操作

5 Disruptor是個無鎖的快取框架,是由LMAX公司開發的一款高效的無鎖記憶體佇列。它使用無鎖的方式實現了一個環形佇列(RingBuffer),非常適合實現生產者和消費者模式。

public class PCData2 {

	private long value;
	public void set(long value){
		this.value = value;
	}
	public long get(){
		return value;
	}
}


// 工廠
public class PCDataFactory implements EventFactory<PCData2> {

	public PCData2 newInstance(){
		return new PCData2();
	}
}


//生產者
public class Producer2 {

	private final RingBuffer<PCData2> ringBuffer;
	public Producer2(RingBuffer<PCData2> ringBuffer){
		this.ringBuffer = ringBuffer;
	}
	public void pushData(ByteBuffer bb){
		long sequence = ringBuffer.next();
		try{
			PCData2 event = ringBuffer.get(sequence);
			event.set(bb.getLong(0));
		}finally{
			ringBuffer.publish(sequence);
		}
	}
}


//消費者
public class Consumer2 implements WorkHandler<PCData2>{

	@Override
	public void onEvent(PCData2 arg0) throws Exception {
		// TODO Auto-generated method stub
		System.out.println(Thread.currentThread().getId()+": Event: -- "
				+arg0.get()*arg0.get()+" --");
	}

}


//執行
public static void main(String[] args) throws Exception{
		Executor executor = Executors.newCachedThreadPool();
		PCDataFactory factory = new PCDataFactory();
		int bufferSize = 1024;
		Disruptor<PCData2> disruptor = new Disruptor<PCData2>(factory,bufferSize,executor,
				ProducerType.MULTI,new BlockingWaitStrategy());
		disruptor.handleEventsWithWorkerPool(
				new Consumer2(),
				new Consumer2(),
				new Consumer2(),
				new Consumer2());
		disruptor.start();
		RingBuffer<PCData2> ringBuffer = disruptor.getRingBuffer();
		Producer2 p = new Producer2(ringBuffer);
		ByteBuffer bb = ByteBuffer.allocate(8);
		for(long l = 0;true;l++){
			bb.putLong(0,l);
			p.pushData(bb);
			Thread.sleep(100);
			System.out.println("add data "+l);
		}
	}

6 Futur模式,也是一種多執行緒開發常見的模式,它的核心思想是非同步呼叫。當一個函式執行很慢,可以先讓呼叫者立即返回,讓函式在後臺慢慢處理請求。

7 併發流水線

public class Msg {
	public double i;
	public double j;
	public String orgStr = null;
}


public class Plus implements Runnable {
	public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();

	@Override
	public void run() {
		// TODO Auto-generated method stub
		while(true){
			try{
				Msg msg = bq.take();
				msg.j = msg.i+msg.j;
				Multiply.bq.add(msg);
			}catch(InterruptedException e){
				e.printStackTrace();
			}
		}
	}
}


public class Multiply implements Runnable {
	public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();

	@Override
	public void run() {
		// TODO Auto-generated method stub
		while(true){
			try{
				Msg msg = bq.take();
				msg.i = msg.i*msg.j;
				Div.bq.add(msg);
			}catch(InterruptedException e){
				e.printStackTrace();
			}
		}
	}
}


public class Div implements Runnable {
	public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();

	@Override
	public void run() {
		// TODO Auto-generated method stub
		while(true){
			try{
				Msg msg = bq.take();
				msg.i = msg.i/2;
				System.out.println(msg.orgStr+"="+msg.i);
			}catch(InterruptedException e){
				e.printStackTrace();
			}
		}
	}
}

public class PStreamMain {

	public static void main(String[] args){
		new Thread(new Plus()).start();
		new Thread(new Multiply()).start();
		new Thread(new Div()).start();
		
		for(int i=1;i<=10;i++){
			for(int j=1;j<=10;j++){
				Msg msg = new Msg();
				msg.i = i;
				msg.j = j;
				msg.orgStr = "(("+i+"+"+j+")*"+i+")/2";
				Plus.bq.add(msg);
			}
		}
	}
	
}

8 Java NIO是一套新的IO機制。