前言

遙想當年大二,實習面試的時候,面試官一個問題:作業系統最小的排程單元是什麼?當時還沒學過作業系統,只知道程序的概念,於是乎信心滿滿的答道,當然是程序啊,然後......就沒有然後了。

之後再看這個問題,其實就是一個笑話。作業系統排程的最小單元其實是執行緒。現在想想當時,自己大二就敢跑出去實習也是服了自己。

zookeeper看得頭大,雖然看懂了一點,但是實在不敢寫總結,因為也無從寫起,paxos演算法和ZAB協議想總結清楚,現在火候還不夠。所以打算暫時放一放zookeeper,先學習併發程式設計部分。

何時使用多執行緒

阻塞

可能這個詞經常聽,但是真正的要解釋清楚還是有難度的。阻塞:程式在執行到某一個函式或者過程需要等待某些事情完成才能進行後面的操作,在等待期間會暫時停止對CPU的佔用情況,這個時候CPU會閒置

多執行緒的目的就是為了高效率的利用CPU,減少CPU的閒置時間。回到主要的問題——什麼情況下該使用多執行緒,個人覺得有以下幾點:

1、通過平行計算提高程式執行效能。

2、等待網路的部分。

3、I/O響應導致耗費大量的執行時間

等等等......

這些概念只是簡單瞭解一下,後面如果涉及到BIO,NIO等概念可以結合這裡看看。

如何啟動執行緒

這個問題真的很老了,在進新網實習面試的時候就被問到了這個問題。這個問題很多人會回答兩種,但是在Java中不止兩種。

通常的有以下幾種

1、繼承Thread類

這個啟動執行緒不需要包裝,直接呼叫Thread型別的物件的start方法就可以啟動執行緒(不是run),start是一個native方法

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("MyThread "+Thread.currentThread().getId()+" is running");
    }
}

2、實現Runnable介面

與繼承Thread類差不多,但是再啟動執行緒的時候,需要包裝一下

public class MyRunnable implements Runnable {
    public void run() {
        System.out.println("Thread "+Thread.currentThread().getId() +" is running");
    }
}

3、實現Callable介面

這種方式實現的執行緒,可以有返回值。這種方式在啟動執行緒的時候需要將其交給ExecutorService(這個類常用於構建執行緒池)去啟動。


/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 * comment:
 *      通過FutureTask包裝器來建立Thread執行緒
 */
public class CallableThread implements Callable<String> {
    public String call() throws Exception {
        int a = 1;
        int b = 2;
        System.out.println(a+b);
        /**
         * 如果處理事件過長,客戶端會阻塞
         * TimeUnit.SECONDS.sleep(20);
         */
        return "執行結果:"+(a+b);
    }
}

測試程式碼:

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 */
public class ThreadDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyThread thread01= new MyThread();
        MyThread thread02 = new MyThread();
        thread01.start();
        thread02.start();

        MyRunnable runnable01 = new MyRunnable();
        MyRunnable runnable02 = new MyRunnable();

        //Runnable介面,需要通過Thread包裝一下
        Thread RunThread01 = new Thread(runnable01);
        Thread RunThread02 = new Thread(runnable02);
        RunThread01.start();
        RunThread02.start();

        //獲得Callable介面的返回結果
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        CallableThread callableThread = new CallableThread();
        Future<String> future = executorService.submit(callableThread);

        //客戶端真正阻塞在這裡,獲取執行緒返回資料的時候,如果執行緒沒有將資料準備好,這裡會阻塞。
        System.out.println(future.get());
        executorService.shutdown();
    }
}

玩玩責任鏈

合理的利用非同步操作,可以提升程式處理效能,通過阻塞佇列以及多執行緒的方式,實現對請求的非同步化處理可以很好的提高程式效能,這個在zookeeper原始碼中是有提現的(奈何zookeeper現在依舊沒看懂)

這裡先需要了解幾種阻塞佇列的功能——LinkedBlockingQueue與ArrayBlockingQueue

請求的簡單描述例項

package com.learn.ProcessorChain;

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 * comment:
 *      請求的簡單描述
 */
public class Request {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Request{" +
                "name='" + name + '\'' +
                '}';
    }
}

處理請求的標記介面,對請求的不同處理類都需要實現這個介面

package com.learn.ProcessorChain;

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 * comment:
 *      處理請求的標記介面
 */
public interface RequestProcessor {

    void processRequest(Request request);

}

簡單的列印處理

這裡就用到了LinkedBlockingQueue,這個佇列的take方法會阻塞,在沒有獲取到元素的時候會一直阻塞,直到獲取到元素為止。

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 */
public class PrintProcessor extends Thread implements RequestProcessor {

    /**
     * 業務邏輯鏈
     * 這裡用阻塞佇列的原因,就是在沒有任務的時候,take操作會阻塞,如果有元素了,take操作會被喚醒
     */
    LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<Request>();

    //下一個任務
    private final RequestProcessor nextProcessor;

    public PrintProcessor(RequestProcessor nextProcessor){
        this.nextProcessor = nextProcessor;
    }

    @Override
    public void run() {
        while(true){
            try{
                //佇列中沒有元素會在這裡阻塞
                Request request = requests.take();
                //這裡是處理當前任務
                System.out.println("begin print request data : "+request.getName());

                //處理下一個任務
                nextProcessor.processRequest(request);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }

    //處理請求
    public void processRequest(Request request) {
        //先將請求放到任務鏈中,後面統一處理
        requests.add(request);
    }
}

簡單的儲存處理

package com.learn.ProcessorChain;

import java.util.concurrent.LinkedBlockingQueue;

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 */
public class SaveProcessor extends Thread implements RequestProcessor {

    LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<Request>();

    public void processRequest(Request request) {
        requests.add(request);
    }

    @Override
    public void run() {
        while(true){
            try{
                Request request = requests.take();
                System.out.println("begin save request info :"+request);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

測試主類:

package com.learn.ProcessorChain;

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 * comment:
 *      鏈式處理任務
 */
public class RequestChainDemo {

    PrintProcessor printProcessor;

    protected RequestChainDemo(){
        SaveProcessor saveProcessor =new SaveProcessor();
        saveProcessor.start();

        printProcessor = new PrintProcessor(saveProcessor);
        printProcessor.start();
    }

    private void doTest(Request request){
        printProcessor.processRequest(request);
    }

    public static void main(String[] args) {
        Request request = new Request();
        request.setName("Liman");
        new RequestChainDemo().doTest(request);
    }
}

說明:在物件初始化的時候,就已經設定的責任鏈,並且啟動了兩個執行緒,但是這兩個處理執行緒目前沒有待處理的請求,這個兩個執行緒就一直阻塞,在printProcessor真正接受請求的時候開始,執行緒才開始真正的處理業務邏輯。

併發程式設計基礎

前面的都是針對之前自己的認識做一個簡單的回顧,這裡才是正在的正文開始。

執行緒狀態

執行緒才是作業系統排程的最小單元。執行緒的排程並沒有想象中的靈活,這很容易造成問題。

通過檢視Thread原始碼可以看到常說的執行緒狀態有6種(NEW,RUNNABLE,BLOCKED,WAITING,TIME_WAITING,TERMINATED),簡單一張圖就可以總結,這個其實網上有很多了。

 說明:其中的RUNNABLE為了更好的與實際貼合,拆分成了兩種狀態,一種是ready,另一種是running

檢視行程的示例程式碼:

package com.learn.ThreadStatus;

import java.util.concurrent.TimeUnit;

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 * comment:
 *      執行緒的狀態
 */
public class ThreadStatus {
    public static void main(String[] args) {
        //TIME-WAITING
        new Thread(()->{
            while(true){
                try{
                    TimeUnit.SECONDS.sleep(1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        },"time_waiting").start();

        //WAITING
        new Thread(()->{
            while(true){
                synchronized (ThreadStatus.class){
                    try{
                        ThreadStatus.class.wait();
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            }
        },"waiting").start();

        //執行緒在ThreadStatus加鎖後,不會釋放鎖
        new Thread(new BlockedDemo(),"BlockDemo-01").start();
        new Thread(new BlockedDemo(),"BlockDemo-02").start();
    }

    static class BlockedDemo extends Thread{
        @Override
        public void run() {
            synchronized (BlockedDemo.class){
                while(true){
                    try{
                        TimeUnit.SECONDS.sleep(1000);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

 可以呼叫jps檢視虛擬就中的pid,然後利用jstack根據pid檢視各個執行緒狀態。

執行緒的停止

記得之前《Java多執行緒核心程式設計技術》一書中介紹了三種執行緒停止的方式(算上stop),這裡不總結stop方式。

1、通過interrupt標記位停止執行緒

package com.learn.ThreadStop;

import java.util.concurrent.TimeUnit;

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 * comment:
 *      通過interrupt方法設定標誌位,停止執行緒
 *      Thread.interrupt會重置標誌位
 */
public class InterruptDemo {

    private static int i;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(()->{
            while(!Thread.currentThread().isInterrupted()){
                i++;
            }
            Thread.interrupted();//標誌位復位
            System.out.println("內部:"+Thread.currentThread().isInterrupted());
            System.out.println("Num:"+i);
        },"interruptDemo");

        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt();
        System.out.println(thread.isInterrupted());

        Thread thread02 = new Thread(()->{
            while(true){
                boolean ii = Thread.currentThread().isInterrupted();
                if(ii){
                    System.out.println("before : "+Thread.currentThread().getName()+" "+ii);
                    Thread.interrupted();//對執行緒進行復位,中斷標誌為false
                    System.out.println("after : "+Thread.currentThread().getName()+" "+Thread.currentThread().isInterrupted());
                }
            }
        },"Thread02");
        thread02.start();
        TimeUnit.SECONDS.sleep(1);
        thread02.interrupt();
    }
}

上述的程式碼可以做相應的刪減,明確thread的interrupt的使用方式

2、利用volatile變數停止執行緒

volatile修飾的變數會讓該變數對所有的執行緒可見,這個欄位也會用線上程通訊。但是比較影響效能

package com.learn.ThreadStop;

/**
 * author:liman
 * createtime:2018/9/17
 * mobile:15528212893
 * email:[email protected]
 * comment:
 *      通過volatile欄位中止執行緒
 */
public class VolatileInterruptDemo {

    //volatile欄位讓該變數對所有的執行緒可見
    private volatile static boolean stop = false;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(()->{
            int i = 0;
            while(!stop){
                i++;
            }
            System.out.println(i);
        });
        thread.start();
        System.out.println("begin start thread");
        Thread.sleep(1000);
        stop = true;//主執行緒中將stop標記設定為true之後,子執行緒就會停止
    }

}

執行緒的安全問題

執行緒的安全問題可以總結為可見性、原子性和有序性三個問題。

可見性:當一個物件在多個記憶體中都存在副本時,如果一個記憶體修改了共享變數,其它執行緒也應該能夠看到被修改後的值,此為可見性

一個運算賦值操作並不是一個原子性操作,多個執行緒執行時,CPU對執行緒的排程是隨機的,我們不知道當前程式被執行到哪步就切換到了下一個執行緒,一個最經典的例子就是銀行匯款問題,一個銀行賬戶存款100,這時一個人從該賬戶取10元,同時另一個人向該賬戶匯10元,那麼餘額應該還是100。那麼此時可能發生這種情況,A執行緒負責取款,B執行緒負責匯款,A從主記憶體讀到100,B從主記憶體讀到100,A執行減10操作,並將資料重新整理到主記憶體,這時主記憶體資料100-10=90,而B記憶體執行加10操作,並將資料重新整理到主記憶體,最後主記憶體資料100+10=110,顯然這是一個嚴重的問題,我們要保證A執行緒和B執行緒有序執行,先取款後匯款或者先匯款後取款,此為有序性。上述描述其實也已經包含了原子性,針對餘額的操作需要做成原子性。

執行緒安全問題,大致可以分為上述三個問題,這三個問題在不同硬體層面有著不同的解決方案,Java為了遮蔽不同硬體的差別,提出了JMM的抽象,針對JMM的抽象會在下一篇部落格中進行描述。