1. 程式人生 > >java中併發Queue種類與各自API特點以及使用場景!

java中併發Queue種類與各自API特點以及使用場景!

一 先說下佇列

佇列是一種資料結構.它有兩個基本操作:在佇列尾部加入一個元素,和從佇列頭部移除一個元素(注意不要弄混佇列的頭部和尾部)

就是說,佇列以一種先進先出的方式管理資料,如果你試圖向一個 已經滿了的阻塞佇列中新增一個元素或者是從一個空的阻塞佇列中移除一個元索,將導致執行緒阻塞.

在多執行緒進行合作時,阻塞佇列是很有用的工具。工作者執行緒可以定期地把中間結果存到阻塞佇列中而其他工作者執行緒把中間結果取出並在將來修改它們。佇列會自動平衡負載。

如果第一個執行緒集執行得比第二個慢,則第二個 執行緒集在等待結果時就會阻塞。如果第一個執行緒集執行得快,那麼它將等待第二個執行緒集趕上來.

說白了,就是先進先出,執行緒安全!

java中併發佇列都是在java.util.concurrent併發包下的,Queue介面與List、Set同一級別,都是繼承了Collection介面,最近學習了java中的併發Queue的所有子類應用場景,這裡記錄分享一下:

1.1 這裡可以先用wait與notify(腦忒fai) 模擬一下佇列的增刪資料,簡單瞭解一下佇列:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

import java.util.LinkedList;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

/**

* 模擬佇列增刪資料

* @author houzheng

*/

public class MyQueue {

//元素集合

private LinkedList<Object> list=new LinkedList<Object>();

//計數器(同步),判斷集合元素數量

private AtomicInteger count=new AtomicInteger();

//集合上限與下限,final必須指定初值

private final int minSize=0;

private final int maxSize;

//構造器指定最大值

public MyQueue(int maxSize) {

this.maxSize = maxSize;

}

//初始化物件,用於加鎖,也可直接用this

private Object lock=new Object();

//put方法:往集合中新增元素,如果集合元素已滿,則此執行緒阻塞,直到有空間再繼續

public void put(Object obj){

synchronized (lock) {

while(count.get()==this.maxSize){

try {

lock.wait();

catch (InterruptedException e) {

e.printStackTrace();}

}

list.add(obj);

//計數器加一

count.incrementAndGet();

System.out.println("放入元素:"+obj);

//喚醒另一個執行緒,(處理極端情況:集合一開始就是空,此時take執行緒會一直等待)

lock.notify();

}

}

//take方法:從元素中取資料,如果集合為空,則執行緒阻塞,直到集合不為空再繼續

public Object take(){

Object result=null;

synchronized(lock){

while(count.get()==this.minSize){

try {

lock.wait();

catch (InterruptedException e) {

e.printStackTrace();}

}

//移除第一個

result=list.removeFirst();

//計數器減一

count.decrementAndGet();

System.out.println("拿走元素:"+result);

//喚醒另一個執行緒,(處理極端情況:集合一開始就是滿的,此時put執行緒會一直等待)

lock.notify();

}

return result;

}

public int getSize(){

return this.count.get();

}

public static void main(String[] args) {

//建立集合容器

MyQueue queue=new MyQueue(5);

queue.put("1");

queue.put("2");

queue.put("3");

queue.put("4");

queue.put("5");

System.out.println("當前容器長度為:"+queue.getSize());

Thread t1=new Thread(()->{

queue.put("6");

queue.put("7");

},"t1");

Thread t2=new Thread(()->{

Object take1 = queue.take();

Object take2 = queue.take();

},"t2");

//測試極端情況,兩秒鐘後再執行另一個執行緒

t1.start();

try {

TimeUnit.SECONDS.sleep(2);

catch (InterruptedException e) {

e.printStackTrace();

}

t2.start();

}

}

  這裡用執行緒通訊的方式簡單模擬了佇列的進出,那麼接下來就正式進入java的併發佇列:

二 併發Queue

JDK中併發佇列提供了兩種實現,一種是高效能佇列ConcurrentLinkedQueue,一種是阻塞佇列BlockingQueue,兩種都繼承自Queue:

1 ConcurrentLinkedQueue

這是一個使用於高併發場景的佇列(額,各位看這塊部落格的小朋友,最好對執行緒基礎比較熟悉再來看,當然我也在拼命學習啦,哈哈哈),主要是無鎖的方式,他的想能要比BlockingQueue好

,是基於連結節點的無界執行緒安全佇列,先進先出,不允許有null元素,廢話不多說,上demo:

這種queue比較簡單,沒什麼好說的,和ArrayList一樣用就可以,關鍵是BlockingQUeue

2 BlockingQueue

blockingQueue主要有5中實現,我感覺都挺有意思的,其中幾種還比較常用就都學習了下,這裡都介紹下:

2.1  ArrayBlockingQueue

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

@Test

public void test02() throws Exception{

//必須指定佇列長度

ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(2);

abq.add("a");

//add :新增元素,如果BlockingQueue可以容納,則返回true,否則拋異常,支援新增集合

System.out.println(abq.offer("b"));//容量如果不夠,返回false

//offer: 如果可能的話,新增元素,即如果BlockingQueue可以容納,則返回true,否則返回false,支援設定超時時間

//設定超時,如果超過時間就不新增,返回false, 

abq.offer("d"2, TimeUnit.SECONDS);// 新增的元素,時長,單位

//put 新增元素,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.

abq.put("d");//會一直等待

//poll 取走頭部元素,若不能立即取出,則可以等time引數規定的時間,取不到時返回null,支援設定超時時間

abq.poll();

abq.poll(2,TimeUnit.SECONDS);//兩秒取不到返回null

//take()  取走頭部元素,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止

abq.take();

//取出頭部元素,但不刪除

abq.element();

//drainTo()

//一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。

List list=new ArrayList();

abq.drainTo(list,2);//將佇列中兩個元素取到list中,取走後佇列中就沒有取走的元素

System.out.println(list); //[a,b]

System.out.println(abq);  //[]

}

2.2 LinkedBlockingQueue

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

@Test

public void test03(){

LinkedBlockingQueue lbq=new LinkedBlockingQueue();//可指定容量,也可不指定

lbq.add("a");

lbq.add("b");

lbq.add("c");

//API與ArrayBlockingQueue相同

//是否包含

System.out.println(lbq.contains("a"));

//移除頭部元素或者指定元素  remove("a")

System.out.println(lbq.remove());

//轉陣列

Object[] array = lbq.toArray();

//element 取出頭部元素,但不刪除

System.out.println(lbq.element());

System.out.println(lbq.element());

System.out.println(lbq.element());

}

2.3 SynchronousQueue

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

public static void main(String[] args) {

SynchronousQueue<String> sq=new SynchronousQueue<String>();

// iterator() 永遠返回空,因為裡面沒東西。

// peek() 永遠返回null

/**

* isEmpty()永遠是true。

* remainingCapacity() 永遠是0。

* remove()和removeAll() 永遠是false。

*/

new Thread(()->{

try {

//取出並且remove掉queue裡的element(認為是在queue裡的。。。),取不到東西他會一直等。

System.out.println(sq.take());

catch (InterruptedException e) {

e.printStackTrace();

}

}).start();

new Thread(()->{

try {

//offer() 往queue裡放一個element後立即返回,

//如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false

//true ,上面take執行緒一直在等,

////下面剛offer進去就被拿走了,返回true,如果offer執行緒先執行,則返回false

System.out.println(sq.offer("b"));

catch (Exception e) {

e.printStackTrace();

}

}).start();

new Thread(()->{

try {

//往queue放進去一個element以後就一直wait直到有其他thread進來把這個element取走

sq.put("a");

catch (Exception e) {

e.printStackTrace();

}

}).start();

}

2.4 PriorityBlockingQueue

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

@Test

public void test04() throws Exception{

//佇列裡元素必須實現Comparable介面,用來決定優先順序

PriorityBlockingQueue<String> pbq=new PriorityBlockingQueue<String>();

pbq.add("b");

pbq.add("g");

pbq.add("a");

pbq.add("c");

//獲取的時候會根據優先順序取元素,插入的時候不會排序,節省效能

//System.out.println(pbq.take());//a,獲取時會排序,按優先順序獲取

System.out.println(pbq.toString());//如果前面沒有取值,直接syso也不會排序

Iterator<String> iterator = pbq.iterator();

while(iterator.hasNext()){

System.out.println(iterator.next());

}

}

@Test

public void test05(){

PriorityBlockingQueue<Person> pbq=new PriorityBlockingQueue<Person>();

Person p2=new Person("姚振",20);

Person p1=new Person("侯徵",24);

Person p3=new Person("何毅",18);

Person p4=new Person("李世彪",22);

pbq.add(p1);

pbq.add(p2);

pbq.add(p3);

pbq.add(p4);

System.out.println(pbq);//沒有按優先順序排序

try {

//只要take獲取元素就會按照優先順序排序,獲取一次就全部排好序了,後面就會按優先順序迭代

pbq.take();

catch (InterruptedException e) {

e.printStackTrace();

}

//按年齡排好了序

for (Iterator iterator = pbq.iterator(); iterator.hasNext();) {

Person person = (Person) iterator.next();

System.out.println(person);

}

}

2.5 最後說一下DelayQueue ,這裡用個網上很經典的例子,網咖上網計時

網民實體queue中元素

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

//網民

public class Netizen implements Delayed {

//身份證

private String ID;

//名字

private String name;

//上網截止時間

private long playTime;

//比較優先順序,時間最短的優先

@Override

public int compareTo(Delayed o) {

Netizen netizen=(Netizen) o;

return this.getDelay(TimeUnit.SECONDS)-o.getDelay(TimeUnit.SECONDS)>0?1:0;

}

public Netizen(String iD, String name, long playTime) {

ID = iD;

this.name = name;

this.playTime = playTime;

}

//獲取上網時長,即延時時長

@Override

public long getDelay(TimeUnit unit) {

//上網截止時間減去現在當前時間=時長

return this.playTime-System.currentTimeMillis();

}

 網咖類:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

//網咖

public class InternetBar implements Runnable {

//網民佇列,使用延時佇列

private DelayQueue<Netizen> dq=new DelayQueue<Netizen>();

//上網

public void startPlay(String id,String name,Integer money){

//截止時間= 錢數*時間+當前時間(1塊錢1秒)

Netizen netizen=new Netizen(id,name,1000*money+System.currentTimeMillis());

System.out.println(name+"開始上網計費......");

dq.add(netizen);

}

//時間到下機

public void endTime(Netizen netizen){

System.out.println(netizen.getName()+"餘額用完,下機");

}

@Override

public void run() {

//執行緒,監控每個網民上網時長

while(true){

try {

//除非時間到.否則會一直等待,直到取出這個元素為止

Netizen netizen=dq.take();

endTime(netizen);

}

catch (InterruptedException e) {

e.printStackTrace();

}

}

}

public static void main(String[] args) {

//新建一個網咖

InternetBar internetBar=new InternetBar();

//來了三個網民上網

internetBar.startPlay("001","侯徵",3);

internetBar.startPlay("002","姚振",7);

internetBar.startPlay("003","何毅",5);

Thread t1=new Thread(internetBar);

t1.start();

}

}

  這樣就可以完美實現業務需求了,

結果,

這塊東西比較深,還需要不斷加強學習實踐才行!!

好文要頂 關注我 收藏該文