1. 程式人生 > >並發編程(八):線程安全策略

並發編程(八):線程安全策略

技術分享 同步容器 demo turn 支持 通過 優勢 intercept 計數

通常我們保證線程安全策略的方式有以下幾種:

a、不可變對象

b、線程封閉

c、同步容器

d、並發容器

不可變對象

技術分享圖片

可參考string類,可以采用的方式是將類聲明為final,將所有成員都聲明為私有的,對變量不提供set方法,將所有可變成員聲明為final,通過構造器初始化所有成員,進行深度拷貝,在get方法中不直接返回對象本身,而是返回對象的拷貝。

關於final,我們詳細說明一下

技術分享圖片

final-demo

@Slf4j
public class ImmutableExample1 {

    private final static Integer a = 1;
    
private final static String b = "2"; private final static Map<Integer, Integer> map = Maps.newHashMap(); static { map.put(1, 2); map.put(3, 4); map.put(5, 6); } public static void main(String[] args) { // 被final修飾的基本數據類型無法改變 // a = 2; // b = "2";
// 引用對象,此引用無法指向別的對象,但可修改該對象的值 map.put(1,3); log.info("{}", map.get(1)); } //final可修飾傳遞進來的對象 private void test(final int a) { } }

此demo需要我們註意的是,final修飾引用類型時,雖然不能將引用再指向別的對象,但可修改該對象的值;此外final還可修飾參數,這樣傳遞進來的參數a無法被修改。此demo不是線程安全的

除了final可以定義不可變對象,java提供的Collections類,也可定義不可變對象,Collections.unmodifiableXXX傳入的對象一經初始化便無法修改,XXX可表示Collection、List、Set、Map等,谷歌提供的Guava類,也有類似的功能,ImmutableXXX,XXX同樣可表示Collection、List、Set、Map等

Collections-demo

@Slf4j
public class ImmutableExample2 {
    
    private  static Map<Integer, Integer> map = Maps.newHashMap();

    static {
        map.put(1, 2);
        map.put(3, 4);
        map.put(5, 6);
        //此處理後的map的值是不可以修改的
        map = Collections.unmodifiableMap(map);
    }

    public static void main(String[] args) {

//
        map.put(1,3);
        log.info("{}", map.get(1));
    }
}

輸出如下:

技術分享圖片

可見,用Collections.UnmodifiableMap修飾的對象是不可修改的,如果嘗試修改對象的值,在程序運行時會拋出異常,此方法的實現可參考源碼(其實就是將一個新的集合的所有更新方法變為拋出異常) 此demo是線程安全的。

ImmutableSet-demo

public class ImmutableExample3 {

    //以下為不可變對象的集合
    private final static ImmutableList<Integer> list = ImmutableList.of(1, 2, 3);
    private final static ImmutableSet set = ImmutableSet.copyOf(list);

    private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1,2,3,4);
    private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder()
            .put(1, 2).put(3, 4).put(5, 6).build();

    public static void main(String[] args) {
//        此時不同意再添加新的元素
        map2.put(1, 3);
    }
}

輸出如下:

技術分享圖片

此demo是線程安全的,開發時如果我們的對象可以變為不可變對象,我們盡量將對象變為不可變對象,這樣可以避免線程安全問題

線程封閉

線程封閉就是把對象封裝到一個線程裏,只有一個線程可以看到這個對象,這樣就算這個對象不是線程也不會有線程安全問題

技術分享圖片

關於堆棧封閉,我們自己定義的局部變量被多個線程訪問時,每個局部變量都會被拷貝一份放到線程的棧中去,這樣每個線程操作的對象相當於是不同的,所以不會有線程安全問題(全局變量容易引發並發問題);ThreadLocal使用map實現了線程封閉,map的key是線程id,map的值是封閉的對象

THreadLocal-demo

定義RequestHolder類來操作ThreadLocal:

public class RequestHolder {

    //只有當項目重新啟動的時候,threadLocal中存儲的值才會被釋放
    private final static ThreadLocal<Long> requestHolder = new ThreadLocal<>();

    public static void add(Long id) {
        requestHolder.set(id);
    }

    public static Long getId() {
        return requestHolder.get();
    }
    public static void remove() {
        requestHolder.remove();
    }
}

註意,如add方法,只需傳入需要封閉的對象即可,key值會自動取線程id放入,get和remove方法類似

定義HttpFilter類處理請求:

@Slf4j
public class HttpFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {

        HttpServletRequest request = (HttpServletRequest) servletRequest;
        log.info("do filter,{},{}", Thread.currentThread().getId(), request.getServletPath());
        RequestHolder.add(Thread.currentThread().getId());
        //使請求繼續被處理,不要攔住不動
        filterChain.doFilter(servletRequest, servletResponse);
    }

    @Override
    public void destroy() {

    }
}

配置啟動類:

@SpringBootApplication
public class ConcurrencyApplication extends WebMvcConfigurerAdapter {

    public static void main(String[] args) {
        SpringApplication.run(ConcurrencyApplication.class, args);
    }

    @Bean
    public FilterRegistrationBean httpFilter() {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean();
        registrationBean.setFilter(new HttpFilter());
        registrationBean.addUrlPatterns("/threadLocal/*");
        return registrationBean;
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new HttpInterceptor()).addPathPatterns("/*");
    }
}

攔截以threadLocal開頭的url,並利用Interceptor攔截所有的接口

定義Interceptor

@Slf4j
public class HttpInterceptor extends HandlerInterceptorAdapter {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        log.info("preHandle");
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        RequestHolder.remove();
        log.info("afterCompletion");
        return;
    }
}

此Interceptor的作用是當接口處理後,移除ThreadLocal中對應的值

定義Controller來進行驗證

@Controller
@RequestMapping("/threadLocal")
public class ThreadLocalController {

    @RequestMapping("/test")
    @ResponseBody
    public Long test() {
        return RequestHolder.getId();
    }
}

同步容器

技術分享圖片

我們都知道ArrayList、HashMap等為線程不安全的,上圖標識了它們對應的同步處理的容器

Vector-demo1

@Slf4j
public class VectorExample1 {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //arraylist是線程不安全的
    private static Vector<Integer> list = new Vector<>();


    private  static void update(int i) {
        list.add(i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());

    }
}

執行結果為5000,但是並不能說是線程安全的,同步容器不能保證在所有的情景下都保證線程安全,可參考Vector-demo2

Vector-demo2

public class VectorExample2 {

    private static Vector<Integer> vector = new Vector<>();


    public static void main(String[] args) {


        while (true) {
            for (int i = 0; i < vector.size(); i++) {
                vector.add(i);
            }

            Thread thread1 = new Thread() {
                public void run() {
                    for (int i = 0; i < vector.size(); i++) {
                        vector.remove(i);
                    }
                }
            };

            Thread thread2 = new Thread() {
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        vector.get(i);
                    }
                }
            };

            thread1.start();
            thread2.start();
        }

    }

}

輸出如下:

技術分享圖片

表明某一線程訪問的數據,可能被其他線程remove掉,導致出現下標越界異常,此demo是線程不安全的

HashTable-demo1

@Slf4j
public class HashTableExample {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //Hashtable是線程安全的
    private static Map<Integer,Integer> map = new Hashtable<>();


    private  static void update(int i) {
        map.put(i,i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());

    }
}

運行結果為5000,此demo為線程安全的

Collections-List-demo1

@Slf4j
public class CollectionsExample1 {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //arraylist是線程不安全的
    private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());


    private  static void update(int i) {
        list.add(i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());

    }
}

運行結果為5000,是線程安全的

Collections-Set-demo1

@Slf4j
public class CollectionsExample2 {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //arraylist是線程不安全的
    private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());


    private  static void update(int i) {
        set.add(i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());

    }
}

返回結果為5000,是線程安全的

Collections-Map-demo1

@Slf4j
public class CollectionsExample3 {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //Hashtable是線程安全的
    private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());


    private  static void update(int i) {
        map.put(i,i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());

    }
}

返回結果為5000,是線程安全的

並發容器J.U.C

技術分享圖片

以上是常見的不安全的容器類,對應的並發容器類,我們以demo的方式進行演示

CopyOnWriteArrayList、CopyOnWriteArraySet因為需要copy數組,需要消耗內存,可能引發yonggc胡哦哦這fullgc,並且不能做到實時性,適合讀多寫少的情景

ConcurrentSkipListSet 支持自然排序,並且可以在構造的時候自己定義比較器,可以保證每一次的操作是原子性的,比如add()、remove等,但是對於批量操作,如addAll()等並不能保證原子性(需要自己手動做同步操作,如加鎖等)

ConcurrentHashMap針對讀操作做了大量的優化,這個類具有特別高的並發性,高並發場景下有特別好的表現

ConcurrentSkipListMap與ConcurrentHashMap相比的key是有序的,它支持更高的並發,它的存取時間和線程數是沒有關系的,在一定的數據量下,並發的線程越多ConcurrentSkipListMap越等體現出它的優勢來

CopyOnWriteArrayList-demo

public class CopyOnWriteArrayListExample {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //arraylist是線程不安全的
    private static List<Integer> list = new CopyOnWriteArrayList<>();


    private  static void update(int i) {
        list.add(i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());

    }
}

運行結果為5000,是線程安全的

CopyOnWriteArraySet-demo

@Slf4j
public class CopyOnWriteArraySetExample {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //arraylist是線程不安全的
    private static Set<Integer> set = new CopyOnWriteArraySet<>();


    private  static void update(int i) {
        set.add(i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());

    }
}

運行結果為5000,是線程安全的

ConcurrentSkipListSet-demo

@Slf4j
public class ConcurrentSkipListSetExample {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //arraylist是線程不安全的
    private static Set<Integer> set = new ConcurrentSkipListSet<>();


    private  static void update(int i) {
        set.add(i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());

    }
}

輸出為5000,是線程安全的

ConcurrentSkipListMap-demo

@Slf4j
public class ConcurrentSkipListMapExample {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //HashMap是線程不安全的
    private static Map<Integer,Integer> map = new ConcurrentSkipListMap<>();


    private  static void update(int i) {
        map.put(i,i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());

    }
}

輸出為5000,是線程安全的

ConcurrentHashMap-demo

@Slf4j
@ThreadSafe
public class ConcurrentHashMapExample {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    //HashMap是線程不安全的
    private static Map<Integer,Integer> map = new ConcurrentHashMap<>();


    private  static void update(int i) {
        map.put(i,i);
    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {

            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());

    }
}

輸出為5000,是線程安全的

並發編程(八):線程安全策略