1. 程式人生 > >黑馬程式設計師_基礎加強_Java執行緒通訊和執行緒併發庫

黑馬程式設計師_基礎加強_Java執行緒通訊和執行緒併發庫

 ------- android培訓java培訓、期待與您交流! ----------

java5的執行緒鎖技術

Lock&Condition實現執行緒同步通訊
Lock比傳統的synchronized方式更加面向物件,兩個執行緒執行的程式碼塊要實現同步互斥,必須持有同一個Lock物件。
ReadWriteLock,多個讀鎖不互斥,讀鎖與寫鎖互斥。如果需要多執行緒同時讀,但不能同時寫,加讀鎖;如果程式碼修改資料,為改程式碼加寫鎖,寫鎖是執行緒獨佔的。
在等待 Condition 時,允許發生“虛假喚醒”,Condition 應該總是在一個迴圈中被等待,並測試正被等待的狀態。
Condition condition = lock.newCondition();
使用讀寫鎖的快取功能
[java] view plaincopyprint?
  1. class CachedData {
  2. Object data;
  3. volatile boolean cacheValid;
  4. ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  5. void processCachedData() {
  6. rwl.readLock().lock();
  7. if (!cacheValid) {//如果快取中沒有data
  8. // 在使用寫鎖前必須釋放讀鎖
  9. rwl.readLock().unlock();
  10. rwl.writeLock().lock();
  11. // 獲取寫鎖後在檢查
  12. if (!cacheValid) {
  13. data = ...//寫入data
  14. cacheValid = true;
  15. }
  16. // 寫入資料後,上讀鎖,在釋放寫鎖
  17. rwl.readLock().lock();
  18. rwl.writeLock().unlock();
  19. }
  20. use(data);
  21. rwl.readLock().unlock();
  22. }
  23. }
class CachedData {
   Object data;
   volatile boolean cacheValid;
   ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();


   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {//如果快取中沒有data
        // 在使用寫鎖前必須釋放讀鎖
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        //  獲取寫鎖後在檢查 
        if (!cacheValid) {
          data = ...//寫入data
          cacheValid = true;
        }
        // 寫入資料後,上讀鎖,在釋放寫鎖
        rwl.readLock().lock();
        rwl.writeLock().unlock(); 
     }
     use(data);
     rwl.readLock().unlock();
   }
 }

Semaphore 計數訊號燈

限制可以訪問某些資源的執行緒數目。可進入同一段程式碼的執行緒數目。
Semaphore(int permits) permits允許的併發執行緒數.
acquire() 從此訊號量獲取一個許可.
void release() 釋放一個許可。


單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”。
[java] view plaincopyprint?
  1. public class SemaphoreTest {
  2. public static void main(String[] args) {
  3. ExecutorService service = Executors.newCachedThreadPool();
  4. final Semaphore sp = new Semaphore(3);//建立一個可以允許3個執行緒併發訪問的訊號燈
  5. for(int i=0;i<10;i++){
  6. Runnable runnable = new Runnable(){
  7. public void run(){
  8. try {
  9. sp.acquire();
  10. } catch (InterruptedException e1) {
  11. e1.printStackTrace();
  12. }
  13. System.out.println("執行緒" + Thread.currentThread().getName() +
  14. "進入,當前已有" + (3-sp.availablePermits()) + "個併發");
  15. try {
  16. Thread.sleep((long)(Math.random()*10000));
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. System.out.println("執行緒" + Thread.currentThread().getName() +
  21. "即將離開");
  22. sp.release();
  23. //下面程式碼有時候執行不準確,因為其沒有和上面的程式碼合成原子單元
  24. System.out.println("執行緒" + Thread.currentThread().getName() +
  25. "已離開,當前已有" + (3-sp.availablePermits()) + "個併發");
  26. }
  27. };
  28. service.execute(runnable); //將任務交給執行緒池
  29. }
  30. }
  31. }
public class SemaphoreTest {
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final  Semaphore sp = new Semaphore(3);//建立一個可以允許3個執行緒併發訪問的訊號燈
		for(int i=0;i<10;i++){
			Runnable runnable = new Runnable(){
					public void run(){
					try {
						sp.acquire();
					} catch (InterruptedException e1) {
						e1.printStackTrace();
					}
					System.out.println("執行緒" + Thread.currentThread().getName() + 
							"進入,當前已有" + (3-sp.availablePermits()) + "個併發");
					try {
						Thread.sleep((long)(Math.random()*10000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("執行緒" + Thread.currentThread().getName() + 
							"即將離開");					
					sp.release();
					//下面程式碼有時候執行不準確,因為其沒有和上面的程式碼合成原子單元
					System.out.println("執行緒" + Thread.currentThread().getName() + 
							"已離開,當前已有" + (3-sp.availablePermits()) + "個併發");					
				}
			};
			service.execute(runnable);	//將任務交給執行緒池		
		}
	}
}

CyclicBarrier

一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點。
[java] view plaincopyprint?
  1. public class CyclicBarrierTest {
  2. public static void main(String[] args) {
  3. ExecutorService service = Executors.newCachedThreadPool();//建立一個執行緒池
  4. final CyclicBarrier cb = new CyclicBarrier(3);//建立一個迴圈barrier,執行緒數目為3
  5. for(int i=0;i<3;i++){//新建3個執行緒,交給執行緒池執行
  6. Runnable runnable = new Runnable(){
  7. public void run(){
  8. try {
  9. Thread.sleep((long)(Math.random()*10000));
  10. System.out.println("執行緒" + Thread.currentThread().getName() +
  11. "即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
  12. cb.await();//等待其他執行緒
  13. Thread.sleep((long)(Math.random()*10000));
  14. System.out.println("執行緒" + Thread.currentThread().getName() +
  15. "即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
  16. cb.await();
  17. Thread.sleep((long)(Math.random()*10000));
  18. System.out.println("執行緒" + Thread.currentThread().getName() +
  19. "即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
  20. cb.await();
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. };
  26. service.execute(runnable);
  27. }
  28. service.shutdown();
  29. }
  30. }
public class CyclicBarrierTest {


	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();//建立一個執行緒池
		final  CyclicBarrier cb = new CyclicBarrier(3);//建立一個迴圈barrier,執行緒數目為3
		for(int i=0;i<3;i++){//新建3個執行緒,交給執行緒池執行
			Runnable runnable = new Runnable(){
					public void run(){
					try {
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("執行緒" + Thread.currentThread().getName() + 
								"即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));						
						cb.await();//等待其他執行緒
						
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("執行緒" + Thread.currentThread().getName() + 
								"即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
						cb.await();	
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("執行緒" + Thread.currentThread().getName() + 
								"即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));						
						cb.await();						
					} catch (Exception e) {
						e.printStackTrace();
					}				
				}
			};
			service.execute(runnable);
		}
		service.shutdown();
	}
}

CountDownLatch

CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch。
呼叫countDown方法就將計數器減1,當計數到達0時,則所有等待者或單個等待者開始執行。
[java] view plaincopyprint?
  1. public class CountdownLatchTest {
  2. public static void main(String[] args) {
  3. ExecutorService service = Executors.newCachedThreadPool();
  4. final CountDownLatch cdOrder = new CountDownLatch(1);
  5. final CountDownLatch cdAnswer = new CountDownLatch(3);
  6. for(int i=0;i<3;i++){
  7. Runnable runnable = new Runnable(){//建立3個執行緒
  8. public void run(){
  9. try {
  10. System.out.println("執行緒" + Thread.currentThread().getName() +
  11. "正準備接受命令");
  12. cdOrder.await();//等待主執行緒控制cdOrder開始任務
  13. System.out.println("執行緒" + Thread.currentThread().getName() +
  14. "已接受命令");
  15. Thread.sleep((long)(Math.random()*10000));
  16. System.out.println("執行緒" + Thread.currentThread().getName() +
  17. "迴應命令處理結果");
  18. cdAnswer.countDown();//任務完成通知主執行緒
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. };
  24. service.execute(runnable);
  25. }
  26. try {
  27. Thread.sleep((long)(Math.random()*10000));
  28. System.out.println("執行緒" + Thread.currentThread().getName() +
  29. "即將釋出命令");
  30. cdOrder.countDown();//將count歸0,開始執行任務
  31. System.out.println("執行緒" + Thread.currentThread().getName() +
  32. "已傳送命令,正在等待結果");
  33. cdAnswer.await();//等待子執行緒完成
  34. System.out.println("執行緒" + Thread.currentThread().getName() +
  35. "已收到所有響應結果");
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. service.shutdown();
  40. }
  41. }
public class CountdownLatchTest {


	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final CountDownLatch cdOrder = new CountDownLatch(1);
		final CountDownLatch cdAnswer = new CountDownLatch(3);		
		for(int i=0;i<3;i++){
			Runnable runnable = new Runnable(){//建立3個執行緒
					public void run(){
					try {
						System.out.println("執行緒" + Thread.currentThread().getName() + 
								"正準備接受命令");						
						cdOrder.await();//等待主執行緒控制cdOrder開始任務
						System.out.println("執行緒" + Thread.currentThread().getName() + 
						"已接受命令");								
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("執行緒" + Thread.currentThread().getName() + 
								"迴應命令處理結果");						
						cdAnswer.countDown();//任務完成通知主執行緒						
					} catch (Exception e) {
						e.printStackTrace();
					}				
				}
			};
			service.execute(runnable);
		}		
		try {
			Thread.sleep((long)(Math.random()*10000));
		
			System.out.println("執行緒" + Thread.currentThread().getName() + 
					"即將釋出命令");						
			cdOrder.countDown();//將count歸0,開始執行任務
			System.out.println("執行緒" + Thread.currentThread().getName() + 
			"已傳送命令,正在等待結果");	
			cdAnswer.await();//等待子執行緒完成
			System.out.println("執行緒" + Thread.currentThread().getName() + 
			"已收到所有響應結果");	
		} catch (Exception e) {
			e.printStackTrace();
		}				
		service.shutdown();
	}
}

Exchanger

實現持有同一Exchanger物件的兩個執行緒間的資料交換。
V exchange(V x) 方法交換資料。

可阻塞佇列

public class ArrayBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
按 FIFO(先進先出)原則對元素進行排序。
生產者使用put方法放入資料,消費者使用take方法取出資料。
新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素。
size() 返回此佇列中元素的數量。
可以由兩個只有一個元素的同步佇列物件實現執行緒通訊功能。


空中網題目:
Test類中的程式碼在不斷地產生資料,然後交給TestDo.doSome()方法去處理,就好像生產者在不斷地產生資料,消費者在不斷消費資料。
程式有10個執行緒來消費生成者產生的資料,這些消費者都呼叫TestDo.doSome()方法去進行處理,故每個消費者都需要一秒才能處理完,程式應保證這些消費者執行緒依次有序地消費資料,只有上一個消費者消費完後,下一個消費者才能消費資料,下一個消費者是誰都可以,但要保證這些消費者執行緒拿到的資料是有順序的。
[java] view plaincopyprint?
  1. public class Test {
  2. public static void main(String[] args) {
  3. final Semaphore semaphore = new Semaphore(1);
  4. final SynchronousQueue<String> queue = new SynchronousQueue<String>();
  5. for(int i=0;i<10;i++){
  6. new Thread(new Runnable(){
  7. @Override
  8. public void run() {
  9. try {
  10. semaphore.acquire();
  11. String input = queue.take();//保證按順序取出資料
  12. String output = TestDo.doSome(input);
  13. System.out.println(Thread.currentThread().getName()+ ":" + output);
  14. semaphore.release();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).start();
  20. }
  21. System.out.println("begin:"+(System.currentTimeMillis()/1000));
  22. for(int i=0;i<10;i++){ //這行不能改動
  23. String input = i+""; //這行不能改動
  24. try {
  25. queue.put(input);//主執行緒的迴圈產生資料,並將資料放入同步佇列中,由子執行緒對佇列進行操作
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. }
  32. //不能改動此TestDo類
  33. class TestDo {
  34. public static String doSome(String input){
  35. try {
  36. Thread.sleep(1000);
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. String output = input + ":"+ (System.currentTimeMillis() / 1000);
  41. return output;
  42. }
  43. }
public class Test {


	public static void main(String[] args) {
		final Semaphore semaphore = new Semaphore(1);
		final SynchronousQueue<String> queue = new SynchronousQueue<String>();
		for(int i=0;i<10;i++){
			new Thread(new Runnable(){
				@Override
				public void run() {	
					try {
						semaphore.acquire();
						String input = queue.take();//保證按順序取出資料
						String output = TestDo.doSome(input);
						System.out.println(Thread.currentThread().getName()+ ":" + output);
						semaphore.release();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}	
				}
			}).start();
		}
		
		System.out.println("begin:"+(System.currentTimeMillis()/1000));
		for(int i=0;i<10;i++){  //這行不能改動
			String input = i+"";  //這行不能改動
			try {
				queue.put(input);//主執行緒的迴圈產生資料,並將資料放入同步佇列中,由子執行緒對佇列進行操作
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}


//不能改動此TestDo類
class TestDo {
	public static String doSome(String input){
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		String output = input + ":"+ (System.currentTimeMillis() / 1000);
		return output;
	}
}

同步集合

傳統方式下的Collection在迭代集合時,不允許對集合進行修改。
同步集合類:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。當期望許多執行緒訪問一個給定 collection 時,ConcurrentHashMap 通常優於同步的 HashMap,ConcurrentSkipListMap 通常優於同步的 TreeMap。當期望的讀數和遍歷遠遠大於列表的更新數時,CopyOnWriteArrayList 優於同步的 ArrayList。


空中網面試題:
如果有幾個執行緒呼叫TestDo.doSome(key, value)方法時,傳遞進去的key相等(equals比較為true),則這幾個執行緒應互斥排隊輸出結果,即當有兩個執行緒的key都是"1"時,它們中的一個要比另外其他執行緒晚1秒輸出結果,如下所示:
4:4:1258199615
1:1:1258199615
3:3:1258199615
1:2:1258199616
[java] view plaincopyprint?
  1. import java.util.Iterator;
  2. import java.util.concurrent.CopyOnWriteArrayList;
  3. //不能改動此Test類
  4. public class Test extends Thread{
  5. private TestDo testDo;
  6. private String key;
  7. private String value;
  8. public Test(String key,String key2,String value){
  9. this.testDo = TestDo.getInstance();
  10. /*常量"1"和"1"是同一個物件,下面這行程式碼就是要用"1"+""的方式產生新的物件,
  11. 以實現內容沒有改變,仍然相等(都還為"1"),但物件卻不再是同一個的效果*/
  12. this.key = key+key2;
  13. this.value = value;
  14. }
  15. public static void main(String[] args) throws InterruptedException{
  16. Test a = new Test("1","","1");
  17. Test b = new Test("1","","2");
  18. Test c = new Test("3","","3");
  19. Test d = new Test("4","","4");
  20. System.out.println("begin:"+(System.currentTimeMillis()/1000));
  21. a.start();
  22. b.start();
  23. c.start();
  24. d.start();
  25. }
  26. public void run(){
  27. testDo.doSome(key, value);
  28. }
  29. }
  30. class TestDo {
  31. private TestDo() {}
  32. private static TestDo _instance = new TestDo();
  33. public static TestDo getInstance() {
  34. return _instance;
  35. }
  36. //因為在執行doSome時,需要對key進行判斷和新增,需要使用同步集合
  37. private CopyOnWriteArrayList keys = new CopyOnWriteArrayList();
  38. public void doSome(Object key, String value) {
  39. Object o = key;//以key做同步互斥鎖物件
  40. if(!keys.contains(o)){
  41. keys.add(o);
  42. }else{
  43. for(Iterator iter=keys.iterator();iter.hasNext();){
  44. try {
  45. Thread.sleep(20);//測試程式碼,讓對集合操作的動作多執行一會
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. Object oo = iter.next();
  50. if(oo.equals(o)){
  51. o = oo;
  52. break;
  53. }
  54. }
  55. }
  56. synchronized(o)
  57. // 以大括號內的是需要區域性同步的程式碼,不能改動!
  58. {
  59. try {
  60. Thread.sleep(1000);
  61. System.out.println(key+":"+value + ":"
  62. + (System.currentTimeMillis() / 1000));
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. }
  68. }
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;


//不能改動此Test類	
public class Test extends Thread{
	
	private TestDo testDo;
	private String key;
	private String value;
	
	public Test(String key,String key2,String value){
		this.testDo = TestDo.getInstance();
		/*常量"1"和"1"是同一個物件,下面這行程式碼就是要用"1"+""的方式產生新的物件,
		以實現內容沒有改變,仍然相等(都還為"1"),但物件卻不再是同一個的效果*/
		this.key = key+key2; 
		this.value = value;
	}


	public static void main(String[] args) throws InterruptedException{
		Test a = new Test("1","","1");
		Test b = new Test("1","","2");
		Test c = new Test("3","","3");
		Test d = new Test("4","","4");
		System.out.println("begin:"+(System.currentTimeMillis()/1000));
		a.start();
		b.start();
		c.start();
		d.start();


	}
	
	public void run(){
		testDo.doSome(key, value);
	}
}


class TestDo {


	private TestDo() {}
	private static TestDo _instance = new TestDo();	
	public static TestDo getInstance() {
		return _instance;
	}
	//因為在執行doSome時,需要對key進行判斷和新增,需要使用同步集合
	private CopyOnWriteArrayList keys = new CopyOnWriteArrayList();
	public void doSome(Object key, String value) {
		Object o = key;//以key做同步互斥鎖物件
		if(!keys.contains(o)){
			keys.add(o);
		}else{
			for(Iterator iter=keys.iterator();iter.hasNext();){
				try {
					Thread.sleep(20);//測試程式碼,讓對集合操作的動作多執行一會
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				Object oo = iter.next();
				if(oo.equals(o)){
					o = oo;
					break;
				}
			}
		}
		synchronized(o)
		// 以大括號內的是需要區域性同步的程式碼,不能改動!
		{
			try {
				Thread.sleep(1000);
				System.out.println(key+":"+value + ":"
						+ (System.currentTimeMillis() / 1000));
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

  ------- android培訓java培訓、期待與您交流! ----------