1. 程式人生 > >拜託!不要再問我是否瞭解多執行緒了好嗎

拜託!不要再問我是否瞭解多執行緒了好嗎

  面試過程中,各面試官一般都會教科書式的問你幾個多執行緒的問題,但又不知從何問起。於是就來一句,你瞭解多執行緒嗎?拜託,這個有好傷自尊的!

  相信老司機們對於java的多執行緒問題處理,穩如老狗了。你問我瞭解不?都懶得理你。

  不過,既然是面對的是面試官,那你還得一一說來。

  今天我們就從多個角度來領略下多執行緒技術吧!

1. 為什麼會有多執行緒?

  其實有的語言是沒有多執行緒的概念的,而java則是從一出生便有了多執行緒天賦。為什麼?
  多執行緒技術一般又被叫做併發程式設計,目的是為了程式執行得更快。
  其基本原理是,是由cpu進行不同執行緒的排程,從而實現多個執行緒的同時執行效果。
  多程序和多執行緒類似,只是多程序不會共享記憶體資源,切換開銷更大,所以多執行緒是更明智的選擇。
  而在計算機出現早期,或者也許你也能找到單核的cpu,這時候的多執行緒是通過不停地切換唯一一個可以執行的執行緒來實現的,由於切換速度比較快,所以感覺就是多執行緒同時在運行了。在這種情況下,多執行緒與多程序等同的。但是,至少也讓使用者有了可以同時處理多工的能力了,也是很有用的。
  而當下的多核cpu時代,則是真正可以同時執行多個執行緒的時代,什麼四核八執行緒,八核八執行緒.... 意味著可以同時並行n個執行緒。如果我們能讓所有可用的執行緒都利用起來,那麼我們的程式執行速度或者說整體效能將會得到極大提升。這是我們技術人員的目標。

 

2. 多執行緒就一定快嗎?(簡略)

  看起來,多執行緒確實挺好,但是凡事皆有度。過尤不及。

  如果只執行與cpu能力範圍內的n執行緒,那是絕對ok的。但當你執行緒數超過這個n時,就會涉及到cpu的排程問題,排程時即會涉及一個上下文切換問題,這是要耗費時間和資源的東西。當cpu疲於奔命排程切換時,則多執行緒就是一個負擔了。

 

3. 多執行緒主要注意什麼問題?(簡略)

  多執行緒要注意的問題多了去了,畢竟這是一門不簡單的學問,但是我們也可以總結下:

  1. 執行緒安全性問題;如果連正確性都無法保障,談效能有何意義?
  2. 資源隔離問題;是你就是你的,不是你的就不是你的。
  3. 可讀性問題;如果為了多執行緒,將程式碼搞得一團糟,是否值得?
  4. 外部環境問題;如果外部環境很糟糕,那麼你內部效能再好,你能把壓力給外部嗎?

 

4. 建立多執行緒的方式?(簡略)

  這個問題確實有點low, 不過也是一個體現真實實踐的地方!

  1. 繼承Thread類,然後 new MyThread.start();
  2. 繼承Runnable類, 然後 new Thread(runnable).start();
  3. 繼承Callable類,然後使用 ExecutorService.submit(callable);
  4. 使用執行緒池技術,直接建立n個執行緒,將上面的方法再來一遍,new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 簡化版: Executors.newFixedThreadPool(n).submit(runnable);

 

5. 來點實際的場景?(重點)

  理論始終太枯燥,不如來點實際的。

  有同學說,我平時就寫寫業務程式碼,而業務程式碼基本由使用者觸發,一條執行緒走到底,哪來的多執行緒實踐?

  好,我們可以就這個問題來說下,這種業務的多執行緒:

  1. 比如一個http請求,對應一個響應,如果不使用多執行緒,會怎麼樣?我們可以簡單地寫一個socket伺服器,進行處理業務,但是這絕對不是你想看到的。比如我們常用的 spring+tomcat, 哪裡沒有用到多執行緒技術?

        http-nio-8080-exec-xxx #就是一個執行緒池中例子。

  2. 任何一個java應用,啟動起來之後,都會有很多的GC執行緒執行,這難道不是多執行緒?如:

        "G1 Main Concurrent Mark GC Thread" os_prio=0 tid=0x00007fb91008f000 nid=0x40e7 runnabl
        "Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007fb910061800 nid=0x40de runnable

  如上這些多執行緒場景吧,面試官說,就算你瞭解其原理,那也不算是你的。你有真正使用過多執行緒嗎?

  接下來,我們就來說道說道,實際業務場景中,有哪些是我們可能會用上的,供大家參考:

看下多執行緒中幾個有趣或者經典的場景用法!

  場景1. 我有一個發郵件的功能,使用者操作成功後,我給他傳送郵件,如何高效穩定地完成?

  場景2. 我有m個執行緒在迴圈執行主方法,為實現高效處理,將分離n*m個子執行緒執行相關聯流程,要求子執行緒必須等到主執行緒執行完成後才能執行,如何保證?

  場景3. 某合作公司要求請求其api的qps不得大於n,如何保證?

  場景4. 一個大任務如何提高響應速度?

  場景5. 我有n個執行緒同時開始處理一個事務,要求至少等到一個執行緒執行完畢後,才能進行響應返回,如何高效處理?

  場景6. 抽象任務,後臺執行處理任務多執行緒?

 

大家應該已經見過世面了,這點問題還不至於,對吧。那你可以拿出你的方案了。


下面是我的解決方案:

 

場景1. 我有一個發郵件的功能,使用者操作成功後,我給他傳送郵件,如何高效穩定地完成?
場景1解決:(常規型)

  這個可以說最實用最簡單的多執行緒應用場景了,不過現在進行微服務化之後,可能會有一些不同。換湯不換藥。

  針對C端使用者的多執行緒,我們是不建議使用 new Thread() 這種方式的,執行緒池是個常用伎倆。

    ExecutorService mailExecutors = Executors.newFixedThreadPool(20);

    public void sendMail() {
        mailExecutors.submit(() -> {
            // do send mail biz, http, rpc,...
            System.out.println("sending mail");
        });
    }

 

場景2. 我有m個執行緒在迴圈執行主方法,為實現高效處理,將分離n*m個子執行緒執行相關聯流程,要求子執行緒必須等到主執行緒執行完成後才能執行,如何保證?
場景2解決:(所有等待型)

  主任務,只管排程子執行緒,在子執行緒使用閉鎖在適當的地方進行等待,主執行緒迴圈分配完成後,開啟閉鎖,放行所有子執行緒即可。

  具體程式碼如下:

    private void mainWork() {
        try {
            resetRedisZsetLockGate();
            for (String linkTraceCacheKey : expiredKeys) {
                subWork(linkTraceCacheKey);
            }
        }
        finally {
            releaseRedisZsetLock();
        }
    }
    
    private void subWork(String linkTraceCacheKey) {
        deleteService.execute(new Runnable() {
            @Override
            public void run() {
                // do other biz
                blockingWaitRedisZsetLock();
                postSth(linkTraceCacheKey);
            }
        });
    }
    
    /**
     * 重置鎖閘道器,每次主方法的排程都將得到一個私有的鎖
     */
    private void resetRedisZsetLockGate() {
        redisZsetScanLockGate = new CountDownLatch(1);
    }
    
    /**
     * 阻塞等待 鎖
     */
    private void blockingWaitRedisZsetLock() {
        final CountDownLatch myGate = redisZsetScanLockGate;
        try {
            myGate.await();
        } 
        catch (InterruptedException e) {
            logger.error("等待鎖中斷異常", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 釋放鎖
     */
    private void releaseRedisZsetLock() {
        final CountDownLatch myGate = redisZsetScanLockGate;
        myGate.countDown();
    }

 

場景3. 某合作公司要求請求其api的qps不得大於n,如何保證?
場景3解決:(流量控制型、有限資源型)

  這種問題準確的說,使用單機的多執行緒還是有點難控制的,但是我們只是為了講清道理,具體(叢集)做法只要稍做變通即可。

  簡單點說,就是作用一個 Semphore 訊號量進行數量控制,當數量未到時,直接多執行緒併發請求,到達限制後,則等待有空閒位置再進行!

public class AbstractConcurrentSimpleLiteJobBase {
    /**
     * 併發查詢:5 , 動態配置化
     */
    private final Semaphore maxConcurrentQueryLock;

    /**
     * 同步等待結束鎖,視情況使用,同一個執行緒可能提交多次任務,由同一個 holder 管理
     */
    private final ThreadLocal<List<Future<?>>> endGateTaskFutureContainer = new ThreadLocal<>();

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public AbstractConcurrentSimpleLiteJobBase() {
        maxConcurrentQueryLock = new Semaphore(getMaxConcurrentThreadNum());
    }

    /**
     * 獲取最大允許的併發數,子類可自定義, 預設:5
     *
     * @return 最大併發數
     */
    protected int getMaxConcurrentThreadNum() {
        return 5;
    }
    
    /**
     * 提交一個任務到執行緒池執行
     *
     * @param task 任務
     */
    protected void submitTask(Runnable task) {
        // 考慮是否要阻塞等待結果
        Future<?> future1 =  threadPoolTaskExecutor.submit(() -> {
            try {
                maxConcurrentQueryLock.acquire();
            }
            catch (InterruptedException ie) {
                // ignore...
                log.error("【任務執行】異常,中斷", ie);
                Thread.currentThread().interrupt();
                return;
            }
            try {
                task.run();
            }
            finally {
                maxConcurrentQueryLock.release();
            }
        });
        endGateCountDown(future1);
    }
    
    /**
     * 等待執行緒結果完成,並清理 gate 資訊
     */
    private void awaitForComplete() {
        try {
            // 同步等待執行完成,防止併發任務執行
            for(Future<?> future1 : endGateTaskFutureContainer.get()) {
                future1.get();
            }
            endGateTaskFutureContainer.remove();
        }
        catch (ExecutionException e) {
            log.error("【任務執行】異常,丟擲異常", e);
        }
        catch (InterruptedException e) {
            log.error("【任務執行】異常,中斷", e);
        }
    }


}

 

場景4. 一個大任務如何提高響應速度?
場景4解決:(大任務拆分型)

  針對大任務的處理,基本想到的都是類似於分散式計算之類的東西(map/reduce),在java單機操作來說,標準的解決方案是 Fork/Join 框架。

public class MyForkJoinTask extends RecursiveTask<Integer> {
    //原始資料
    private List<Integer> records;

    public MyForkJoinTask(List<Integer> records) {
        this.records = records;
    }

    @Override
    protected Integer compute() {
        //任務拆分到可接受程度後,執行處理邏輯
        if (records.size() < 3) {
            return doRealCompute();
        }
        // 否則一直往下拆分任務
        int size = records.size();
        MyForkJoinTask aTask = new MyForkJoinTask(records.subList(0, size / 2));
        MyForkJoinTask bTask = new MyForkJoinTask(records.subList(size / 2, records.size()));
        //兩個任務併發執行
        invokeAll(aTask, bTask);
        //結果合併
        return aTask.join() + bTask.join();
    }

    /**
     * 真正任務處理邏輯
     */
    private int doRealCompute() {
        try {
            Thread.sleep((long) (records.size() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("計算任務:" + Arrays.toString(records.toArray()));
        return records.size();
    }

    // 測試任務
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(5);
        List<Integer> originalData = new ArrayList<>();
        originalData.add(1);
        originalData.add(2);
        originalData.add(3);
        originalData.add(4);
        originalData.add(5);
        originalData.add(6);
        originalData.add(7);
        originalData.add(8);
        originalData.add(9);
        originalData.add(10);
        originalData.add(11);
        originalData.add(12);
        originalData.add(13);

        MyForkJoinTask myForkJoinTask = new MyForkJoinTask(originalData);
        long t1 = System.currentTimeMillis();
        ForkJoinTask<Integer> affectNums = forkJoinPool.submit(myForkJoinTask);
        System.out.println("affect nums: " + affectNums.get());
        long t2 = System.currentTimeMillis();
        System.out.println("cost time: " + (t2-t1));
    }
}

  其實如果不用Fork/join 框架,也是可以的,比如我就只開n個線依次從資料來源處取資料進行處理,最後將結果合併到另一個佇列中。只是,這期間你得多付出多少努力才能做到 Fork/Join 相同的效果呢!

  當然了,Fork/Join 的重要特性是: 使用了work-stealing演算法。Worker執行緒跑完任務後,可以從其他還在忙著的執行緒去竊取任務。

  你要願意造輪子,也是可以的。

 

場景5. 我有n個執行緒同時開始處理一個事務,要求至少等到一個執行緒執行完畢後,才能進行響應返回,如何高效處理?
場景5解決:(至少一個返回型)

  初步思路: 主任務中,使用一個閉鎖,CountDownLatch(1); 所有子執行緒執行完成,呼叫 latch.countDown(); 開啟一次閉鎖。主任務執行完成後,呼叫 latch.await(); 阻塞等待,當有任意一個子執行緒開啟閉鎖後,就可以返回了。

  但是這個是有問題的,即這個鎖只會有一次生效機會,後續的完成動作並不會有實際意義,因此只能換一個方式。

  使用回撥實現,就容易多了,只要一個任務完成,就做一次回撥,主任務如果分配完成後,發現有空閒的任務槽,就立即進行下一次分配即可,沒有則等到有再進行分配工作。

  具體程式碼如下:

public class TaskDispatcher {
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting assign */
    private final Condition finishedTaskNotEmpty;

    /**
     * 正在執行的任務計數器
     */
    private final AtomicInteger runningTaskCounter = new AtomicInteger(0);

    /**
     * 新完成的任務計數器,當被重新分派後,此計數將會被置0
     */
    private Integer newFinishedTaskCounter = 0;
    
    private void consumLogHub(String shards) throws InterruptedException {
        resetConsumeCounter();
        String[] shardList = shards.split(",");
        for (int i = 0; i < shardList.length; i++) {
            String shard = shardList[i];
            int shardId = Integer.parseInt(shard);
            LogHubConsumer consuemr = getConsuemer(shardId);
            if(consuemr.startNewConsumeTask(this)) {
                runningTaskCounter.incrementAndGet();
            }
        }
        cleanConsumer(Arrays.asList(shardList));
        // 沒有一個任務已完成,阻塞等待一個完成
        if(runningTaskCounter.get() > 0) {
            if(newFinishedTaskCounter == 0) {
                waitAtLeastOnceTaskFinish();
            }
        }
    }
    
    /**
     * 重置消費者計數器
     */
    private void resetConsumeCounter() {
        newFinishedTaskCounter = 0;
    }

    /**
     * 阻塞等待至少一個任務執行完成
     *
     * @throws InterruptedException 中斷
     */
    private void waitAtLeastOnceTaskFinish() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (newFinishedTaskCounter == 0) {
                finishedTaskNotEmpty.await();
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * 通知任務完成(回撥)
     *
     * @throws InterruptedException 中斷
     */
    private void notifyTaskFinished() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            runningTaskCounter.decrementAndGet();
            // 此處計數不可能小於0
            newFinishedTaskCounter += 1;
            finishedTaskNotEmpty.signal();
        }
        finally {
            lock.unlock();
        }
    }
    /**
     * 通知任務完成(回撥)
     *
     * @throws InterruptedException 中斷
     */
    public void taskFinishCallback() throws InterruptedException {
        notifyTaskFinished();
    }
    
}

public class ConsumerWorker {

    private Future<?> future;
    
    @Resource
    private ExecutorService consumerService; 
    
    /**
     * 當查詢結果為時的等待延時, 每次查詢結果都會為空時,加大該延時, 直到達到設定的最大值為準
     */
    private Long baseEmptyQueryDelayMills = 200L;
    private Long emptyQueryDelayMills = baseEmptyQueryDelayMills;

    /**
     * 調置最大延時為1秒
     */
    private static final Long maxEmptyQueryDelayMills = 1000L;

    /**
     * 記數
     */
    private void encounterEmptyQueryDelay() {
        if(emptyQueryDelayMills < maxEmptyQueryDelayMills) {
            emptyQueryDelayMills += 100L;
        }
    }

    private void resetEmptyQueryDelay() {
        emptyQueryDelayMills = baseEmptyQueryDelayMills;
    }


    // 開啟一個消費者執行緒
    public boolean startNewConsumeTask(LogHubClientWork callback) {
        if(future==null || future.isCancelled() || future.isDone()) {
            //沒有任務或者任務已取消或已完成 提交任務
            future = consumerService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Integer dealCount = doBizData();
                        if(dealCount == 0) {
                            SleepUtil.millis(emptyQueryDelayMills);
                            encounterEmptyQueryDelay();
                        }
                        else {
                            resetEmptyQueryDelay();
                        }
                    }
                    finally {
                        try {
                            callback.taskFinishCallback();
                        }
                        catch (InterruptedException e) {
                            logger.error("處理完成通知失敗,中斷", e);
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
            return true;
        }
        return false;
    }
    
}

 


場景6. 抽象任務,後臺執行處理任務多執行緒?
場景6解決:(業務相關類)

  最簡單也是最難的一種,根據具體業務型別做相應處理就好,主要考慮讀寫的安全性問題。

 

  如上幾個多執行緒的應用場景,是我在工作中切實用上的場景(所言非虛)。不過它們都有一個特點,即任務都是很獨立的,即基本上不用太關心執行緒安全問題,這也是我們編寫多執行緒程式碼時儘量要做的事。當然很多場景共享資料是一定的,這時候就更要注意執行緒安全了。

  要做到執行緒安全也不是難事,比如足夠好的封裝,可以讓你把關注點鎖定在很小的範圍內。

  當然,為了執行緒安全,我們可能往往又會犧牲效能,這就看我們如何把握這些度了!互斥鎖是最容易使用的鎖,但是也是效能最差的鎖。分段鎖能夠解決鎖效能問題,但是又會給編寫帶來更大的困難。

 

  多執行緒,不止要會寫,還要會給自己填坑。

 

嘮叨: 去追天邊的那束光!

&n