1. 程式人生 > >【java】-- java並發包總結

【java】-- java並發包總結

領域 get .get exti icc 是個 列操作 nsvalue 出隊

1、同步容器類

1.1、Vector與ArrayList異同

1、Arraylist和Vector都是采用數組方式存儲數據,都允許直接序號索引元素,所以查找速度快,但是插入數據等操作涉及到數組元素移動等內存操作,所以插入數據慢

2、 Vector的方法都是同步的(Synchronized),是線程安全的(thread-safe),而ArrayList的方法不是,由於線程的同步必然要影響性能,因此,ArrayList的性能比Vector好。

3、當Vector或ArrayList中的元素超過它的初始大小時,Vector會將它的容量翻倍,而ArrayList只增加50%的大小,這樣,ArrayList就有利於節約內存空間。

1.2、HashTable與HashMap

1、hashTable是線程安全的,hashMap不是。它們的性能方面的比較類似 Vector和ArrayList

2、HashMap允許將null作為一個entry的key或者value,而Hashtable不允許。HashMap把Hashtable的contains方法去掉了,改成containsvalue和containsKey。

1.3、synchronizedMap

Collections.synchronized*(m) 將線程不安全額集合變為線程安全集合

1.4、ConcurrentHashMap

ConcurrentMap接口下有倆個重要的實現 :

ConcurrentHashMap
ConcurrentskipListMap (支持並發排序功能。彌補ConcurrentHas hMa p)
ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個
小的HashTable,它們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以並
發進行。把一個整體分成了16個段(Segment.也就是最高支持16個線程的並發修改操作。
這也是在重線程場景時減小鎖的粒度從而降低鎖競爭的一種方案。並且代碼中大多共享變
量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能非常好。

1.5、CountDownLatch

作用:CountDownLatch類位於java.util.concurrent包下,利用它可以實現類似計數器的功能。計數器的初始值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢復執行任務。

如何工作:

CountDownLatch構造器中的計數值(count參數)實際上就是閉鎖需要等待的線程數量。這個值只能被設置一次,而且CountDownLatch沒有提供任何機制去重新設置這個計數值

與CountDownLatch的第一次交互是主線程等待其他線程。

主線程必須在啟動其他線程後立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。

每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢復執行任務。

應用舉例:有一個任務A(主線程),它要等待其他2個任務執行完畢之後才能執行,此時就可以利用CountDownLatch來實現這種功能了。

publicclass Test002 {

      publicstaticvoid main(String[] args) throws InterruptedException {

           System.out.println("等待子線程執行完畢...");

           CountDownLatch countDownLatch = new CountDownLatch(2);

           new Thread(new Runnable() {

                 @Override

                 publicvoid run() {

                      System.out.println("子線程," + Thread.currentThread().getName() + "開始執行...");

                      countDownLatch.countDown();// 每次減去1
 
                      System.out.println("子線程," + Thread.currentThread().getName() + "結束執行...");

                 }

           }).start();

           new Thread(new Runnable() {

                 @Override

                 publicvoid run() {

                      System.out.println("子線程," + Thread.currentThread().getName() + "開始執行...");

                      countDownLatch.countDown();

                      System.out.println("子線程," + Thread.currentThread().getName() + "結束執行...");

                 }

           }).start();

           countDownLatch.await();// 調用當前方法主線程阻塞  countDown結果為0, 阻塞變為運行狀態

           System.out.println("兩個子線程執行完畢....");

           System.out.println("繼續主線程執行..");

      }

}

使用場景:

  1. 實現最大的並行性:有時我們想同時啟動多個線程,實現最大程度的並行性。例如,我們想測試一個單例類。如果我們創建一個初始計數為1的CountDownLatch,並讓所有線程都在這個鎖上等待,那麽我們可以很輕松地完成測試。我們只需調用 一次countDown()方法就可以讓所有的等待線程同時恢復執行。
  2. 開始執行前等待n個線程完成各自任務:例如應用程序啟動類要確保在處理用戶請求前,所有N個外部系統已經啟動和運行了。
  3. 死鎖檢測:一個非常方便的使用場景是,你可以使用n個線程訪問共享資源,在每次測試階段的線程數目是不同的,並嘗試產生死鎖。

1.6、CyclicBarrier

CyclicBarrier初始化時規定一個數目,然後計算調用了CyclicBarrier.await()進入等待的線程數。當線程數達到了這個數目時,所有進入等待狀態的線程被喚醒並繼續。

CyclicBarrier就象它名字的意思一樣,可看成是個障礙, 所有的線程必須到齊後才能一起通過這個障礙。

CyclicBarrier初始時還可帶一個Runnable的參數, 此Runnable任務在CyclicBarrier的數目達到後,所有其它線程被喚醒前被執行。

class Writer extends Thread {

private CyclicBarrier cyclicBarrier;

public Writer(CyclicBarrier cyclicBarrier){

this.cyclicBarrier=cyclicBarrier;

}

@Override

public void run() {

System.out.println("線程" + Thread.currentThread().getName() + ",正在寫入數據");

try {

Thread.sleep(3000);

} catch (Exception e) {

// TODO: handle exception

}

System.out.println("線程" + Thread.currentThread().getName() + ",寫入數據成功.....");

try {

cyclicBarrier.await();

} catch (Exception e) {

}

System.out.println("所有線程執行完畢..........");

}

}

public class Test001 {

public static void main(String[] args) {

CyclicBarrier cyclicBarrier=new CyclicBarrier(5);

for (int i = 0; i < 5; i++) {

Writer writer = new Writer(cyclicBarrier);

writer.start();

}

}

}

Semaphore

Semaphore是一種基於計數的信號量。它可以設定一個閾值,基於此,多個線程競爭獲取許可信號,做自己的申請後歸還,超過閾值後,線程申請許可信號將會被阻塞。Semaphore可以用來構建一些對象池,資源池之類的,比如數據庫連接池,我們也可以創建計數為1的Semaphore,將其作為一種類似互斥鎖的機制,這也叫二元信號量,表示兩種互斥狀態。它的用法如下:

availablePermits函數用來獲取當前可用的資源數量

wc.acquire(); //申請資源

wc.release();// 釋放資源

// 創建一個計數閾值為5的信號量對象

// 只能5個線程同時訪問

Semaphore semp = new Semaphore(5);

try {

// 申請許可

semp.acquire();

try {

// 業務邏輯

} catch (Exception e) {

} finally {

// 釋放許可

semp.release();

}

} catch (InterruptedException e) {

}

案例:

需求: 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麽辦?假設10的人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麽1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。

代碼:

class Parent implements Runnable {

private String name;

private Semaphore wc;

public Parent(String name,Semaphore wc){

this.name=name;

this.wc=wc;

}

@Override

public void run() {

try {

// 剩下的資源(剩下的茅坑)

int availablePermits = wc.availablePermits();

if (availablePermits > 0) {

System.out.println(name+"天助我也,終於有茅坑了...");

} else {

System.out.println(name+"怎麽沒有茅坑了...");

}

//申請茅坑 如果資源達到3次,就等待

wc.acquire();

System.out.println(name+"終於輪我上廁所了..爽啊");

Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時間。

System.out.println(name+"廁所上完了...");

wc.release();

} catch (Exception e) {

}

}

}

public class TestSemaphore02 {

public static void main(String[] args) {

// 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麽辦?假設10的人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麽1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。

Semaphore semaphore = new Semaphore(3);

for (int i = 1; i <=10; i++) {

Parent parent = new Parent("第"+i+"個人,",semaphore);

new Thread(parent).start();

}

}

}

並發隊列

在並發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高性能隊

列,一個是以BlockingQueue接口為代表的阻塞隊列,無論哪種都繼承自Queue。

ConcurrentLinkedDeque
ConcurrentLinkedQueue : 是一個適用於高並發場景下的隊列,通過無鎖的方式,實現
了高並發狀態下的高性能,通常ConcurrentLinkedQueue性能好於BlockingQueue.它
是一個基於鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最先
加入的,尾是最近加入的,該隊列不允許null元素。
ConcurrentLinkedQueue重要方法:
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別)
poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。

ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();

q.offer("余勝軍");

q.offer("碼雲");

q.offer("螞蟻課堂");

q.offer("張傑");

q.offer("艾姐");

//從頭獲取元素,刪除該元素

System.out.println(q.poll());

//從頭獲取元素,不刪除該元素

System.out.println(q.peek());

//獲取總長度

System.out.println(q.size());

BlockingQueue

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:

在隊列為空時,獲取元素的線程會等待隊列變為非空。
當隊列滿時,存儲元素的線程會等待隊列可用。

阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。

BlockingQueue即阻塞隊列,從阻塞這個詞可以看出,在某些情況下對阻塞隊列的訪問可能會造成阻塞。被阻塞的情況主要有如下兩種:

1. 當隊列滿了的時候進行入隊列操作

2. 當隊列空了的時候進行出隊列操作

因此,當一個線程試圖對一個已經滿了的隊列進行入隊列操作時,它將會被阻塞,除非有另一個線程做了出隊列操作;同樣,當一個線程試圖對一個空隊列進行出隊列操作時,它將會被阻塞,除非有另一個線程進行了入隊列操作。

在Java中,BlockingQueue的接口位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞隊列的特性可知,阻塞隊列是線程安全的。

在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數據的問題。通過這些高效並且線程安全的隊列類,為我們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。

認識BlockingQueue

阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在數據結構中所起的作用大致如下圖所示:

從上圖我們可以很清楚看到,通過一個共享的隊列,可以使得數據由隊列的一端輸入,從另外一端輸出;

常用的隊列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的一種)

  先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似於排隊的功能。從某種程度上來說這種隊列也體現了一種公平性。

  後進先出(LIFO):後插入隊列的元素最先出隊列,這種隊列優先處理最近發生的事件。

多線程環境中,通過隊列可以很容易實現數據共享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若幹生產者線程,另外又有若幹個消費者線程。如果生產者線程需要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共享問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大於消費者消費的速度,並且當生產出來的數據累積到一定程度的時候,那麽生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發布以前,在多線程環境下,我們每個程序員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)

下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:

ArrayBlockingQueue

ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。

ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面

是一個初始化和使用ArrayBlockingQueue的例子:

ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3);

arrays.add("李四");

arrays.add("張軍");

arrays.add("張軍");

// 添加阻塞隊列

arrays.offer("張三", 1, TimeUnit.SECONDS);

LinkedBlockingQueue

LinkedBlockingQueue阻塞隊列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是采用了默認大小為Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表。

和ArrayBlockingQueue一樣,LinkedBlockingQueue 也是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面是一個初始化和使LinkedBlockingQueue的例子:

LinkedBlockingQueuelinkedBlockingQueue = new LinkedBlockingQueue(3);

linkedBlockingQueue.add("張三");

linkedBlockingQueue.add("李四");

linkedBlockingQueue.add("李四");

System.out.println(linkedBlockingQueue.size());

PriorityBlockingQueue

PriorityBlockingQueue是一個沒有邊界的隊列,它的排序規則和 java.util.PriorityQueue一樣。需要註

意,PriorityBlockingQueue中允許插入null對象。

所有插入PriorityBlockingQueue的對象必須實現 java.lang.Comparable接口,隊列優先級的排序規則就

是按照我們對這個接口的實現來定義的。

另外,我們可以從PriorityBlockingQueue獲得一個叠代器Iterator,但這個叠代器並不保證按照優先級順

序進行叠代。

下面我們舉個例子來說明一下,首先我們定義一個對象類型,這個對象需要實現Comparable接口:

SynchronousQueue

SynchronousQueue隊列內部僅允許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另一個線程消費。

使用BlockingQueue模擬生產者與消費者

class ProducerThread implements Runnable {

private BlockingQueue queue;

private volatile boolean flag = true;

private static AtomicInteger count = new AtomicInteger();

public ProducerThread(BlockingQueue queue) {

this.queue = queue;

}

@Override

public void run() {

try {

System.out.println("生產線程啟動...");

while (flag) {

System.out.println("正在生產數據....");

String data = count.incrementAndGet()+"";

// 將數據存入隊列中

boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);

if (offer) {

System.out.println("生產者,存入" + data + "到隊列中,成功.");

} else {

System.out.println("生產者,存入" + data + "到隊列中,失敗.");

}

Thread.sleep(1000);

}

} catch (Exception e) {

} finally {

System.out.println("生產者退出線程");

}

}

public void stop() {

this.flag = false;

}

}

class ConsumerThread implements Runnable {

private BlockingQueue<String> queue;

private volatile boolean flag = true;

public ConsumerThread(BlockingQueue<String> queue) {

this.queue = queue;

}

@Override

public void run() {

System.out.println("消費線程啟動...");

try {

while (flag) {

System.out.println("消費者,正在從隊列中獲取數據..");

String data = queue.poll(2, TimeUnit.SECONDS);

if (data != null) {

System.out.println("消費者,拿到隊列中的數據data:" + data);

Thread.sleep(1000);

} else {

System.out.println("消費者,超過2秒未獲取到數據..");

flag = false;

}

}

} catch (Exception e) {

e.printStackTrace();

} finally {

System.out.println("消費者退出線程...");

}

}

}

public class ProducerAndConsumer {

public static void main(String[] args) throws InterruptedException {

BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

ProducerThread producerThread1 = new ProducerThread(queue);

ProducerThread producerThread2 = new ProducerThread(queue);

ConsumerThread consumerThread1 = new ConsumerThread(queue);

Thread t1 = new Thread(producerThread1);

Thread t2 = new Thread(producerThread2);

Thread c1 = new Thread(consumerThread1);

t1.start();

t2.start();

c1.start();

// 執行10s

Thread.sleep(10 * 1000);

producerThread1.stop();

producerThread2.stop();

}

}

【java】-- java並發包總結