Java 並發編程實踐基礎 讀書筆記: 第三章 使用 JDK 並發包構建程序
一,JDK並發包實際上就是指java.util.concurrent包裏面的那些類和接口等
主要分為以下幾類: 1,原子量;2,並發集合;3,同步器;4,可重入鎖;5,線程池
二,原子量
原子變量主要有AtomicInteger,AtomicLong,AtomicBoolean等,
主要實現原理都是底層實現類CAS 即比較並交換,都有get,set,compareAndSet等方法,如++,--等也都是有自帶方法實現
這些都是線程安全的,保證了多線程訪問時候的可見性
1 import java.util.concurrent.atomic.AtomicLong; 23 /** 4 * StudySjms 5 * <p> 6 * Created by haozb on 2018/3/2. 7 */ 8 public class AtomicAccount { 9 10 AtomicLong account; 11 12 public AtomicAccount(long money) { 13 this.account = new AtomicLong(money); 14 } 15 16 public void withDraw(long money,int sleepTime){17 long oldValue = account.get(); 18 if(oldValue >= money){ 19 try { 20 Thread.sleep(sleepTime); 21 } catch (Exception e) { 22 23 } 24 if(account.compareAndSet(oldValue,oldValue-money)){ 25 System.out.println(Thread.currentThread().getName()+"扣錢成功"); 26 }else{ 27 System.out.println(Thread.currentThread().getName()+"扣錢失敗"); 28 } 29 }else{ 30 System.out.println("錢不夠了"); 31 } 32 } 33 34 public static void main(String[] args) { 35 final AtomicAccount aa = new AtomicAccount(100); 36 37 for (int i = 0; i < 2; i++) { 38 new Thread(new Runnable() { 39 @Override 40 public void run() { 41 aa.withDraw(100,1000); 42 } 43 }).start(); 44 } 45 } 46 }
上面這個方法就是利用原子量解決多線程中計數不安全的例子;
三,並發集合
這個包裏面有一個阻塞隊列的接口BlockingQueue,阻塞的概念就是滿了就不會再填充了,空了也不允許再取,
所有實現這個接口的隊列都是線程安全的。
主要有ArrayBlockingQueue:一個由數組支持的有界隊列
LinkedBlockingQueue:一個由鏈接節點支持的可選有界隊列
PriorityBlockingQueue:一個由優先級堆支持的無界優先級隊列
DelayQueue :一個由優先級堆支持的、基於時間的調度隊列
ConcurrentMap 接口,ConcurrentHashMap, 這個實現類的方法是原子的,源代碼裏面是采用lock住put那一塊代碼。
CopyOnWriteArrayList,CopyOnWriteArraySet采用copy-on-write 模式
package copyonwrite; import java.util.ArrayList; import java.util.Arrays; import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; public class CopyOnWriteDemo { @SuppressWarnings("unchecked") public static void main(String args[]) { String[] ss = {"aa", "bb", "cc"}; List list1 = new CopyOnWriteArrayList(Arrays.asList(ss)); List list2 = new ArrayList(Arrays.asList(ss)); Iterator itor1 = list1.iterator(); Iterator itor2 = list2.iterator(); list1.add("New"); list2.add("New"); try { printAll(itor1); } catch (ConcurrentModificationException e) { System.err.println("Shouldn‘t get here"); } try { printAll(itor2); } catch (ConcurrentModificationException e) { System.err.println("Will gethere.ConcurrentModificationException occurs !"); } } @SuppressWarnings("unchecked") private static void printAll(Iterator itor) { while (itor.hasNext()) { System.out.println(itor.next()); } } }
運行結果如下:
Will get here.ConcurrentModificationException occurs!
aa
bb
cc
這個例子很好地說明了。
四,同步器
主要有CyclicBarrier:
1.它允許在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待
2.重要的屬性就是參與者個數,另外最要方法是 await()。當所有線程都調用了 await()後,就表示這些線程都可以繼續執行,否則就會等待
1 package synchronizer; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 import java.util.concurrent.BrokenBarrierException; 6 import java.util.concurrent.CyclicBarrier; 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 10 public class CyclicBarrierDemo { 11 /* 徒步需要的時間: Shenzhen, Guangzhou, Chongqing */ 12 private static int[] timeForWalk = { 5, 8, 15 }; 13 /* 自駕遊 */ 14 private static int[] timeForSelf = { 1, 3, 4 }; 15 /* 旅遊大巴 */ 16 private static int[] timeForBus = { 2, 4, 6 }; 17 18 static String nowTime() /* 時間格式化 */ 19 { 20 SimpleDateFormat sdf = new SimpleDateFormat( "HH:mm:ss" ); 21 return(sdf.format( new Date() ) + ": "); 22 } 23 24 25 static class Tour implements Runnable { 26 private int[] timeForUse; 27 private CyclicBarrier barrier; 28 private String tourName; 29 30 public Tour( CyclicBarrier barrier, String tourName, int[] timeForUse ) 31 { 32 this.timeForUse = timeForUse; 33 this.tourName = tourName; 34 this.barrier = barrier; 35 } 36 37 38 public void run() 39 { 40 try { 41 Thread.sleep( timeForUse[0] * 1000 ); 42 System.out.println( nowTime() + tourName + " ReachedShenenzh " ); 43 barrier.await(); /* 到達中轉站後等待其他旅行團 */ 44 Thread.sleep( timeForUse[1] * 1000 ); 45 System.out.println( nowTime() + tourName + " ReachedGuangzhou" ); 46 barrier.await(); /* 到達中轉站後等待其他旅行團 */ 47 Thread.sleep( timeForUse[2] * 1000 ); 48 System.out.println( nowTime() + tourName + " ReachedChonin" ); 49 barrier.await(); /* 到達中轉站後等待其他旅行團 */ 50 } catch ( InterruptedException e ) { 51 } catch ( BrokenBarrierException e ) { 52 } 53 } 54 } 55 56 public static void main( String[] args ) 57 { 58 /* 三個旅行團都到到達某一個站點後,執行下面的操作,表示都到齊了。 */ 59 Runnable runner = new Runnable() 60 { 61 @Override 62 public void run() 63 { 64 System.out.println( "we all are here." ); 65 } 66 }; 67 CyclicBarrier barrier = new CyclicBarrier( 3, runner ); 68 /* 使用線程池 */ 69 ExecutorService exec = Executors.newFixedThreadPool( 3 ); 70 exec.submit( new Tour( barrier, "WalkTour", timeForWalk ) ); 71 exec.submit( new Tour( barrier, "SelfTour", timeForSelf ) ); 72 exec.submit( new Tour( barrier, "BusTour", timeForBus ) ); 73 exec.shutdown(); 74 } 75 }
1 17:13:18: SelfTour Reached Shenzhen 2 17 3:1 Bus :1 9: Tour Reached Shenzhen 3 17 3:2 WalkTour Reached Shenzhen :1 2: 4 we all are here. 5 17 3:2 SelfTour Reached Guangzhou :1 5: 6 17 3:2 BusTour Reached Guangzhou :1 6: 7 17 3:3 WalkTour Reached Guangzhou :1 0: 8 we all are here. 9 17 3:3 SelfTour Reached Ch :1 4: ongqing 10 17:13:36: BusTour Reached Chongqing 11 17:13:45: WalkTour Reached Chongqing 12 we all are here.
五,Future(接口)和FutureTask(實現類)
註:FutureTask實現了Runnable,所以可以通過線程池和Thread執行
使用他們的好處是可以獲得線程的結果,和拋出異常,這種好處通過Callable的定義就知道了
1 public interface Callable<V> { 2 /** 3 * Computes a result, or throws an exception if unable to do so. 4 * 5 * @return computed result 6 * @throws Exception if unable to compute a result 7 */ 8 V call() throws Exception; 9 }
可以通過以下幾種方式調用
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.Future; 4 import java.util.concurrent.FutureTask; 5 6 public class CallableTest { 7 8 public static void main(String[] args) { 9 // //創建線程池 10 // ExecutorService es = Executors.newSingleThreadExecutor(); 11 // //創建Callable對象任務 12 // CallableDemo calTask=new CallableDemo(); 13 // //提交任務並獲取執行結果 14 // Future<Integer> future =es.submit(calTask); 15 // //關閉線程池 16 // es.shutdown(); 17 18 //創建線程池 19 ExecutorService es = Executors.newSingleThreadExecutor(); 20 //創建Callable對象任務 21 CallableDemo calTask=new CallableDemo(); 22 //創建FutureTask 23 FutureTask<Integer> futureTask=new FutureTask<Integer>(calTask); 24 //執行任務 25 es.submit(futureTask); 26 //關閉線程池 27 es.shutdown(); 28 try { 29 Thread.sleep(2000); 30 System.out.println("主線程在執行其他任務"); 31 32 if(futureTask.get()!=null){ 33 //輸出獲取到的結果 34 System.out.println("futureTask.get()-->"+futureTask.get()); 35 }else{ 36 //輸出獲取到的結果 37 System.out.println("futureTask.get()未獲取到結果"); 38 } 39 40 } catch (Exception e) { 41 e.printStackTrace(); 42 } 43 System.out.println("主線程在執行完成"); 44 } 45 }
六,ReentrantLock顯示鎖 也叫可重入鎖; 必須在finally裏面釋放鎖
實現方式:
1 Lock lock=new ReentrantLock(); 2 lock.lock(); 3 try{ 4 // 更新對象狀態 5 } 6 finally{ 7 lock.unlock(); 8 }
new的時候,有個參數,true或者false 決定了是不是公平鎖,正常上不公平鎖的性能比較好!
線程之間的交互,有一個類叫Condition:Condition 的方法與 wait 、notify 和 notifyAll 方法類似,分別命名為 await 、 signal 和 signalAll
後面會詳細的介紹
七,ReadWriteLock
ReadWriteLock 維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 線程同時保持。寫入鎖是獨占的
這個鎖比互斥鎖的性能上應該好些(讀的頻率大於寫的頻率)
1 import java.util.Calendar; 2 import java.util.Map; 3 import java.util.TreeMap; 4 import java.util.concurrent.locks.Lock; 5 import java.util.concurrent.locks.ReentrantReadWriteLock; 6 7 /** 8 * StudySjms 9 * <p> 10 * Created by haozb on 2018/3/2. 11 */ 12 public class ReadWriteLockDemo { 13 private ReentrantReadWriteLock lock = null; 14 private Lock readLock = null;// 讀鎖 15 private Lock writeLock = null;// 寫鎖 16 public int key = 100; 17 public int index = 100; 18 public Map<Integer, String> dataMap = null;// 線程共享數據 19 20 public ReadWriteLockDemo() { 21 lock = new ReentrantReadWriteLock(true); 22 readLock = lock.readLock(); 23 writeLock = lock.writeLock(); 24 dataMap = new TreeMap<Integer, String>(); 25 } 26 27 public static void main(String[] args) { 28 ReadWriteLockDemo tester = new ReadWriteLockDemo(); 29 // 第一次獲取鎖 30 tester.writeLock.lock(); 31 System.out 32 .println(Thread.currentThread().getName() + " get writeLock."); 33 // 第二次獲取鎖,應為是可重入鎖 34 tester.writeLock.lock(); 35 System.out 36 .println(Thread.currentThread().getName() + " get writeLock."); 37 tester.readLock.lock(); 38 System.out.println(Thread.currentThread().getName() + " get readLock"); 39 tester.readLock.lock(); 40 System.out.println(Thread.currentThread().getName() + " get readLock"); 41 tester.readLock.unlock(); 42 tester.readLock.unlock(); 43 tester.writeLock.unlock(); 44 tester.writeLock.unlock(); 45 tester.test(); 46 } 47 48 public void test() { 49 // 讀線程比寫線程多 50 for (int i = 0; i < 10; i++) { 51 new Thread(new reader(this)).start(); 52 } 53 for (int i = 0; i < 3; i++) { 54 new Thread(new writer(this)).start(); 55 } 56 } 57 58 public void read() { 59 // 獲取鎖 60 readLock.lock(); 61 try { 62 if (dataMap.isEmpty()) { 63 Calendar now = Calendar.getInstance(); 64 System.out.println(now.getTime().getTime() + " R " 65 + Thread.currentThread().getName() 66 + " get key, but map is empty."); 67 } 68 String value = dataMap.get(index); 69 Calendar now = Calendar.getInstance(); 70 System.out.println(now.getTime().getTime() + " R " 71 + Thread.currentThread().getName() + " key = " + index 72 + " value = " + value + " map size = " + dataMap.size()); 73 if (value != null) { 74 index++; 75 } 76 } finally { 77 // 釋放鎖 78 readLock.unlock(); 79 } 80 try { 81 Thread.sleep(3000); 82 } catch (Exception e) { 83 84 } 85 } 86 87 public void write() { 88 writeLock.lock(); 89 try { 90 String value = "value" + key; 91 dataMap.put(new Integer(key), value); 92 Calendar now = Calendar.getInstance(); 93 System.out.println(now.getTime().getTime() + " W " 94 + Thread.currentThread().getName() + " key = " + key 95 + " value = " + value + " map size = " + dataMap.size()); 96 key++; 97 try { 98 Thread.sleep(500); 99 } catch (InterruptedException e) { 100 e.printStackTrace(); 101 } 102 } finally { 103 writeLock.unlock(); 104 } 105 } 106 } 107 108 class reader implements Runnable { 109 private ReadWriteLockDemo tester = null; 110 111 public reader(ReadWriteLockDemo tester) { 112 this.tester = tester; 113 } 114 115 @Override 116 public void run() { 117 Calendar now = Calendar.getInstance(); 118 System.out.println(now.getTime().getTime() + " R " 119 + Thread.currentThread().getName() + " started"); 120 for (int i = 0; i < 10; i++) { 121 tester.read(); 122 } 123 } 124 } 125 126 class writer implements Runnable { 127 private ReadWriteLockDemo tester = null; 128 129 public writer(ReadWriteLockDemo tester) { 130 this.tester = tester; 131 } 132 133 @Override 134 public void run() { 135 Calendar now = Calendar.getInstance(); 136 System.out.println(now.getTime().getTime() + " W " 137 + Thread.currentThread().getName() + " started"); 138 for (int i = 0; i < 10; i++) { 139 tester.write(); 140 } 141 } 142 }
Java 並發編程實踐基礎 讀書筆記: 第三章 使用 JDK 並發包構建程序