1. 程式人生 > >Java 並發編程實踐基礎 讀書筆記: 第三章 使用 JDK 並發包構建程序

Java 並發編程實踐基礎 讀書筆記: 第三章 使用 JDK 並發包構建程序

mod 獲取鎖 -o key 讀取 拋出異常 編程實踐 arraylist ask

一,JDK並發包實際上就是指java.util.concurrent包裏面的那些類和接口等

  主要分為以下幾類: 1,原子量;2,並發集合;3,同步器;4,可重入鎖;5,線程池

二,原子量

  原子變量主要有AtomicInteger,AtomicLong,AtomicBoolean等,

  主要實現原理都是底層實現類CAS 即比較並交換,都有get,set,compareAndSet等方法,如++,--等也都是有自帶方法實現

  這些都是線程安全的,保證了多線程訪問時候的可見性

  

 1 import java.util.concurrent.atomic.AtomicLong;
 2
3 /** 4 * StudySjms 5 * <p> 6 * Created by haozb on 2018/3/2. 7 */ 8 public class AtomicAccount { 9 10 AtomicLong account; 11 12 public AtomicAccount(long money) { 13 this.account = new AtomicLong(money); 14 } 15 16 public void withDraw(long money,int sleepTime){
17 long oldValue = account.get(); 18 if(oldValue >= money){ 19 try { 20 Thread.sleep(sleepTime); 21 } catch (Exception e) { 22 23 } 24 if(account.compareAndSet(oldValue,oldValue-money)){ 25 System.out.println(Thread.currentThread().getName()+"
扣錢成功"); 26 }else{ 27 System.out.println(Thread.currentThread().getName()+"扣錢失敗"); 28 } 29 }else{ 30 System.out.println("錢不夠了"); 31 } 32 } 33 34 public static void main(String[] args) { 35 final AtomicAccount aa = new AtomicAccount(100); 36 37 for (int i = 0; i < 2; i++) { 38 new Thread(new Runnable() { 39 @Override 40 public void run() { 41 aa.withDraw(100,1000); 42 } 43 }).start(); 44 } 45 } 46 }

上面這個方法就是利用原子量解決多線程中計數不安全的例子;

三,並發集合

這個包裏面有一個阻塞隊列的接口BlockingQueue,阻塞的概念就是滿了就不會再填充了,空了也不允許再取,

所有實現這個接口的隊列都是線程安全的。

主要有ArrayBlockingQueue:一個由數組支持的有界隊列

   LinkedBlockingQueue:一個由鏈接節點支持的可選有界隊列

   PriorityBlockingQueue:一個由優先級堆支持的無界優先級隊列

   DelayQueue :一個由優先級堆支持的、基於時間的調度隊列

ConcurrentMap 接口,ConcurrentHashMap, 這個實現類的方法是原子的,源代碼裏面是采用lock住put那一塊代碼。

CopyOnWriteArrayList,CopyOnWriteArraySet采用copy-on-write 模式

package copyonwrite;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
    @SuppressWarnings("unchecked")
    public static void main(String args[]) {
        String[] ss = {"aa", "bb", "cc"};
        List list1 = new CopyOnWriteArrayList(Arrays.asList(ss));
        List list2 = new ArrayList(Arrays.asList(ss));
        Iterator itor1 = list1.iterator();
        Iterator itor2 = list2.iterator();
        list1.add("New");
        list2.add("New");
        try {
            printAll(itor1);
        } catch (ConcurrentModificationException e) {
            System.err.println("Shouldn‘t get here");
        }
        try {
            printAll(itor2);
        } catch (ConcurrentModificationException e) {
            System.err.println("Will gethere.ConcurrentModificationException occurs !");
        }
    }

    @SuppressWarnings("unchecked")
    private static void printAll(Iterator itor) {
        while (itor.hasNext()) {
            System.out.println(itor.next());
        }
    }
}

運行結果如下:
Will get here.ConcurrentModificationException occurs!
aa
bb
cc

這個例子很好地說明了。

四,同步器

  主要有CyclicBarrier:

  1.它允許在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待

  2.重要的屬性就是參與者個數,另外最要方法是 await()。當所有線程都調用了 await()後,就表示這些線程都可以繼續執行,否則就會等待

 1 package synchronizer;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.concurrent.BrokenBarrierException;
 6 import java.util.concurrent.CyclicBarrier;
 7 import java.util.concurrent.ExecutorService;
 8 import java.util.concurrent.Executors;
 9 
10 public class CyclicBarrierDemo {
11     /* 徒步需要的時間: Shenzhen, Guangzhou, Chongqing */
12     private static int[] timeForWalk = { 5, 8, 15 };
13     /* 自駕遊 */
14     private static int[] timeForSelf = { 1, 3, 4 };
15     /* 旅遊大巴 */
16     private static int[] timeForBus = { 2, 4, 6 };
17 
18     static String nowTime() /* 時間格式化 */
19     {
20         SimpleDateFormat sdf = new SimpleDateFormat( "HH:mm:ss" );
21         return(sdf.format( new Date() ) + ": ");
22     }
23 
24 
25     static class Tour implements Runnable {
26         private int[]        timeForUse;
27         private CyclicBarrier    barrier;
28         private String        tourName;
29 
30         public Tour( CyclicBarrier barrier, String tourName, int[] timeForUse )
31         {
32             this.timeForUse = timeForUse;
33             this.tourName    = tourName;
34             this.barrier    = barrier;
35         }
36 
37 
38         public void run()
39         {
40             try {
41                 Thread.sleep( timeForUse[0] * 1000 );
42                 System.out.println( nowTime() + tourName + "  ReachedShenenzh " );
43                 barrier.await();        /* 到達中轉站後等待其他旅行團 */
44                 Thread.sleep( timeForUse[1] * 1000 );
45                 System.out.println( nowTime() + tourName + "  ReachedGuangzhou" );
46                 barrier.await();        /* 到達中轉站後等待其他旅行團 */
47                 Thread.sleep( timeForUse[2] * 1000 );
48                 System.out.println( nowTime() + tourName + "  ReachedChonin" );
49                 barrier.await();        /* 到達中轉站後等待其他旅行團 */
50             } catch ( InterruptedException e ) {
51             } catch ( BrokenBarrierException e ) {
52             }
53         }
54     }
55 
56     public static void main( String[] args )
57     {
58         /* 三個旅行團都到到達某一個站點後,執行下面的操作,表示都到齊了。 */
59         Runnable runner = new Runnable()
60         {
61             @Override
62             public void run()
63             {
64                 System.out.println( "we all are here." );
65             }
66         };
67         CyclicBarrier barrier = new CyclicBarrier( 3, runner );
68 /* 使用線程池 */
69         ExecutorService exec = Executors.newFixedThreadPool( 3 );
70         exec.submit( new Tour( barrier, "WalkTour", timeForWalk ) );
71         exec.submit( new Tour( barrier, "SelfTour", timeForSelf ) );
72         exec.submit( new Tour( barrier, "BusTour", timeForBus ) );
73         exec.shutdown();
74     }
75 }
 1 17:13:18: SelfTour Reached Shenzhen
 2 17 3:1 Bus :1 9: Tour Reached Shenzhen
 3 17 3:2 WalkTour Reached Shenzhen  :1 2:
 4 we all are here.
 5 17 3:2 SelfTour Reached Guangzhou  :1 5:
 6 17 3:2 BusTour Reached Guangzhou  :1 6:
 7 17 3:3 WalkTour Reached Guangzhou  :1 0:
 8 we all are here.
 9 17 3:3 SelfTour Reached Ch :1 4:  ongqing
10 17:13:36: BusTour Reached Chongqing
11 17:13:45: WalkTour Reached Chongqing
12 we all are here.

五,Future(接口)和FutureTask(實現類)

  註:FutureTask實現了Runnable,所以可以通過線程池和Thread執行

  使用他們的好處是可以獲得線程的結果,和拋出異常,這種好處通過Callable的定義就知道了

1 public interface Callable<V> {
2     /**
3      * Computes a result, or throws an exception if unable to do so.
4      *
5      * @return computed result
6      * @throws Exception if unable to compute a result
7      */
8     V call() throws Exception;
9 }

可以通過以下幾種方式調用

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 import java.util.concurrent.Future;
 4 import java.util.concurrent.FutureTask;
 5 
 6 public class CallableTest {
 7 
 8     public static void main(String[] args) {
 9 //      //創建線程池  
10 //      ExecutorService es = Executors.newSingleThreadExecutor();  
11 //      //創建Callable對象任務  
12 //      CallableDemo calTask=new CallableDemo();  
13 //      //提交任務並獲取執行結果  
14 //      Future<Integer> future =es.submit(calTask);  
15 //      //關閉線程池  
16 //      es.shutdown();  
17 
18         //創建線程池  
19         ExecutorService es = Executors.newSingleThreadExecutor();
20         //創建Callable對象任務  
21         CallableDemo calTask=new CallableDemo();
22         //創建FutureTask  
23         FutureTask<Integer> futureTask=new FutureTask<Integer>(calTask);
24         //執行任務  
25         es.submit(futureTask);
26         //關閉線程池  
27         es.shutdown();
28         try {
29             Thread.sleep(2000);
30             System.out.println("主線程在執行其他任務");
31 
32             if(futureTask.get()!=null){
33                 //輸出獲取到的結果  
34                 System.out.println("futureTask.get()-->"+futureTask.get());
35             }else{
36                 //輸出獲取到的結果  
37                 System.out.println("futureTask.get()未獲取到結果");
38             }
39 
40         } catch (Exception e) {
41             e.printStackTrace();
42         }
43         System.out.println("主線程在執行完成");
44     }
45 }  

六,ReentrantLock顯示鎖 也叫可重入鎖; 必須在finally裏面釋放鎖

  實現方式:

1 Lock lock=new ReentrantLock();
2 lock.lock();
3 try{
4 // 更新對象狀態
5 }
6 finally{
7 lock.unlock();
8 }

new的時候,有個參數,true或者false 決定了是不是公平鎖,正常上不公平鎖的性能比較好!

線程之間的交互,有一個類叫Condition:Condition 的方法與 wait 、notify 和 notifyAll 方法類似,分別命名為 await 、 signal 和 signalAll

後面會詳細的介紹

七,ReadWriteLock

ReadWriteLock 維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 線程同時保持。寫入鎖是獨占的

這個鎖比互斥鎖的性能上應該好些(讀的頻率大於寫的頻率)

  1 import java.util.Calendar;
  2 import java.util.Map;
  3 import java.util.TreeMap;
  4 import java.util.concurrent.locks.Lock;
  5 import java.util.concurrent.locks.ReentrantReadWriteLock;
  6 
  7 /**
  8  * StudySjms
  9  * <p>
 10  * Created by haozb on 2018/3/2.
 11  */
 12 public class ReadWriteLockDemo {
 13     private ReentrantReadWriteLock lock = null;
 14     private Lock readLock = null;// 讀鎖
 15     private Lock writeLock = null;// 寫鎖
 16     public int key = 100;
 17     public int index = 100;
 18     public Map<Integer, String> dataMap = null;// 線程共享數據
 19 
 20     public ReadWriteLockDemo() {
 21         lock = new ReentrantReadWriteLock(true);
 22         readLock = lock.readLock();
 23         writeLock = lock.writeLock();
 24         dataMap = new TreeMap<Integer, String>();
 25     }
 26 
 27     public static void main(String[] args) {
 28         ReadWriteLockDemo tester = new ReadWriteLockDemo();
 29 // 第一次獲取鎖
 30         tester.writeLock.lock();
 31         System.out
 32                 .println(Thread.currentThread().getName() + " get writeLock.");
 33 // 第二次獲取鎖,應為是可重入鎖
 34         tester.writeLock.lock();
 35         System.out
 36                 .println(Thread.currentThread().getName() + " get writeLock.");
 37         tester.readLock.lock();
 38         System.out.println(Thread.currentThread().getName() + " get readLock");
 39         tester.readLock.lock();
 40         System.out.println(Thread.currentThread().getName() + " get readLock");
 41         tester.readLock.unlock();
 42         tester.readLock.unlock();
 43         tester.writeLock.unlock();
 44         tester.writeLock.unlock();
 45         tester.test();
 46     }
 47 
 48     public void test() {
 49 // 讀線程比寫線程多
 50         for (int i = 0; i < 10; i++) {
 51             new Thread(new reader(this)).start();
 52         }
 53         for (int i = 0; i < 3; i++) {
 54             new Thread(new writer(this)).start();
 55         }
 56     }
 57 
 58     public void read() {
 59 // 獲取鎖
 60         readLock.lock();
 61         try {
 62             if (dataMap.isEmpty()) {
 63                 Calendar now = Calendar.getInstance();
 64                 System.out.println(now.getTime().getTime() + " R "
 65                         + Thread.currentThread().getName()
 66                         + " get key, but map is empty.");
 67             }
 68             String value = dataMap.get(index);
 69             Calendar now = Calendar.getInstance();
 70             System.out.println(now.getTime().getTime() + " R "
 71                     + Thread.currentThread().getName() + " key = " + index
 72                     + " value = " + value + " map size = " + dataMap.size());
 73             if (value != null) {
 74                 index++;
 75             }
 76         } finally {
 77 // 釋放鎖
 78             readLock.unlock();
 79         }
 80         try {
 81             Thread.sleep(3000);
 82         } catch (Exception e) {
 83 
 84         }
 85     }
 86 
 87     public void write() {
 88         writeLock.lock();
 89         try {
 90             String value = "value" + key;
 91             dataMap.put(new Integer(key), value);
 92             Calendar now = Calendar.getInstance();
 93             System.out.println(now.getTime().getTime() + " W "
 94                     + Thread.currentThread().getName() + " key = " + key
 95                     + " value = " + value + " map size = " + dataMap.size());
 96             key++;
 97             try {
 98                 Thread.sleep(500);
 99             } catch (InterruptedException e) {
100                 e.printStackTrace();
101             }
102         } finally {
103             writeLock.unlock();
104         }
105     }
106 }
107 
108 class reader implements Runnable {
109     private ReadWriteLockDemo tester = null;
110 
111     public reader(ReadWriteLockDemo tester) {
112         this.tester = tester;
113     }
114 
115     @Override
116     public void run() {
117         Calendar now = Calendar.getInstance();
118         System.out.println(now.getTime().getTime() + " R "
119                 + Thread.currentThread().getName() + " started");
120         for (int i = 0; i < 10; i++) {
121             tester.read();
122         }
123     }
124 }
125 
126 class writer implements Runnable {
127     private ReadWriteLockDemo tester = null;
128 
129     public writer(ReadWriteLockDemo tester) {
130         this.tester = tester;
131     }
132 
133     @Override
134     public void run() {
135         Calendar now = Calendar.getInstance();
136         System.out.println(now.getTime().getTime() + " W "
137                 + Thread.currentThread().getName() + " started");
138         for (int i = 0; i < 10; i++) {
139             tester.write();
140         }
141     }
142 }

Java 並發編程實踐基礎 讀書筆記: 第三章 使用 JDK 並發包構建程序