1. 程式人生 > >同步類容器和並發類容器

同步類容器和並發類容器

內部實現 [] 經典的 方式 tac 並發數 指向 優化策略 use

一 同步類容器
同步類容器都是線程安全的,但在某些場景中可能需要加鎖來保證復合操作。
符合操作如:叠代(反復訪問元素,遍歷完容器中所有元素)、跳轉(根據指定的順序找到當前元素的下一個元素)、條件運算。這些復合操作在多線程並發地修改容器時,可能會表現出意外的行為,最經典的ConcurrentModificationException,原因是當前容器叠代的過程中,被並發的修改了內容,這是由於早期叠代器設計的時候沒有考慮到並發修改。

同步類容器:如古老的Vector、HashTable。這些容器的同步功能其實都是有JDK的Collections.synchronized***等工廠方法去創建實現的。其底層的機制無非就是用傳統的synchronized關鍵字對每個共用方法都進行同步,使得每次只有一個線程訪問容器的狀態。這不符合今天互聯網高並發的需求。
二 並發類容器


jdk1.5之後,提供了多種並發類容器來替代同步類容器,從而改善性能。同步類容器都是串行化的,它們雖然實現了線程安全,但是嚴重降低了並發性,在多線程環境時,嚴重降低了性能。

並發類容器是專門針對並發設計的,使用ConcurrentHashMap來替代給予散列的傳統的HashTable,而且在ConcurrentHashMap中,添加了一些常見復合操作的支持。以及使用CopyOnWriteArrayList替代Vector,並發的CopyOnWriteArraySet,以及並發的Queue,ConcurrentLinkedQueue和LinkedBlockingQueue,前者是高性能隊列,後者是以阻塞形式的隊列,具體實現Queue還有很多,例如ArrayBlockingQueue、SynchronousQueue、PriorityBlockingQueue等。
2.1 ConcurrentMap

ConcurrentMap接口下兩個重要實現類:ConcurrentHashMap、ConcurrentSkipListMap(支持並發排序功能,彌補ConcurrentHashMap)
ConcurrentHashMap:內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的HashTable,他們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以並發進行。把一個整體分成了16個段,也就是最高支持16個線程的並發修改操作。這也是在多線程場景時減小鎖的粒度從而降低鎖競爭的一種方案,並且代碼中大多共享變量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能非常好。
2.2 CopyOnWrite容器

Copy-On-Write簡稱COW,是一種用於程序設計中的優化策略。
JDK中的COW容器有兩種:CopyOnWriteArrayList、CopyOnWriteArraySet
CopyOnWrite容器即寫時復制的容器。通俗的理解是當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然後新的容器裏添加元素,添加完後,再將原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行並發的讀,而且不需要加鎖,因為當前容器不會添加任何元素。所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同容器。場景:讀多寫少
三 並發Queue
並發==隊列先進先出==,有兩套實現:ConcurrentLinkedQueue接口高性能隊列、BlockingQueue接口為代表的阻塞隊列
3.1 ConcurrentLinkedQueue類
ConcurrentLinkedQueue:是一個適用於高並發場景下的隊列,通過無鎖的方式,實現了高並發狀態下的高性能,通常ConcurrentLinkedQueue性能好於BlockingQueue,它是一個基於鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則,頭是最先加入的,尾是最近加入的,該隊列不允許null元素。
ConcurrentLinkedQueue隊列的重要方法:
1. add()和offer():都是加入元素的方法
2. poll()、take()和peek():都是取頭元素節點,區別在於前者會刪除元素,後者不會

3.1 BlockingQueue接口
1. ArrayBlockingQueue:基於數組實現的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,其內部==沒實現讀寫分離==,也就意味著生產和消費不能完全同步並行,長度是需要定義的,可以指定先進先出或者後進後出,也叫==有界隊列==,在很多場合非常適用
2. LinkedBlockingQueue:基於鏈表的阻塞隊列,與ArrayBlockingQueue類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),LinkedBlockingQueue之所以能夠高效的處理並發數據,是因為其內部實現采用分離鎖(==讀寫分離==兩個鎖),從而實現生產者和消費者並行,是一個==無界隊列==
3. SynchronousQueue:一種==沒有緩沖==的隊列,生產者生產的數據直接會被消費者獲取並消費。不能添加,元素先take()取出阻塞,當有元素添加,後才取出

```
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
synchronousQueue.add("a");
```

> 運行結果:
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at com.zys.test.test.StringTest.main(StringTest.java:13)

```
public class SynQueue {

	public static void main(String[] args) {
		SynchronousQueue<String> queue = new SynchronousQueue<>();
		Thread t1 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("-----" + queue.take() + "\n");
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		},"t1");
		t1.start();
		Thread t2 = new Thread(new Runnable() {
			@Override
			public void run() {
				queue.add("aaaa");//先take()取出阻塞,當有元素添加,後才取出
				queue.add("bbbb");
			}
		},"t2");
		t2.start();
	}
}
``` 

> 運行結果:
-----aaaa
Exception in thread "t2"
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at Queue.SynQueue$2.run(SynQueue.java:25)
at java.lang.Thread.run(Thread.java:745)

4. PriorityBlockingQueue:基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定,也就是說傳入隊列的對象必須實現Comparable接口),在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的公平鎖,也是一個無界隊列

public class Task implements Comparable<Task>{

	private int id;
	private String name;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	@Override
	public int compareTo(Task o) {
		return this.id > o.id ? 1 : (this.id < o.id ? -1 : 0);
	}
	@Override
	public String toString() {
		return this.id + "," + this.name;
	}
}

public class UsePriorityBlockingQueue {

	public static void main(String[] args) {
		PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
		Task t1 = new Task();
		t1.setId(3);
		t1.setName("task1");
		Task t2 = new Task();
		t2.setId(6);
		t2.setName("task2");
		Task t3 = new Task();
		t3.setId(1);
		t3.setName("task3");
		queue.add(t1);
		queue.add(t2);
		queue.add(t3);
		System.out.println(queue);
		try {
			System.out.println(queue.peek());//peek和element取出但不移除
			System.out.println(queue.take());//take、poll和remove取出且移除
			//優先級排序,並返回優先級最高的元素,並在隊列中刪除取出的元素
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(queue);
		for (Iterator<Task> iter = queue.iterator() ; iter.hasNext();) {
			Task task = iter.next();
			System.out.println(task.getName());
		}
	}
}

> 運行結果:
[1,task3, 6,task2, 3,task1]
1,task3
1,task3
[3,task1, 6,task2]
task1
task2
特點:添加元素時不排序,在取值時根據優先級排序

5. DelayQueue:帶有延遲時間的Queue,其中的元素只有當其指定的==延遲時間==到了,才能夠從隊列中獲取到該元素。DelayQueue中的元素必須實現Delayed接口,DelayQueue是一個沒有大小限制的隊列,應用場景很多,比如對緩存超時的數據進行移除、任務超時處理、空閑連接的關閉等等。

public class Wangmin implements Delayed{

	private String id;
	private String name;
	private long endTime;
	private TimeUnit timeUnit = TimeUnit.SECONDS; //設定單位為秒
	
	public Wangmin(String id, String name, long endTime) {
		super();
		this.id = id;
		this.name = name;
		this.endTime = endTime;
	}
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public long getEndTime() {
		return endTime;
	}
	public void setEndTime(long endTime) {
		this.endTime = endTime;
	}
	
	/**
	 * 上網時間相互比較,確定排序
	 */
	@Override
	public int compareTo(Delayed o) {
		Wangmin min = (Wangmin)o;
		return this.getDelay(this.timeUnit) - min.getDelay(this.timeUnit) > 0 ? 1 : 0;
	}
	/**
	 * 返回延遲時間
	 */
	@Override
	public long getDelay(TimeUnit unit) {
		return this.endTime - System.currentTimeMillis();
	}
	
}


public class WangBa implements Runnable{

	private DelayQueue<Wangmin> delayQueue = new DelayQueue<Wangmin>();
	public boolean yinye = true;
	
	public void shangji(String name, String id, int money) {
		Wangmin min = new Wangmin(id, name, money * 1000 + System.currentTimeMillis());
		System.out.println("網民:" + min.getName() + ",身份證號碼為:" + min.getId() + ",網費:" + money + "\t開始上網,上網結束時間為:" + min.getEndTime());
		this.delayQueue.add(min);
	}
	
	public void xiaji(Wangmin min) {
		System.out.println("網民:" + min.getName() + "下機時間到:" + System.currentTimeMillis());
	}
	@Override
	public void run() {
		while(yinye) {
			try {
				Wangmin min = delayQueue.take();
				xiaji(min);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	public static void main(String[] args) {
		System.out.println("網吧開始營業...");
		WangBa wangBa = new WangBa();
		Thread shangwang = new Thread(wangBa);
		shangwang.start();
		wangBa.shangji("路人甲", "111", 1);
		wangBa.shangji("路人乙", "222", 10);
		wangBa.shangji("路人丙", "333", 5);
	}
}

> 運行結果:
網吧開始營業...
網民:路人甲,身份證號碼為:111,網費:1 開始上網,上網結束時間為:1532587709166
網民:路人乙,身份證號碼為:222,網費:10 開始上網,上網結束時間為:1532587718167
網民:路人丙,身份證號碼為:333,網費:5 開始上網,上網結束時間為:1532587713168
網民:路人甲下機時間到:1532587709166
網民:路人丙下機時間到:1532587713168
網民:路人乙下機時間到:1532587718167

同步類容器和並發類容器