並發編程(八):線程安全策略
通常我們保證線程安全策略的方式有以下幾種:
a、不可變對象
b、線程封閉
c、同步容器
d、並發容器
不可變對象
可參考string類,可以采用的方式是將類聲明為final,將所有成員都聲明為私有的,對變量不提供set方法,將所有可變成員聲明為final,通過構造器初始化所有成員,進行深度拷貝,在get方法中不直接返回對象本身,而是返回對象的拷貝。
關於final,我們詳細說明一下
final-demo
@Slf4j public class ImmutableExample1 { private final static Integer a = 1;private final static String b = "2"; private final static Map<Integer, Integer> map = Maps.newHashMap(); static { map.put(1, 2); map.put(3, 4); map.put(5, 6); } public static void main(String[] args) { // 被final修飾的基本數據類型無法改變 // a = 2; // b = "2";// 引用對象,此引用無法指向別的對象,但可修改該對象的值 map.put(1,3); log.info("{}", map.get(1)); } //final可修飾傳遞進來的對象 private void test(final int a) { } }
此demo需要我們註意的是,final修飾引用類型時,雖然不能將引用再指向別的對象,但可修改該對象的值;此外final還可修飾參數,這樣傳遞進來的參數a無法被修改。此demo不是線程安全的
除了final可以定義不可變對象,java提供的Collections類,也可定義不可變對象,Collections.unmodifiableXXX傳入的對象一經初始化便無法修改,XXX可表示Collection、List、Set、Map等,谷歌提供的Guava類,也有類似的功能,ImmutableXXX,XXX同樣可表示Collection、List、Set、Map等
Collections-demo
@Slf4j public class ImmutableExample2 { private static Map<Integer, Integer> map = Maps.newHashMap(); static { map.put(1, 2); map.put(3, 4); map.put(5, 6); //此處理後的map的值是不可以修改的 map = Collections.unmodifiableMap(map); } public static void main(String[] args) { // map.put(1,3); log.info("{}", map.get(1)); } }
輸出如下:
可見,用Collections.UnmodifiableMap修飾的對象是不可修改的,如果嘗試修改對象的值,在程序運行時會拋出異常,此方法的實現可參考源碼(其實就是將一個新的集合的所有更新方法變為拋出異常) 此demo是線程安全的。
ImmutableSet-demo
public class ImmutableExample3 { //以下為不可變對象的集合 private final static ImmutableList<Integer> list = ImmutableList.of(1, 2, 3); private final static ImmutableSet set = ImmutableSet.copyOf(list); private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1,2,3,4); private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder() .put(1, 2).put(3, 4).put(5, 6).build(); public static void main(String[] args) { // 此時不同意再添加新的元素 map2.put(1, 3); } }
輸出如下:
此demo是線程安全的,開發時如果我們的對象可以變為不可變對象,我們盡量將對象變為不可變對象,這樣可以避免線程安全問題
線程封閉
線程封閉就是把對象封裝到一個線程裏,只有一個線程可以看到這個對象,這樣就算這個對象不是線程也不會有線程安全問題
關於堆棧封閉,我們自己定義的局部變量被多個線程訪問時,每個局部變量都會被拷貝一份放到線程的棧中去,這樣每個線程操作的對象相當於是不同的,所以不會有線程安全問題(全局變量容易引發並發問題);ThreadLocal使用map實現了線程封閉,map的key是線程id,map的值是封閉的對象
THreadLocal-demo
定義RequestHolder類來操作ThreadLocal:
public class RequestHolder { //只有當項目重新啟動的時候,threadLocal中存儲的值才會被釋放 private final static ThreadLocal<Long> requestHolder = new ThreadLocal<>(); public static void add(Long id) { requestHolder.set(id); } public static Long getId() { return requestHolder.get(); } public static void remove() { requestHolder.remove(); } }
註意,如add方法,只需傳入需要封閉的對象即可,key值會自動取線程id放入,get和remove方法類似
定義HttpFilter類處理請求:
@Slf4j public class HttpFilter implements Filter { @Override public void init(FilterConfig filterConfig) throws ServletException { } @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { HttpServletRequest request = (HttpServletRequest) servletRequest; log.info("do filter,{},{}", Thread.currentThread().getId(), request.getServletPath()); RequestHolder.add(Thread.currentThread().getId()); //使請求繼續被處理,不要攔住不動 filterChain.doFilter(servletRequest, servletResponse); } @Override public void destroy() { } }
配置啟動類:
@SpringBootApplication public class ConcurrencyApplication extends WebMvcConfigurerAdapter { public static void main(String[] args) { SpringApplication.run(ConcurrencyApplication.class, args); } @Bean public FilterRegistrationBean httpFilter() { FilterRegistrationBean registrationBean = new FilterRegistrationBean(); registrationBean.setFilter(new HttpFilter()); registrationBean.addUrlPatterns("/threadLocal/*"); return registrationBean; } @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new HttpInterceptor()).addPathPatterns("/*"); } }
攔截以threadLocal開頭的url,並利用Interceptor攔截所有的接口
定義Interceptor
@Slf4j public class HttpInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { log.info("preHandle"); return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { RequestHolder.remove(); log.info("afterCompletion"); return; } }
此Interceptor的作用是當接口處理後,移除ThreadLocal中對應的值
定義Controller來進行驗證
@Controller @RequestMapping("/threadLocal") public class ThreadLocalController { @RequestMapping("/test") @ResponseBody public Long test() { return RequestHolder.getId(); } }
同步容器
我們都知道ArrayList、HashMap等為線程不安全的,上圖標識了它們對應的同步處理的容器
Vector-demo1
@Slf4j public class VectorExample1 { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //arraylist是線程不安全的 private static Vector<Integer> list = new Vector<>(); private static void update(int i) { list.add(i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } }
執行結果為5000,但是並不能說是線程安全的,同步容器不能保證在所有的情景下都保證線程安全,可參考Vector-demo2
Vector-demo2
public class VectorExample2 { private static Vector<Integer> vector = new Vector<>(); public static void main(String[] args) { while (true) { for (int i = 0; i < vector.size(); i++) { vector.add(i); } Thread thread1 = new Thread() { public void run() { for (int i = 0; i < vector.size(); i++) { vector.remove(i); } } }; Thread thread2 = new Thread() { public void run() { for (int i = 0; i < 10; i++) { vector.get(i); } } }; thread1.start(); thread2.start(); } } }
輸出如下:
表明某一線程訪問的數據,可能被其他線程remove掉,導致出現下標越界異常,此demo是線程不安全的
HashTable-demo1
@Slf4j public class HashTableExample { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //Hashtable是線程安全的 private static Map<Integer,Integer> map = new Hashtable<>(); private static void update(int i) { map.put(i,i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } }
運行結果為5000,此demo為線程安全的
Collections-List-demo1
@Slf4j public class CollectionsExample1 { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //arraylist是線程不安全的 private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList()); private static void update(int i) { list.add(i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } }
運行結果為5000,是線程安全的
Collections-Set-demo1
@Slf4j public class CollectionsExample2 { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //arraylist是線程不安全的 private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet()); private static void update(int i) { set.add(i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } }
返回結果為5000,是線程安全的
Collections-Map-demo1
@Slf4j public class CollectionsExample3 { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //Hashtable是線程安全的 private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>()); private static void update(int i) { map.put(i,i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } }
返回結果為5000,是線程安全的
並發容器J.U.C
以上是常見的不安全的容器類,對應的並發容器類,我們以demo的方式進行演示
CopyOnWriteArrayList、CopyOnWriteArraySet因為需要copy數組,需要消耗內存,可能引發yonggc胡哦哦這fullgc,並且不能做到實時性,適合讀多寫少的情景
ConcurrentSkipListSet 支持自然排序,並且可以在構造的時候自己定義比較器,可以保證每一次的操作是原子性的,比如add()、remove等,但是對於批量操作,如addAll()等並不能保證原子性(需要自己手動做同步操作,如加鎖等)
ConcurrentHashMap針對讀操作做了大量的優化,這個類具有特別高的並發性,高並發場景下有特別好的表現
ConcurrentSkipListMap與ConcurrentHashMap相比的key是有序的,它支持更高的並發,它的存取時間和線程數是沒有關系的,在一定的數據量下,並發的線程越多ConcurrentSkipListMap越等體現出它的優勢來
CopyOnWriteArrayList-demo
public class CopyOnWriteArrayListExample { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //arraylist是線程不安全的 private static List<Integer> list = new CopyOnWriteArrayList<>(); private static void update(int i) { list.add(i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } }
運行結果為5000,是線程安全的
CopyOnWriteArraySet-demo
@Slf4j public class CopyOnWriteArraySetExample { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //arraylist是線程不安全的 private static Set<Integer> set = new CopyOnWriteArraySet<>(); private static void update(int i) { set.add(i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } }
運行結果為5000,是線程安全的
ConcurrentSkipListSet-demo
@Slf4j public class ConcurrentSkipListSetExample { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //arraylist是線程不安全的 private static Set<Integer> set = new ConcurrentSkipListSet<>(); private static void update(int i) { set.add(i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } }
輸出為5000,是線程安全的
ConcurrentSkipListMap-demo
@Slf4j public class ConcurrentSkipListMapExample { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //HashMap是線程不安全的 private static Map<Integer,Integer> map = new ConcurrentSkipListMap<>(); private static void update(int i) { map.put(i,i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } }
輸出為5000,是線程安全的
ConcurrentHashMap-demo
@Slf4j @ThreadSafe public class ConcurrentHashMapExample { //請求總數 public static int clientTotal = 5000; //同時並發執行的線程數 public static int threadTotal = 200; //HashMap是線程不安全的 private static Map<Integer,Integer> map = new ConcurrentHashMap<>(); private static void update(int i) { map.put(i,i); } public static void main(String[] args)throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量 final Semaphore semaphore = new Semaphore(threadTotal); //定義計數器閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } }
輸出為5000,是線程安全的
並發編程(八):線程安全策略