1. 程式人生 > >java 多執行緒 同步 觀察者 併發集合的一個例子

java 多執行緒 同步 觀察者 併發集合的一個例子

//第一版
package com.hra.riskprice;

import com.hra.riskprice.SysEnum.Factor_Type;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.swing.text.html.HTMLDocument;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.TimeUnit;

 
class ForwardingSet<E> implements Set<E>{ private final Set<E> s; public ForwardingSet(Set<E> s){this.s=s;} @Override public void clear() { s.clear(); } public boolean isEmpty(){return s.isEmpty();} public int size(){return s.size();}
public Iterator<E> iterator(){return s.iterator();} public boolean add(E e){return s.add(e);} public boolean remove(Object o){return s.remove(o);} public boolean containsAll(Collection<?> c){return s.containsAll(c);} public boolean addAll(Collection<? extends E> c){
return s.addAll(c); } public boolean removeAll(Collection<?> c){ return s.removeAll(c); } public boolean retainAll(Collection<?> c){ return s.retainAll(c); } @Override public Object[] toArray() { return s.toArray(); } @Override public <T> T[] toArray(T[] a) { return s.toArray(a); } @Override public boolean equals(Object o) { return s.equals(o); } @Override public int hashCode() { return s.hashCode(); } @Override public String toString() { return s.toString(); } @Override public boolean contains(Object o) { return s.contains(o); } } interface SetObserver<E>{ void added(ObservableSet<E> set,E element); } class ObservableSet<E> extends ForwardingSet<E>{ public ObservableSet(Set<E> set){ super(set); } private final List<SetObserver<E>> observers=new ArrayList<SetObserver<E>>(); public void addObserver(SetObserver<E> observer){ synchronized(observers){ observers.add(observer); } } public boolean removeObserver(SetObserver<E> observer){ synchronized (observers){ return observers.remove(observer); } } public void notifyElementAdded(E element){ synchronized (observers){ for(SetObserver<E> observer:observers){ observer.added(this,element); } } } @Override public boolean add(E e) { boolean added=super.add(e); if(added){ notifyElementAdded(e); } return added; } @Override public boolean addAll(Collection<? extends E> c) { boolean result=false; for(E element:c){ result|=add(element); } return result; } } @SpringBootApplication public class RiskpriceApplication { public static void main(String[] args) { ObservableSet<Integer> set=new ObservableSet<Integer>(new HashSet<Integer>()); set.addObserver(new SetObserver<Integer>() { @Override public void added(ObservableSet<Integer> s, Integer e) { System.out.println(e); if(e==23){ s.removeObserver(this); } } }); for(int i=0;i<100;i++){ set.add(i); } } } 你覺得會列印0~23嗎,實際上執行後就掛了,for迴圈遍歷過程中,不允許修改列舉列表,我們可以考慮通過另外一個執行緒去移除這個觀察者,也是下面過度得第二版了 通過 ExecutorService //第二版 package com.hra.riskprice; import com.hra.riskprice.SysEnum.Factor_Type; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.swing.text.html.HTMLDocument; import java.math.BigDecimal; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class ForwardingSet<E> implements Set<E>{ private final Set<E> s; public ForwardingSet(Set<E> s){this.s=s;} @Override public void clear() { s.clear(); } public boolean isEmpty(){return s.isEmpty();} public int size(){return s.size();} public Iterator<E> iterator(){return s.iterator();} public boolean add(E e){return s.add(e);} public boolean remove(Object o){return s.remove(o);} public boolean containsAll(Collection<?> c){return s.containsAll(c);} public boolean addAll(Collection<? extends E> c){ return s.addAll(c); } public boolean removeAll(Collection<?> c){ return s.removeAll(c); } public boolean retainAll(Collection<?> c){ return s.retainAll(c); } @Override public Object[] toArray() { return s.toArray(); } @Override public <T> T[] toArray(T[] a) { return s.toArray(a); } @Override public boolean equals(Object o) { return s.equals(o); } @Override public int hashCode() { return s.hashCode(); } @Override public String toString() { return s.toString(); } @Override public boolean contains(Object o) { return s.contains(o); } } interface SetObserver<E>{ void added(ObservableSet<E> set,E element); } class ObservableSet<E> extends ForwardingSet<E>{ public ObservableSet(Set<E> set){ super(set); } private final List<SetObserver<E>> observers=new ArrayList<SetObserver<E>>(); public void addObserver(SetObserver<E> observer){ synchronized(observers){ observers.add(observer); } } public boolean removeObserver(SetObserver<E> observer){ synchronized (observers){ return observers.remove(observer); } } public void notifyElementAdded(E element){ synchronized (observers){ for(SetObserver<E> observer:observers){ observer.added(this,element); } } } @Override public boolean add(E e) { boolean added=super.add(e); if(added){ notifyElementAdded(e); } return added; } @Override public boolean addAll(Collection<? extends E> c) { boolean result=false; for(E element:c){ result|=add(element); } return result; } } @SpringBootApplication public class RiskpriceApplication { public static void main(String[] args) throws InterruptedException{ ObservableSet<Integer> set=new ObservableSet<Integer>(new HashSet<Integer>()); set.addObserver(new SetObserver<Integer>() { @Override public void added(ObservableSet<Integer> s, Integer e) { System.out.println(e); if(e==23){ ExecutorService excutor= Executors.newSingleThreadExecutor(); final SetObserver<Integer> observer=this; try{ excutor.submit(new Runnable() { @Override public void run() { s.removeObserver(observer); } }).get(); }catch (ExecutionException ex){ throw new AssertionError(ex.getCause()); }catch (InterruptedException ex){ throw new AssertionError(ex.getCause()); }finally { excutor.shutdown(); } } } }); for(int i=0;i<100;i++){ set.add(i); } } } 第二版雖然會列印到23但是實際上並沒有成功, public void run() { s.removeObserver(observer); } 進入 public boolean removeObserver(SetObserver<E> observer){ synchronized (observers){ return observers.remove(observer); } } 經過同步快synchronized 的時候將會遭遇死鎖,因為主執行緒已經鎖定了observers,只有等待子執行緒執行完成後才會釋放鎖,而子執行緒又在等待鎖的釋放,這樣相互的等待就造成了死鎖,但是由於Java設計的鎖是可重入的,這種呼叫不會產生死鎖,但會產生一個異常,因為呼叫執行緒正在該鎖所保護的執行緒上進行著。這種失敗可能是災難性的,本質來說這個鎖,沒有盡到它的職責。可重入的鎖簡化了多執行緒的面向物件程式構造,但是它可能會將活性失敗,變成安全性失敗(參考自Effective java) 什麼解決呢,來個2.1版本吧 我們建立個快照,而不使用原observers,這樣每個通知都使用了自己的快照觀察者列表引用就不會死鎖了 public void notifyElementAdded(E element){ List<SetObserver<E>> snaphot=null;//快照 synchronized (observers){ snaphot=new ArrayList<SetObserver<E>>(observers); } for(SetObserver<E> observer:snaphot){ observer.added(this,element); } } //第三版 事實上,要將外來方法的呼叫移出同步程式碼塊還有更好的方法,從java1.5發行版以來,提供了併發集合 corrent collection ,稱作 CopyOnWriteArrayList, 這是專門為此定製的,他是Arraylist的一種變體,通過重新拷貝整個底層陣列,在這裡實現所有的操作,由於內部陣列永遠不動(歸功於重新拷貝),因此迭代不需要鎖定,大量使用有效能影響,但對於觀察者列表幾乎不變來說卻是很好的,因為他們幾乎不改動,並且經常遍歷 第三版較之前2.1版本更改如下: private final List<SetObserver<E>> observers=new CopyOnWriteArrayList<SetObserver<E>>(); public void addObserver(SetObserver<E> observer){ //synchronized(observers){ observers.add(observer); //} } public boolean removeObserver(SetObserver<E> observer){ //synchronized (observers){ return observers.remove(observer); //} } public void notifyElementAdded(E element){ //List<SetObserver<E>> snaphot=null;//快照 //synchronized (observers){ // snaphot=new ArrayList<SetObserver<E>>(observers); //} for(SetObserver<E> observer:observers){ observer.added(this,element); } } 當然這個方法也可以改了,因為實際操作的時候底層是重新拷貝,所以也就不需要通過另外一個執行緒去移除引用了 修改如下: set.addObserver(new SetObserver<Integer>() { @Override public void added(ObservableSet<Integer> s, Integer e) { System.out.println(e); if(e==23){ s.removeObserver(this); // ExecutorService excutor= Executors.newSingleThreadExecutor(); // final SetObserver<Integer> observer=this; // try{ // excutor.submit(new Runnable() { // @Override // public void run() { // s.removeObserver(observer); // } // }).get(); // // }catch (ExecutionException ex){ // throw new AssertionError(ex.getCause()); // }catch (InterruptedException ex){ // throw new AssertionError(ex.getCause()); // }finally { // excutor.shutdown(); // } } } });