1. 程式人生 > >JAVA多執行緒的學習

JAVA多執行緒的學習

1、Thread.yield方法宣告把CPU讓給其他具有相同優先順序的執行緒去執行,不過這只是一個暗示,並沒有保障機制。

2、Executor

      執行器,管理Thread物件。

      語法demo:     

      ExecutorService exec=Executors.newCacheThreadPool();

      for(int i=0;i<5;i++){
        exec.execute(new XXX()); //XXX為實現Runnable介面的類
        exec.shutdown();
       }

    三種類型及其區別:

      CacheThreadPool:在程式執行過程中建立與所需數量相同的執行緒,在回收舊執行緒時停止建立新執行緒,是Executor首選。

       FixedThreadPool:一次性預先執行代價高昂的執行緒分配,可以限制執行緒的數量。

       SingleThreadExcutor:如果向其提交多個任務,那麼這些任務排隊,每個任務都在下個任務開始前結束,所有的任務使用相同的執行緒。

3、如何建立有返回值的執行緒

      實現Callable介面,重寫call()方法

      exec.submit(new XXX())會返回Future物件,用future.get()獲取值 這個值是泛型的,取決於實現介面的宣告,

      如:implements Callable<String>

      關於get:可以先呼叫Future的isDone()方法來查詢是否已經完成。任務完成時會具有一個結果,可以通過get來獲取。

      如果不適用isDone()直接get,get會阻塞知道結果準備就緒。

4、通過編寫定製的ThreadFactory可以定製由Excutor建立的執行緒屬性(是否是後臺,優先順序,名稱)

      比如:

class MyThreadFactory implements ThreadFactory{
    public Thread newThread(Runnable r){
        Thread t=new Thread(r);
        t.setDaemon(true); //設定為後臺執行緒
        return ..; 
    }
}

5、守護執行緒中派生的子類預設是守護執行緒,當所有非守護執行緒結束時,後臺執行緒終止,一旦main()退出,JVM會關閉所有的守護執行緒。

      當所有的非後臺執行緒結束時,程式就終止了。同時會殺死程序中所有的後臺執行緒。反過來說,只要有任何非後臺執行緒還在執行,程式就不會終止。

6、若在A執行緒中呼叫B.join方法,則A會被掛起,知道B結束。在呼叫時也可以帶上一個超時引數。對join方法的呼叫可以被打斷(通過在呼叫執行緒上呼叫interrupted()方法),這時被打斷的B需要用到try-catch字句。(在run方法中,catch InterruptedException)

7、執行緒丟擲的異常不能被正常try-catch到,可以用Executor解決這個問題:

      如4中所示自定義一個myfactory類,在該類的factory的newThread方法中t.setUncaughtExceptionHandler(new XXX) ;    

      XXX是實現了Thread.UncaughtExceptionHandler介面的類。

      然後ExecutorService exec=Executors.newCachedThreadPool(new myfactory);

8、synchronized和lock物件

      synchronized:如果某個任務處於一個對標記為synchronized的呼叫中,那麼在這個執行緒從該方法返回之前,其他所有需要呼叫類中任何標記為synchronized方法的執行緒都會被阻塞。不過如果是synchronized(obj)方法塊,只要obj是不同的物件,兩個方法不會因為另一個方法的同步而被阻塞。

      用synchronized時程式碼量更小,且不會出現忘記unlock這種情況,用Lock需要lock();try{} fianlly{unlock},避免在lock後代碼出現異常導致死鎖。顯式使用lock物件可以解決更特殊的問題,比如嘗試獲取鎖一段時間然後放棄、實現自己的排程演算法等等。且在加鎖釋放鎖方面有更細粒度的控制力。

9、volatile

      一個定義為volatile的變數是說這個變數可能會意想不到的被改變,這樣,編譯器就不會去假設這個變數的值了。精確地說就是,優化器在用到這個變數時必須每次都小心的重新讀取這個變數的值,而不是使用儲存在暫存器裡的備份。

      如果多個任務在同時訪問某個域,這個域就應該是volatile的,否則只能用同步來訪問。同步也會導致從主存中重新整理。因此一個域完全由synchronized方法或語句塊來保護,就不必為其設定volatile了。

      long和double的寫入可能不是原子的,因為long和double是64位的,在32位的機子上的讀寫操作會被當做兩個32位操作執行。如果使用volatile就會獲得原子性。此關鍵字還確保了應用中的可視性。如果把一個域宣告為volatile的,只要對這個域產生寫操作,所有的讀操作都可以看到這個修改。即使使用了快取。volatile域會被直接寫到主存中,而讀取操作發生在主存。如果域由synchronized方法或語句塊保護,不必設定為volatile。使用volatile而不是synchronized的唯一安全的情況是類中只有一個可變域。第一選擇應該是synchronized。

10、同步控制塊

        亦稱為臨界區,在方法內部:

         synchronized(syncObject){

         }

11、ThreadLocal

       當使用ThreadLocal維護變數時,ThreadLocal為每個使用該變數的執行緒提供獨立的變數副本,所以每個執行緒都可以獨立地改變自己的副本,而不會影響其它執行緒所對應的副本。

       從執行緒的角度看,目標變數就像是執行緒的本地變數,這也是類名中“Local”所要表達的意思。

12、執行緒的暫停、繼續、終止

       wait、notify、exec.shutdownNow()

       在Executor上呼叫shutdownNow(),將傳送一個interrupt()呼叫給他所有的執行緒。(相當於xx.interrupt())

       如果想中斷某個單一任務,使用Executor.submit()而不是execute()來啟動任務,返回一個Future<f> f,可以通過呼叫f.cancel(true)來中斷。

       可以中斷對sleep的呼叫,(或者任何要求丟擲InterruptedException的呼叫),不能中斷正在試圖獲取資源的synchronized鎖或者試圖執行I/O操作的執行緒(辦法是關閉任務在其上發生阻塞的底層資源)。

13、wait 和 notify

       sleep和notify不會釋放鎖,wait會釋放鎖

       兩種形式的wait:毫秒數作為引數或者不加引數

       把wait、notify放在Object類中是因為這些方法操作的鎖也是所有物件的一部分,所以可以把wait放進任何控制同步方法裡,不需要考慮這個類是繼承Thread還是Runnable介面。實際上,只能在同步控制方法或同步控制塊裡呼叫wait、notify。否則在執行的時候將得到illegalmonitorStateException異常。呼叫這兩個方法的任務在呼叫這些方法前必須用有物件的鎖。

       notify()和notifyAll()都是Object物件用於通知處在等待該物件的執行緒的方法。

       void notify():喚醒一個正在等待該物件的執行緒

       void notifyAll():喚醒所有正在等待該物件的執行緒。

       兩者的最大區別在於:

              notifyAll使所有原來在該物件上等待被notify的執行緒統統退出wait狀態,變成等待該物件上的鎖,一旦該物件被解鎖,他們就會去競爭。

              notify他只是選擇一個wait狀態的執行緒進行通知,並使它獲得該物件上的鎖,但不驚動其他同樣在在等待被該物件notify的執行緒們,當第一個執行緒執行完畢後釋放物件上的鎖,此時如果該物件沒有再次使用notify語句,即便該物件已經空閒,其他wait狀態等待的執行緒由於沒有得到該物件的通知,繼續處在wait狀態,直到這個物件發出一個notify或notifyAll,它們等待的是被notify或notifyAll,而不是鎖。

        可以用lock+condition來代替wait和notify,但是更復雜,只有在更加困難的多執行緒中才必須。

ReentrantLock

        java.util.concurrent.lock中的Lock框架是鎖定的一個抽象,它允許把鎖定的實現作為Java類,而不是作為程式語言的特性來實現。

這就為Lock多種實現留下了空間,各種實現可能有不同的排程演算法、效能特性或者鎖定語義。(synchronized是java底層語言,是定義的關鍵字,是語言層面的)

       ReentrantLock實現了Lock,它擁有與synchronized相同的併發性和記憶體語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,他還提供了在激烈競爭下的更佳的效能。(換句話說,當許多執行緒都想訪問共享資源時,JVM可以花更少的時間來排程執行緒,把更多時間用線上程的執行上。)      

class printDemo{
    private Lock lock=new ReentrantLock();
    
    public void printStr(String str){
        lock.lock();//獲取鎖
        
        try{
            for(int i=0;i<str.length;i++){
                System.out.print(str.charAt(i)+" ");
            }
        }finally{
            lock.unlock();//釋放鎖
        }
    }
        
}

區別:

        需要注意的是,用synchronized修飾的方法或者語句塊在程式碼執行完之後鎖自動釋放,而Lock需要我們手動釋放鎖,所以為了保證鎖最終被釋放(發生異常情況),需要把互斥區放在try內,釋放鎖放在finally內!!

讀寫鎖ReadWriteLock

上例中展示的是和synchronized相同的功能,那麼Lock的優勢在哪兒呢?

        例如一個類對其內部共享資料data提供了get()和set方法,如果用synchronized,則程式碼如下:


package ThreadStudy;

public class SynchronizedDemo {
	public static void main(String[] args) {
		final syncData data = new syncData();

		// 寫入
		for (int i = 1; i <= 3; i++) {
			Thread t = new Thread(new Runnable() {
				public void run() {
					for (int j = 0; j < 5; j++) {
						data.set((int) (Math.random() * 30));
					}
				}
			});
			t.setName("Thread-write:" + i);
			t.start();
		}

		// 讀取
		for (int i = 1; i <= 3; i++) {
			Thread t = new Thread(new Runnable() {

				public void run() {
					for (int j = 0; j < 5; j++) {
						data.get();
					}

				}
			});
			t.setName("Thread-read:" + i);
			t.start();
		}
	}
}

class syncData {
	private int data;// 共享資料

	public synchronized void set(int data) {
		System.out.println(Thread.currentThread().getName() + "準備寫入資料。。。");
		try {
			Thread.sleep(20);
		} catch (Exception e) {
			e.printStackTrace();
		}
		this.data = data;
		System.out.println(Thread.currentThread().getName() + "寫入" + this.data);
	}

	public synchronized void get() {
		System.out.println(Thread.currentThread().getName() + "準備讀取資料");
		try {
			Thread.sleep(20);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName() + "讀取" + this.data);
	}
}

    其中讀取可以同時進行,不應該互斥

   下面是讀寫鎖ReadWriteLock實現:

package ThreadStudy;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
	public static void main(String[] args) {
		final rwlData data = new rwlData();

		// 寫入
		for (int i = 1; i <= 3; i++) {
			Thread t = new Thread(new Runnable() {
				public void run() {
					for (int j = 0; j < 5; j++) {
						data.set((int) (Math.random() * 30));
					}
				}
			});
			t.setName("Thread-write:" + i);
			t.start();
		}

		// 讀取
		for (int i = 1; i <= 3; i++) {
			Thread t = new Thread(new Runnable() {

				public void run() {
					for (int j = 0; j < 5; j++) {
						data.get();
					}

				}
			});
			t.setName("Thread-read:" + i);
			t.start();
		}
	}
}

class rwlData {
	private int data;// 共享資料
	private ReadWriteLock rwl = new ReentrantReadWriteLock();

	public synchronized void set(int data) {
		rwl.writeLock().lock();// 獲取寫鎖
		try {
			System.out.println(Thread.currentThread().getName() + "準備寫入資料。。。");
			try {
				Thread.sleep(20);
			} catch (Exception e) {
				e.printStackTrace();
			}
			this.data = data;
			System.out.println(Thread.currentThread().getName() + "寫入" + this.data);
		} finally {
			rwl.writeLock().unlock();// 釋放寫鎖
		}
	}

	public synchronized void get() {
		rwl.readLock().lock();//獲取讀鎖
		try {
			System.out.println(Thread.currentThread().getName() + "準備讀取資料");
			try {
				Thread.sleep(20);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName() + "讀取" + this.data);
		} finally {
			rwl.readLock().unlock();//釋放讀鎖
		}
	}
}

       與互斥鎖定相比,讀寫鎖定允許對共享資料進行跟高級別的併發放訪問。雖然一次只有一個執行緒(writer執行緒)可以修改共享資料,但在許多情況下,任何數量的執行緒可以同時讀取共享資料(reader執行緒)

       從理論上講,與互斥鎖定相比,使用讀寫鎖定所允許的併發性增強將帶來更大的效能提高。

       在實踐中,只有再多處理器上並且只在訪問模式使用與共享資料時,才能完全實現併發性增強。---例如,某個最初用資料填充並且之後不經常對其修改的collection,因為經常對其進行搜尋(比如搜尋某種目錄),所以這樣的collection是使用讀寫鎖的理想候選者。

執行緒間通訊Condition

condition可以代替傳統的執行緒間通訊,用await()替換wait(),用signal()替換 notify() ,用signalAll() 替換notifyAll()。

        ---因為wait()、notify()、notifyAll()在Object中是final的,不可重寫。

        傳統執行緒的通訊方式,Condition都可以實現。

        注意,Condition是被繫結到Lock上的,要建立一個Lock的condition必須用newConditon()方法。

        Condition的強大之處在於它可以為多個執行緒建立不同的Condition。

        看JDK文件中的一個例子:假定有一個繫結的緩衝區,它支援put和take方法。如果試圖在空的緩衝區上執行take操作,則在某一個項變得可用之前,執行緒將一直被阻塞;如果試圖在緩衝區上執行put操作,則在有空間變得可用之前,執行緒將一直阻塞。我們喜歡在單獨的等待set中儲存put執行緒和take執行緒,這樣就可以在緩衝區中的項或空間變得可用時利用最佳規劃,一次只通知一個執行緒。可以使用兩個Condition例項來做到這一點。

        ---其實就是java.util.concurrent.ArrayBlockingQueue的功能

package ThreadStudy;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionDemo {
	public static void main(String[] args) {
		BoundedBuffer bb = new BoundedBuffer();
		// 寫入
		for (int i = 1; i <= 3; i++) {
			Thread t = new Thread(new Runnable() {
				public void run() {
					for (int j = 0; j < 60; j++) {
						try {
							bb.put(j);
							System.out.println(Thread.currentThread().getName()+"寫入:"+j);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			});
			t.setName("Thread-write:" + i);
			t.start();
		}
		// 讀取
		for (int i = 1; i <= 3; i++) {
			Thread t = new Thread(new Runnable() {

				public void run() {
					for (int j = 0; j < 60; j++) {
						try {
							System.out.println(Thread.currentThread().getName()+"讀取:"+bb.take());
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}

				}
			});
			t.setName("Thread-read:" + i);
			t.start();
		}
	}

}

class BoundedBuffer {
	final Lock lock = new ReentrantLock(); // 鎖物件
	final Condition notFull = lock.newCondition(); // 寫執行緒鎖
	final Condition notEmpty = lock.newCondition(); // 讀執行緒鎖

	final Object[] items = new Object[100];// 快取佇列
	int putptr; // 寫索引
	int takeptr; // 讀索引
	int count; // 佇列中資料數目

	// 寫
	public void put(Object x) throws InterruptedException {
		lock.lock(); // 鎖定
		try {
			// 如果佇列滿,則阻塞<寫執行緒>
			while (count == items.length) {
				notFull.await();
			}
			// 寫入佇列,並更新寫索引
			items[putptr] = x;
			if (++putptr == items.length)
				putptr = 0;
			++count;

			// 喚醒<讀執行緒>
			notEmpty.signal();
		} finally {
			lock.unlock();// 解除鎖定
		}
	}

	// 讀
	public Object take() throws InterruptedException {
		lock.lock(); // 鎖定
		try {
			// 如果佇列空,則阻塞<讀執行緒>
			while (count == 0) {
				notEmpty.await();
			}

			// 讀取佇列,並更新讀索引
			Object x = items[takeptr];
			if (++takeptr == items.length)
				takeptr = 0;
			--count;

			// 喚醒<寫執行緒>
			notFull.signal();
			return x;
		} finally {
			lock.unlock();// 解除鎖定
		}
	}
}

 優點:

         假設快取佇列裡面已經存滿,那麼阻塞的肯定是寫執行緒,喚醒的肯定是讀執行緒,相反,阻塞的肯定是讀執行緒,喚醒寫執行緒。

         那麼假設只有一個Condition會有什麼效果呢?緩衝佇列中已經存滿,這個lock不知道喚醒的是讀執行緒還是寫執行緒了,如果喚醒的是讀執行緒,那麼皆大歡喜,如果喚醒的是寫執行緒,那麼執行緒剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。

14、同步佇列

        首先我們需要了解同步佇列和等待佇列的概念。簡單的理解是同步佇列存放著競爭同步資源的執行緒的引用(不是存放執行緒),而等待佇列存放著待喚醒的執行緒的引用。

        同步佇列中存放著一個個節點,當執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態等資訊構造成為一個節點並將其加入同步佇列,首節點表示的獲取同步狀態成功的執行緒節點。 

        

15、管道

       兩個執行緒之間,一個擁有pepedwriter,一個擁有pipedreader(需要writer做引數)。

16、CountDownLatch和CyclicBarrier

        CountDownLatch:

                被用來同步一個或多個任務,強制他們等待由其他任務執行的一組操作完成。new CountDownLatch(int size),被等待的執行緒在執行完操作後latch.countdown(),等待執行緒呼叫latch.await();計數值為0時結束等待;

        CyclicBarrier:

                 字面意思迴環柵欄,通過它可以實現讓一組執行緒等待至某個狀態之後再全部同時執行。叫做迴環是因為當所有等待執行緒都被釋放以後,CyclicBarrier可以被重用。適用於:建立一組任務,並行執行,然後在下一個步驟之前等待。用於一組或幾組執行緒,比如一組執行緒需要在一個時間點上達成一致,例如同時開始一個工作。

        //當await的數量到達了設定的數量後,首先執行該Runnable物件。

        CyclicBarrier(int,Runnable):

        //通知barrier已完成執行緒

         await():

import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
 
 
/**  
 * 各省資料獨立,分庫存偖。為了提高計算效能,統計時採用每個省開一個執行緒先計算單省結果,最後彙總。  
 *   
 */  
public class CyclicBarrierDemo{   
   
    public static void main(String[] args) {   
        TotalService totalService = new TotalServiceImpl(); //計算彙總資訊  
        CyclicBarrier barrier = new CyclicBarrier(5,   
                new TotalTask(totalService));   //totaltask由最後一個進入的執行緒啟動
  
        // 實際系統是查出所有省編碼code的列表,然後迴圈,每個code生成一個執行緒。   
        new BillTask(new BillServiceImpl(), barrier, "北京").start();   
        new BillTask(new BillServiceImpl(), barrier, "上海").start();   
        new BillTask(new BillServiceImpl(), barrier, "廣西").start()	;   
        new BillTask(new BillServiceImpl(), barrier, "四川").start();   
        new BillTask(new BillServiceImpl(), barrier, "黑龍江").start();   
  
    }   
}   
  
/**  
 * 主任務:彙總任務  
 */  
class TotalTask implements Runnable {   
    private TotalService totalService;   
    TotalTask(){}
    TotalTask(TotalService totalService) {   
        this.totalService = totalService;   
    }   
  
    public void run() {   
        // 讀取記憶體中各省的資料彙總。   
        System.out.println("=======================================");   
        System.out.println("開始全國彙總");   
        totalService.count();   
    }   
}   
  
interface TotalService{
	public void count();
}
 
class TotalServiceImpl implements TotalService{
	public void count(){
		Integer totalSum=new Integer(0);
		ConcurrentHashMap<String, Integer> conmap=BillServiceImpl.conmap;
		for(Entry<String, Integer> entry:conmap.entrySet()){
			System.out.println(entry.getKey()+"="+entry.getValue());
			totalSum+=(Integer)entry.getValue();
		}
		System.out.println("全國資料總額為:"+totalSum.intValue());
	}
}
 
/**  
 * 子任務:計費任務  
 */  
class BillTask extends Thread {   
    // 計費服務   
    private BillService billService;   
    private CyclicBarrier barrier;  
    // 程式碼,按省程式碼分類,各省資料庫獨立。   
    private String code;   
    BillTask(){}
    BillTask(BillService billService, CyclicBarrier barrier, String code) {   
        this.billService = billService;   
        this.barrier = barrier;   
        this.code = code;   
    }   
  
    public void run() { 
    		billService.bill(code);  
        // 把bill方法結果存入記憶體,如ConcurrentHashMap,vector等,程式碼略   
//        System.out.println(code + "省已經計算完成,並通知彙總Service!");   
        try {   
            // 通知barrier已經完成   
            barrier.await();   
        } catch (InterruptedException e) {   
            e.printStackTrace();   
        } catch (Exception e) {   
            e.printStackTrace();   
        }  
    }   
  
}  
interface BillService{
	public void bill(String code);
}
class BillServiceImpl implements BillService{
	public static ConcurrentHashMap<String, Integer> conmap=new ConcurrentHashMap<String, Integer>();
	public void  bill(String code){
		conmap.put(code, new Random().nextInt(1000));
		System.out.println("開始計算--" + code + "省--資料!");
		System.err.println(code+"--資料--計算中---");
		System.err.println(code + "省已經計算完成,並通知彙總Service!");
	}
}

Semaphore

       Semaphore翻譯成字面意思是訊號量,Semaphore可以控制同時訪問的執行緒個數,通過acquire()獲取一個許可,如果沒有就等待,而release()釋放一個許可。

        假如一個工廠有5臺機器,但是有8個工人,一臺機器只能同時被一個人使用,只有使用完了,其他工人才能繼續使用。那麼我們通過Semaphore來實現:

package ThreadStudy;

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
	
	public static void main(String[] args) {
		int n=8;
		Semaphore semaphore=new Semaphore(5);
		for(int i=1;i<=n;i++){
			new Worker(i, semaphore).start();
		}
	}
	
	static class Worker extends Thread{
		private int num;
		private Semaphore semaphore;
		public Worker(int num, Semaphore semaphore) {
			this.num = num;
			this.semaphore = semaphore;
		}
		
		public void run(){
			try{
				semaphore.acquire();
				System.out.println("工人"+this.num+"佔用一個機器在生產。。。。");
				Thread.sleep(2000);
				System.out.println("工人"+this.num+"釋放出機器。");
				semaphore.release();
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		
	}
}

下面對上面說的三個輔助類進行一個總結:

        (1)CountDownLatch和CyclicBarrier都能夠實現執行緒之間的等待,只不過它們的側重點不同:

                  CountDownLatch一般用於某個執行緒A等待若干個其他執行緒執行完成任務之後,他才執行;

                  而CyclicBarrier一般用於一組執行緒相互等待至某個狀態,然後這一組執行緒在同時執行;

                  另外,CountDownLatch是不能夠重用的,而CyclicBarrier可以。 

        (2)Semaphore其實和鎖有點類似,他一般用於控制對某族資源的訪問許可權。