首先給大家分享一個github倉庫,上面放了200多本經典的計算機書籍,包括C語言、C++、Java、Python、前端、資料庫、作業系統、計算機網路、資料結構和演算法、機器學習、程式設計人生等,可以star一下,下次找書直接在上面搜尋,倉庫持續更新中~

github地址:https://github.com/Tyson0314/java-books

如果github訪問不了,可以訪問gitee倉庫。

gitee地址:https://gitee.com/tysondai/java-books

執行緒池

使用執行緒池的好處

  • 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
  • 提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。
  • 提高執行緒的可管理性。統一管理執行緒,避免系統建立大量同類執行緒而導致消耗完記憶體。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

執行緒池原理

建立新的執行緒需要獲取全域性鎖,通過這種設計可以儘量避免獲取全域性鎖,當 ThreadPoolExecutor 完成預熱之後(當前執行的執行緒數大於等於 corePoolSize),提交的大部分任務都會被放到 BlockingQueue。

ThreadPoolExecutor 的通用建構函式:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
  • corePoolSize:當有新任務時,如果執行緒池中執行緒數沒有達到執行緒池的基本大小,則會建立新的執行緒執行任務,否則將任務放入阻塞佇列。當執行緒池中存活的執行緒數總是大於 corePoolSize 時,應該考慮調大 corePoolSize。

  • maximumPoolSize:當阻塞佇列填滿時,如果執行緒池中執行緒數沒有超過最大執行緒數,則會建立新的執行緒執行任務。否則根據拒絕策略處理新任務。非核心執行緒類似於臨時借來的資源,這些執行緒在空閒時間超過 keepAliveTime 之後,就應該退出,避免資源浪費。

  • BlockingQueue:儲存等待執行的任務。

  • keepAliveTime:非核心執行緒空閒後,保持存活的時間,此引數只對非核心執行緒有效。設定為0,表示多餘的空閒執行緒會被立即終止。

  • TimeUnit:時間單位

    TimeUnit.DAYS
    TimeUnit.HOURS
    TimeUnit.MINUTES
    TimeUnit.SECONDS
    TimeUnit.MILLISECONDS
    TimeUnit.MICROSECONDS
    TimeUnit.NANOSECONDS
  • ThreadFactory:每當執行緒池建立一個新的執行緒時,都是通過執行緒工廠方法來完成的。在 ThreadFactory 中只定義了一個方法 newThread,每當執行緒池需要建立新執行緒就會呼叫它。

    public class MyThreadFactory implements ThreadFactory {
    private final String poolName; public MyThreadFactory(String poolName) {
    this.poolName = poolName;
    } public Thread newThread(Runnable runnable) {
    return new MyAppThread(runnable, poolName);//將執行緒池名字傳遞給建構函式,用於區分不同執行緒池的執行緒
    }
    }
  • RejectedExecutionHandler:當佇列和執行緒池都滿了時,根據拒絕策略處理新任務。

    AbortPolicy:預設的策略,直接丟擲RejectedExecutionException
    DiscardPolicy:不處理,直接丟棄
    DiscardOldestPolicy:將等待佇列隊首的任務丟棄,並執行當前任務
    CallerRunsPolicy:由呼叫執行緒處理該任務

執行緒池大小

如果執行緒池執行緒數量太小,當有大量請求需要處理,系統響應比較慢影響體驗,甚至會出現任務佇列大量堆積任務導致OOM。

如果執行緒池執行緒數量過大,大量執行緒可能會同時在爭取 CPU 資源,這樣會導致大量的上下文切換(cpu給執行緒分配時間片,當執行緒的cpu時間片用完後儲存狀態,以便下次繼續執行),從而增加執行緒的執行時間,影響了整體執行效率。

CPU 密集型任務(N+1): 這種任務消耗的主要是 CPU 資源,可以將執行緒數設定為 N(CPU 核心數)+1,比 CPU 核心數多出來的一個執行緒是為了防止某些原因導致的任務暫停(執行緒阻塞,如io操作,等待鎖,執行緒sleep)而帶來的影響。一旦某個執行緒被阻塞,釋放了cpu資源,而在這種情況下多出來的一個執行緒就可以充分利用 CPU 的空閒時間。

I/O 密集型任務(2N): 系統會用大部分的時間來處理 I/O 操作,而執行緒等待 I/O 操作會被阻塞,釋放 cpu資源,這時就可以將 CPU 交出給其它執行緒使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些執行緒,具體的計算方法:最佳執行緒數 = CPU核心數 * (1/CPU利用率) = CPU核心數 * (1 + (I/O耗時/CPU耗時)),一般可設定為2N。

關閉執行緒池

shutdown():

將執行緒池狀態置為SHUTDOWN,並不會立即停止:

  • 停止接收外部提交的任務
  • 內部正在跑的任務和佇列裡等待的任務,會執行完
  • 等到第二步完成後,才真正停止

shutdownNow():

將執行緒池狀態置為STOP。企圖立即停止,事實上不一定:

  • 跟shutdown()一樣,先停止接收外部提交的任務
  • 忽略佇列裡等待的任務
  • 嘗試將正在跑的任務中斷(不一定中斷成功,取決於任務響應中斷的邏輯)
  • 返回未執行的任務列表

executor框架

1.5後引入的Executor框架的最大優點是把任務的提交和執行解耦。當提交一個Callable物件給ExecutorService,將得到一個Future物件,呼叫Future物件的get方法等待執行結果。Executor框架的內部使用了執行緒池機制,它在java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行和關閉,可以簡化併發程式設計的操作。

簡介

executor框架由3部分組成:任務、任務的執行、非同步計算的結果

  • 任務。需要實現的介面:Runnable和Callable介面。
  • 任務的執行。ExecutorService 是一個介面,用於定義執行緒池,呼叫它的 execute(Runnable)或者 submit(Runnable/Callable)執行任務。ExecutorService介面繼承於Executor,有兩個實現類ThreadPoolExecutorScheduledThreadPoolExecutor
  • 非同步計算的結果。包括future介面和實現future介面的FutureTask,呼叫future.get()會阻塞當前執行緒直到任務完成,future.cancel()可以取消執行任務。

ThreadPoolExecutor例項

使用 ThreadPoolExecutor 建構函式自定義引數的方式來建立執行緒池。

public class ThreadPoolExecutorDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy()
); for (int i = 0; i < 10; i++) {
Callable worker = () -> {
System.out.println(Thread.currentThread().getName());
return "ok";
};
Future<String> f = executor.submit(worker);
f.get();
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

Runnable和Callable的區別

Runnable 任務執行後不能返回值或者丟擲異常。Callable 任務執行後可以返回值或丟擲異常。

Executors.callable(Runnable task);//runnable轉化為callable
ExecutorService.execute(Runnable);
ExecutorService.submit(Runnable/Callable);//submit callable任務有返回值 //返回值是泛型引數V
public interface Callable<V> {
V call() throws Exception;
}

Future和FutureTask

Future 可以獲取任務執行的結果、取消任務。呼叫 future.get()會阻塞當前執行緒直到任務返回結果。

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask 實現了 RunnableFuture 介面,而 RunnableFuture 實現了 Runnable 和 Future<V> 介面。

execute()和submit()

execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功與否。

submit()方法用於提交需要返回值的任務。執行緒池會返回一個 Future 型別的物件,通過這個 Future 物件可以判斷任務是否執行成功,並且可以通過 Futureget()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用 get(long timeout, TimeUnit unit)方法則會阻塞當前執行緒一段時間後立即返回,無論任務是否執行完。

常用的執行緒池

常見的執行緒池有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool 和 ScheduledThreadPool。這幾個都是 ExecutorService (執行緒池)例項。

FixedThreadPool

固定執行緒數的執行緒池。任何時間點,最多隻有 nThreads 個執行緒處於活動狀態執行任務。

public static ExecutorService newFixedThreadPool(int nThreads) {	return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}

使用無界佇列 LinkedBlockingQueue(佇列容量為 Integer.MAX_VALUE),執行中的執行緒池不會拒絕任務,即不會呼叫RejectedExecutionHandler.rejectedExecution()方法。

maxThreadPoolSize 是無效引數,故將它的值設定為與 coreThreadPoolSize 一致。

keepAliveTime 也是無效引數,設定為0L,因為此執行緒池裡所有執行緒都是核心執行緒,核心執行緒不會被回收(除非設定了executor.allowCoreThreadTimeOut(true))。

不推薦使用:FixedThreadPool 不會拒絕任務,在任務比較多的時候會導致 OOM。

SingleThreadExecutor

只有一個執行緒的執行緒池。

public static ExecutionService newSingleThreadExecutor() {	return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}

使用無界佇列 LinkedBlockingQueue。執行緒池只有一個執行的執行緒,新來的任務放入工作佇列,執行緒處理完任務就迴圈從佇列裡獲取任務執行。保證順序的執行各個任務。

不推薦使用:同 FixedThreadPool,在任務比較多的時候會導致 OOM。

CachedThreadPool

根據需要建立新執行緒的執行緒池。

public static ExecutorService newCachedThreadPool() {	return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}

如果主執行緒提交任務的速度高於執行緒處理任務的速度時,CachedThreadPool 會不斷建立新的執行緒。極端情況下,這樣會導致耗盡 cpu 和記憶體資源。

使用沒有容量的SynchronousQueue作為執行緒池工作佇列,當執行緒池有空閒執行緒時,SynchronousQueue.offer(Runnable task)提交的任務會被空閒執行緒處理,否則會建立新的執行緒處理任務。

不推薦使用:CachedThreadPool允許建立的執行緒數量為 Integer.MAX_VALUE ,可能會建立大量執行緒,從而導致 OOM。

ScheduledThreadPoolExecutor

在給定的延遲後執行任務,或者定期執行任務。在實際專案中基本不會被用到,因為有其他方案選擇比如quartz

使用的任務佇列 DelayQueue 封裝了一個 PriorityQueuePriorityQueue 會對佇列中的任務進行排序,時間早的任務先被執行(即ScheduledFutureTasktime 變數小的先執行),如果time相同則先提交的任務會被先執行(ScheduledFutureTasksquenceNumber 變數小的先執行)。

執行週期任務步驟:

  1. 執行緒從 DelayQueue 中獲取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任務是指 ScheduledFutureTask的 time 大於等於當前系統的時間;
  2. 執行這個 ScheduledFutureTask
  3. 修改 ScheduledFutureTask 的 time 變數為下次將要被執行的時間;
  4. 把這個修改 time 之後的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。

編碼規範

阿里巴巴編碼規約不允許使用Executors去建立執行緒池,而是通過ThreadPoolExecutor的方式手動建立執行緒池,這樣子使用者會更加明確執行緒池的執行機制,避免資源耗盡的風險。

Executors 建立執行緒池物件的弊端:

FixedThreadPool和SingleThreadPool。允許請求佇列長度為 Integer.MAX_VALUE,可能堆積大量請求,從而導致OOM。

CachedThreadPool。建立的執行緒池允許的最大執行緒數是Integer.MAX_VALUE,當新增任務的速度大於執行緒池處理任務的速度,可能會建立大量的執行緒,消耗資源,甚至導致OOM。

正確示例(阿里巴巴編碼規範):

//正例1ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();//Common Thread PoolExecutorService pool = new ThreadPoolExecutor(5, 200,0L, TimeUnit.MILLISECONDS, //0L keepAliveTimenew LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());pool.execute(()-> System.out.println(Thread.currentThread().getName()));pool.shutdown();//gracefully shutdown//正例2ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, //corePoolSize threadFactory    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

JMM

Java記憶體模型:執行緒之間的共享變數儲存在主記憶體裡,每個執行緒都有自己私有的本地記憶體,本地記憶體儲存了共享變數的副本,執行緒對變數的操作都在本地記憶體中進行,不能直接讀寫主記憶體中的變數。

本地記憶體是JMM的一個抽象概念,並不真實存在,它包括快取、寫緩衝區、暫存器以及其他硬體和編譯器優化。

程序執行緒

程序是指一個記憶體中執行的應用程式,每個程序都有自己獨立的一塊記憶體空間,一個程序中可以啟動多個執行緒。

執行緒是比程序更小的執行單位,它是在一個程序中獨立的控制流,一個程序可以啟動多個執行緒,每條執行緒並行執行不同的任務。

執行緒狀態

初始(NEW):執行緒被構建,還沒有呼叫 start()。

執行(RUNNABLE):包括作業系統的就緒和執行兩種狀態。

阻塞(BLOCKED):一般是被動的,在搶佔資源中得不到資源,被動的掛起在記憶體,等待資源釋放將其喚醒。執行緒被阻塞會釋放CPU,不釋放記憶體。

等待(WAITING):進入該狀態的執行緒需要等待其他執行緒做出一些特定動作(通知或中斷)。

超時等待(TIMED_WAITING):該狀態不同於WAITING,它可以在指定的時間後自行返回。

終止(TERMINATED):表示該執行緒已經執行完畢。

圖片來源:Java併發程式設計的藝術

中斷

執行緒中斷即執行緒執行過程中被其他執行緒給打斷了,它與 stop 最大的區別是:stop 是由系統強制終止執行緒,而執行緒中斷則是給目標執行緒傳送一箇中斷訊號,如果目標執行緒沒有接收執行緒中斷的訊號並結束執行緒,執行緒則不會終止,具體是否退出或者執行其他邏輯取決於目標執行緒。

執行緒中斷三個重要的方法:

1、java.lang.Thread#interrupt

呼叫目標執行緒的interrupt()方法,給目標執行緒發一箇中斷訊號,執行緒被打上中斷標記。

2、java.lang.Thread#isInterrupted()

判斷目標執行緒是否被中斷,不會清除中斷標記。

3、java.lang.Thread#interrupted

判斷目標執行緒是否被中斷,會清除中斷標記。

private static void test2() {    Thread thread = new Thread(() -> {        while (true) {            Thread.yield();            // 響應中斷            if (Thread.currentThread().isInterrupted()) {                System.out.println("Java技術棧執行緒被中斷,程式退出。");                return;            }        }    });    thread.start();    thread.interrupt();}

常見方法

join

Thread.join(),在main中建立了thread執行緒,在main中呼叫了thread.join()/thread.join(long millis),main執行緒放棄cpu控制權,執行緒進入WAITING/TIMED_WAITING狀態,等到thread執行緒執行完才繼續執行main執行緒。

public final void join() throws InterruptedException {    join(0);}

yield

Thread.yield(),一定是當前執行緒呼叫此方法,當前執行緒放棄獲取的CPU時間片,但不釋放鎖資源,由執行狀態變為就緒狀態,讓OS再次選擇執行緒。作用:讓相同優先順序的執行緒輪流執行,但並不保證一定會輪流執行。實際中無法保證yield()達到讓步目的,因為讓步的執行緒還有可能被執行緒排程程式再次選中。Thread.yield()不會導致阻塞。該方法與sleep()類似,只是不能由使用者指定暫停多長時間。

public static native void yield(); //static方法

sleep

Thread.sleep(long millis),一定是當前執行緒呼叫此方法,當前執行緒進入TIMED_WAITING狀態,讓出cpu資源,但不釋放物件鎖,指定時間到後又恢復執行。作用:給其它執行緒執行機會的最佳方式。

public static native void sleep(long millis) throws InterruptedException;//static方法

wait()和sleep()的區別

相同點:

  1. 使當前執行緒暫停執行,把機會交給其他執行緒
  2. 任何執行緒在等待期間被中斷都會丟擲InterruptedException

不同點:

  1. wait() 是Object超類中的方法;而sleep()是執行緒Thread類中的方法
  2. 對鎖的持有不同,wait()會釋放鎖,而sleep()並不釋放鎖
  3. 喚醒方法不完全相同,wait() 依靠notify或者notifyAll 、中斷、達到指定時間來喚醒;而sleep()到達指定時間被喚醒
  4. 呼叫obj.wait()需要先獲取物件的鎖,而 Thread.sleep()不用

建立執行緒的方法

  • 通過擴充套件Thread類來建立多執行緒
  • 通過實現Runnable介面來建立多執行緒,可實現執行緒間的資源共享
  • 實現Callable介面,通過FutureTask介面建立執行緒。
  • 使用Executor框架來建立執行緒池。

繼承 Thread 建立執行緒程式碼如下。run()方法是由jvm建立完作業系統級執行緒後回撥的方法,不可以手動呼叫,手動呼叫相當於呼叫普通方法。

/** * @author: 程式設計師大彬 * @time: 2021-09-11 10:15 */public class MyThread extends Thread {    public MyThread() {    }    @Override    public void run() {        for (int i = 0; i < 10; i++) {            System.out.println(Thread.currentThread() + ":" + i);        }    }    public static void main(String[] args) {        MyThread mThread1 = new MyThread();        MyThread mThread2 = new MyThread();        MyThread myThread3 = new MyThread();        mThread1.start();        mThread2.start();        myThread3.start();    }}

Runnable 建立執行緒程式碼

/** * @author: 程式設計師大彬 * @time: 2021-09-11 10:04 */public class RunnableTest {    public static  void main(String[] args){        Runnable1 r = new Runnable1();        Thread thread = new Thread(r);        thread.start();        System.out.println("主執行緒:["+Thread.currentThread().getName()+"]");    }}class Runnable1 implements Runnable{    @Override    public void run() {        System.out.println("當前執行緒:"+Thread.currentThread().getName());    }}

實現Runnable介面比繼承Thread類所具有的優勢:

  1. 資源共享,適合多個相同的程式程式碼的執行緒去處理同一個資源
  2. 可以避免java中的單繼承的限制
  3. 執行緒池只能放入實現Runable或Callable類執行緒,不能直接放入繼承Thread的類

Callable 建立執行緒程式碼

/** * @author: 程式設計師大彬 * @time: 2021-09-11 10:21 */public class CallableTest {    public static void main(String[] args) {        Callable1 c = new Callable1();        //非同步計算的結果        FutureTask<Integer> result = new FutureTask<>(c);        new Thread(result).start();        try {            //等待任務完成,返回結果            int sum = result.get();            System.out.println(sum);        } catch (InterruptedException | ExecutionException e) {            e.printStackTrace();        }    }}class Callable1 implements Callable<Integer> {    @Override    public Integer call() throws Exception {        int sum = 0;        for (int i = 0; i <= 100; i++) {            sum += i;        }        return sum;    }}

使用 Executor 建立執行緒程式碼

/** * @author: 程式設計師大彬 * @time: 2021-09-11 10:44 */public class ExecutorsTest {    public static void main(String[] args) {        //獲取ExecutorService例項,生產禁用,需要手動建立執行緒池        ExecutorService executorService = Executors.newCachedThreadPool();        //提交任務        executorService.submit(new RunnableDemo());    }}class RunnableDemo implements Runnable {    @Override    public void run() {        System.out.println("大彬");    }}

執行緒間通訊

volatile

volatile是輕量級的同步機制,volatile保證變數對所有執行緒的可見性,不保證原子性。

  1. 當對volatile變數進行寫操作的時候,JVM會向處理器傳送一條LOCK字首的指令,將該變數所在快取行的資料寫回系統記憶體。
  2. 由於快取一致性協議,每個處理器通過嗅探在總線上傳播的資料來檢查自己的快取是不是過期了,當處理器發現自己快取行對應的記憶體地址被修改,就會將當前處理器的快取行置為無效狀態,當處理器對這個資料進行修改操作的時候,會重新從系統記憶體中把資料讀到處理器快取中。

MESI(快取一致性協議):當CPU寫資料時,如果發現操作的變數是共享變數,即在其他CPU中也存在該變數的副本,會發出訊號通知其他CPU將該變數的快取行置為無效狀態,因此當其他CPU需要讀取這個變數時,就會從記憶體重新讀取。

volatile關鍵字的兩個作用:

  1. 保證了不同執行緒對共享變數進行操作時的可見性,即一個執行緒修改了某個變數的值,這新值對其他執行緒來說是立即可見的。
  2. 禁止進行指令重排序。

指令重排序是JVM為了優化指令,提高程式執行效率,在不影響單執行緒程式執行結果的前提下,儘可能地提高並行度。Java編譯器會在生成指令系列時在適當的位置會插入記憶體屏障指令來禁止處理器重排序。插入一個記憶體屏障,相當於告訴CPU和編譯器先於這個命令的必須先執行,後於這個命令的必須後執行。對一個volatile欄位進行寫操作,Java記憶體模型將在寫操作後插入一個寫屏障指令,這個指令會把之前的寫入值都重新整理到記憶體。

synchronized

保證執行緒對變數訪問的可見性和排他性。synchronized 詳細內容見下文鎖部分。

等待通知機制

wait/notify為 Object 物件的方法,呼叫wait/notify需要先獲得物件的鎖。物件呼叫wait之後執行緒釋放鎖,將執行緒放到物件的等待佇列,當通知執行緒呼叫此物件的notify()方法後,等待執行緒並不會立即從wait返回,需要等待通知執行緒釋放鎖(通知執行緒執行完同步程式碼塊),等待佇列裡的執行緒獲取鎖,獲取鎖成功才能從wait()方法返回,即從wait方法返回前提是執行緒獲得鎖。

等待通知機制依託於同步機制,目的是確保等待執行緒從wait方法返回時能感知到通知執行緒對物件的變數值的修改。

synchronized

較常用的用於保證執行緒安全的方式。當一個執行緒獲取到鎖時,其他執行緒都會被阻塞住,當持有鎖的執行緒釋放鎖之後會喚醒這些執行緒,被喚醒的執行緒才有機會獲取到鎖。

  • 修飾例項方法,作用於當前物件例項加鎖,進入同步程式碼前要獲得當前物件例項的鎖
  • 修飾靜態方法,作用於當前類物件加鎖,進入同步程式碼前要獲得當前類物件的鎖(類的位元組碼檔案)
  • 修飾程式碼塊,指定加鎖物件,對給定物件加鎖,進入同步程式碼塊前要獲得給定物件的鎖

獲取了類鎖的執行緒和獲取了物件鎖的執行緒是不衝突的。

釋放鎖

當方法或者程式碼塊執行完畢後會自動釋放鎖,不需要做任何的操作。

當一個執行緒執行的程式碼出現異常時,其所持有的鎖會自動釋放。

實現原理

synchronized通過物件內部的監視器鎖(monitor)實現。每個物件都有一個monitor,當物件的monitor被持有時,則它處於鎖定的狀態。

程式碼塊的同步是使用monitorenter和monitorexit指令實現的,monitorenter指令是在編譯後插入到同步程式碼塊的開始位置,而monitorexit是插入到方法結束處或異常處。

public class SynchronizedDemo {    public void method() {        synchronized (this) {            System.out.println("method start");        }    }}

執行緒訪問同步塊時,先執行monitorenter指令時嘗試獲取monitor,過程如下:

  1. 如果monitor的進入數entry count為0,則該執行緒進入monitor,然後將進入數設定為1,該執行緒即為monitor的所有者。
  2. 如果執行緒已經佔有該monitor,只是重新進入,則進入monitor的進入數加1。
  3. 如果其他執行緒已經佔用了monitor,則該執行緒進入阻塞狀態,直到monitor的進入數為0,再重新嘗試獲取monitor。

執行緒退出同步塊時會執行monitorexit指令,monitor的進入數減1,如果減1後進入數為0,那執行緒退出monitor,不再是這個monitor的所有者。其他被這個monitor阻塞的執行緒可以嘗試去獲取這個 monitor。

Synchronized底層是通過一個monitor的物件來完成,其實wait/notify等方法也依賴於monitor物件,這就是為什麼只有在同步的塊或者方法中才能呼叫wait/notify等方法,否則會丟擲java.lang.IllegalMonitorStateException的異常的原因。

方法的同步不是通過新增monitorenter和monitorexit指令來完成,而是在其常量池中添加了ACC_SYNCHRONIZED識別符號。JVM就是根據該識別符號來實現方法的同步的:當執行緒呼叫方法時,會先檢查方法的 ACC_SYNCHRONIZED 訪問標誌是否被設定,如果設定了,說明此方法是同步方法,執行執行緒將先獲取monitor,獲取成功之後才能執行方法體,方法執行完後再釋放monitor。在方法執行期間,其他執行緒無法再獲得同一個monitor物件。

public class SynchronizedMethod {    public synchronized void method() {        System.out.println("Hello World!");    }}

鎖的狀態

Synchronized是通過物件內部的監視器來實現的。但是監視器鎖本質又是依賴於底層的作業系統的Mutex Lock來實現的。而作業系統實現執行緒之間的切換這就需要從使用者態轉換到核心態,這個成本非常高,狀態之間的轉換需要相對比較長的時間。這種依賴於作業系統Mutex Lock所實現的鎖我們稱之為重量級鎖。

JDK1.6中為了減少獲得鎖和釋放鎖帶來的效能消耗,引入了偏向鎖、輕量級鎖、自旋鎖、適應性自旋鎖、鎖消除、鎖粗化等技術來減少鎖操作的開銷。

synchronized鎖主要存在四種狀態,依次是:偏向鎖狀態、輕量級鎖狀態、重量級鎖狀態,他們會隨著競爭的激烈而逐漸升級。鎖可以升級不可降級,這種策略是為了提高獲得鎖和釋放鎖的效率。

  • 偏向鎖:當執行緒訪問同步塊並獲取鎖時,會在物件頭和鎖記錄中儲存鎖偏向的執行緒id,以後該執行緒進入和退出同步塊時,只需簡單測試一下物件頭的mark word中是否儲存著指向當前執行緒的偏向鎖,如果測試成功,則執行緒獲取鎖成功,否則,需再測試一下mark word中偏向鎖標識是否是1,是的話則使用CAS操作競爭鎖。如果競爭成功,則將Mark Word中執行緒ID設定為當前執行緒ID,如果CAS獲取偏向鎖失敗,則表示有競爭。當到達全域性安全點時獲得偏向鎖的執行緒被掛起,偏向鎖升級為輕量級鎖,然後被阻塞在安全點的執行緒繼續往下執行同步程式碼。

    偏向鎖偏向於第一個獲得它的執行緒,如果程式執行過程,該鎖沒有被其他執行緒獲取,那麼持有偏向鎖的執行緒就不需要進行同步。引入偏向鎖是為了在無多執行緒競爭的情況下儘量減少不必要的輕量級鎖執行的開銷,因為輕量級鎖的獲取及釋放使用了多次CAS原子指令,而偏向鎖只在置換ThreadID的時候使用一次CAS原子指令。當存在鎖競爭的時候,偏向鎖會升級為輕量級鎖。

    適用場景:在鎖無競爭的情況下使用,線上程沒有執行完同步程式碼之前,沒有其它執行緒去競爭鎖,一旦有了競爭就升級為輕量級鎖,升級為輕量級鎖的時候需要撤銷偏向鎖,會做很多額外操作,導致效能下降。

  • 輕量級鎖

    加鎖過程:執行緒執行同步塊之前,JVM會先在當前執行緒的棧幀中建立用於儲存鎖記錄的空間,並將物件頭的mark word複製到鎖記錄(displaced mark word)中,然後執行緒嘗試使用cas將物件頭的mark word替換為指向鎖記錄的指標。如果成功,則當前執行緒獲得鎖,否則表示有其他執行緒競爭鎖,當前執行緒便嘗試使用自旋來獲得鎖。當自旋超過一定的次數,或者一個執行緒在持有鎖,一個在自旋,又有第三個來訪時,輕量級鎖升級為重量級鎖。

    解鎖過程:使用原子的cas操作將displaced mark word替換回到物件頭,如果成功則解鎖成功,否則表明有鎖競爭,鎖會膨脹成重量級鎖。

    在沒有多執行緒競爭的前提下,使用輕量級鎖可以減少傳統的重量級鎖使用作業系統互斥量(申請互斥鎖)產生的效能消耗,因為使用輕量級鎖時,不需要申請互斥量。另外,輕量級鎖的加鎖和解鎖都用到了CAS操作。如果沒有競爭,輕量級鎖使用 CAS 操作避免了使用互斥操作的開銷。但如果存在鎖競爭,除了互斥量開銷外,還會額外發生CAS操作,因此在有鎖競爭的情況下,輕量級鎖比傳統的重量級鎖更慢!如果鎖競爭激烈,那麼輕量級鎖將很快膨脹為重量級鎖!

  • 重量級鎖:當一個執行緒獲取到鎖時,其他執行緒都會被阻塞住,當持有鎖的執行緒釋放鎖之後會喚醒這些執行緒,被喚醒的執行緒才有機會獲取到鎖。

    synchronized和Lock能保證同一時刻只有一個執行緒獲取鎖然後執行同步程式碼,並且在釋放鎖之前會將對變數的修改重新整理到主存當中,保證了可見性。

  • 自旋鎖:一般執行緒持有鎖的時間都不是太長,所以僅僅為了這一點時間去掛起執行緒/恢復執行緒比較浪費資源。自旋鎖就是讓該執行緒等待一段時間,執行一段無意義的迴圈,不會被立即掛起,看持有鎖的執行緒是否會很快釋放鎖。如果持有鎖的執行緒很快就釋放了鎖,那麼自旋的效率就非常好,反之,自旋的執行緒就會白白消耗掉處理的資源,這樣反而會帶來效能上的浪費。所以自旋的次數必須要有一個限度,如果自旋超過了限定次數仍然沒有獲取到鎖,則應該被掛起。

  • 自適應自旋鎖:JDK 1.6引入了更加聰明的自旋鎖,即自適應自旋鎖。所謂自適應就意味著自旋的次數不再是固定的,它是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀態來決定。

  • 鎖消除:虛擬機器即使編譯器在執行時,如果檢測到那些共享資料不可能存在競爭,那麼就執行鎖消除。

  • 鎖粗化:如果一系列的連續操作都對同一個物件反覆加鎖和解鎖,那麼會帶來很多不必要的效能消耗,使用鎖粗化減少鎖操作的開銷。

ReentrantLock

重入鎖,支援一個執行緒對資源的重複加鎖。該鎖的還支援設定獲取鎖時的公平和非公平性。

使用lock時需要在try finally塊進行解鎖:

public static final Object get(String key) {    r.lock();    try {        return map.get(key);    } finally {        r.unlock();    }}

原理

ReentrantLock是通過組合自定義同步器來實現鎖的獲取與釋放。當執行緒嘗試獲取同步狀態時,首先判斷當前執行緒是否為獲取鎖的執行緒來決定獲取操作是否成功,如果是獲取鎖的執行緒再次請求,則將同步狀態值進行增加並返回true,表示獲取同步狀態成功。獲取同步狀態失敗,則該執行緒會被構造成node節點放到AQS同步佇列中。

如果鎖被獲取了n次,那麼前n-1次 tryRelease(int releases)方法必須返回false,第n次呼叫tryRelease()之後,同步狀態完全釋放(值為0),才會返回true。

ReentrantLock和synchronized區別

  1. 使用synchronized關鍵字實現同步,執行緒執行完同步程式碼塊會自動釋放鎖,而ReentrantLock需要手動釋放鎖。
  2. synchronized是非公平鎖,ReentrantLock可以設定為公平鎖。
  3. ReentrantLock上等待獲取鎖的執行緒是可中斷的,執行緒可以放棄等待鎖。而synchonized會無限期等待下去。
  4. ReentrantLock 可以設定超時獲取鎖。在指定的截止時間之前獲取鎖,如果截止時間到了還沒有獲取到鎖,則返回。
  5. ReentrantLock 的 tryLock() 方法可以嘗試非阻塞的獲取鎖,呼叫該方法後立刻返回,如果能夠獲取則返回true,否則返回false。

鎖的分類

公平鎖與非公平鎖

按照執行緒訪問順序獲取物件鎖。synchronized 是非公平鎖, Lock 預設是非公平鎖,可以設定為公平鎖,公平鎖會影響效能。

public ReentrantLock() {    sync = new NonfairSync();}public ReentrantLock(boolean fair) {    sync = fair ? new FairSync() : new NonfairSync();}

共享式與獨佔式鎖

共享式與獨佔式的最主要區別在於:同一時刻獨佔式只能有一個執行緒獲取同步狀態,而共享式在同一時刻可以有多個執行緒獲取同步狀態。例如讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。

悲觀鎖與樂觀鎖

悲觀鎖,每次訪問資源都會加鎖,執行完同步程式碼釋放鎖,synchronized 和 ReentrantLock 屬於悲觀鎖。

樂觀鎖,不會鎖定資源,所有的執行緒都能訪問並修改同一個資源,如果沒有衝突就修改成功並退出,否則就會繼續迴圈嘗試。樂觀鎖最常見的實現就是CAS。

樂觀鎖一般來說有以下2種方式:

  1. 使用資料版本記錄機制實現,這是樂觀鎖最常用的一種實現方式。給資料增加一個版本標識,一般是通過為資料庫表增加一個數字型別的version欄位來實現。當讀取資料時,將version欄位的值一同讀出,資料每更新一次,對此version值加一。當我們提交更新的時候,判斷資料庫表對應記錄的當前版本資訊與第一次取出來的version值進行比對,如果資料庫表當前版本號與第一次取出來的version值相等,則予以更新,否則認為是過期資料。
  2. 使用時間戳。資料庫表增加一個欄位,欄位型別使用時間戳(timestamp),和上面的version類似,也是在更新提交的時候檢查當前資料庫中資料的時間戳和自己更新前取到的時間戳進行對比,如果一致則OK,否則就是版本衝突。

適用場景:

  • 悲觀鎖適合寫操作多的場景。
  • 樂觀鎖適合讀操作多的場景,不加鎖可以提升讀操作的效能。
CAS

CAS全稱 Compare And Swap,比較與交換,是樂觀鎖的主要實現方式。CAS 在不使用鎖的情況下實現多執行緒之間的變數同步。ReentrantLock 內部的 AQS 和原子類內部都使用了 CAS。

CAS演算法涉及到三個運算元:

  • 需要讀寫的記憶體值 V。
  • 進行比較的值 A。
  • 要寫入的新值 B。

只有當 V 的值等於 A 時,才會使用原子方式用新值B來更新V的值,否則會繼續重試直到成功更新值。

以 AtomicInteger 為例,AtomicInteger 的 getAndIncrement()方法底層就是CAS實現,關鍵程式碼是 compareAndSwapInt(obj, offset, expect, update),其含義就是,如果obj內的valueexpect相等,就證明沒有其他執行緒改變過這個變數,那麼就更新它為update,如果不相等,那就會繼續重試直到成功更新值。

CAS 三大問題:

  1. ABA問題。CAS需要在操作值的時候檢查記憶體值是否發生變化,沒有發生變化才會更新記憶體值。但是如果記憶體值原來是A,後來變成了B,然後又變成了A,那麼CAS進行檢查時會發現值沒有發生變化,但是實際上是有變化的。ABA問題的解決思路就是在變數前面新增版本號,每次變數更新的時候都把版本號加一,這樣變化過程就從A-B-A變成了1A-2B-3A

    JDK從1.5開始提供了AtomicStampedReference類來解決ABA問題,原子更新帶有版本號的引用型別。

  2. 迴圈時間長開銷大。CAS操作如果長時間不成功,會導致其一直自旋,給CPU帶來非常大的開銷。

  3. 只能保證一個共享變數的原子操作。對一個共享變數執行操作時,CAS能夠保證原子操作,但是對多個共享變數操作時,CAS是無法保證操作的原子性的。

    Java從1.5開始JDK提供了AtomicReference類來保證引用物件之間的原子性,可以把多個變數放在一個物件裡來進行CAS操作。

併發工具

在JDK的併發包裡提供了幾個非常有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段。

CountDownLatch

CountDownLatch用於某個執行緒等待其他執行緒執行完任務再執行,與thread.join()功能類似。常見的應用場景是開啟多個執行緒同時執行某個任務,等到所有任務執行完再執行特定操作,如彙總統計結果。

public class CountDownLatchDemo {    static final int N = 4;    static CountDownLatch latch = new CountDownLatch(N);    public static void main(String[] args) throws InterruptedException {       for(int i = 0; i < N; i++) {            new Thread(new Thread1()).start();       }       latch.await(1000, TimeUnit.MILLISECONDS); //呼叫await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行;等待timeout時間後count值還沒變為0的話就會繼續執行       System.out.println("task finished");    }    static class Thread1 implements Runnable {        @Override        public void run() {            try {                System.out.println(Thread.currentThread().getName() + "starts working");                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                latch.countDown();            }        }    }}

執行結果:

Thread-0starts workingThread-1starts workingThread-2starts workingThread-3starts workingtask finished

CyclicBarrier

CyclicBarrier(同步屏障),用於一組執行緒互相等待到某個狀態,然後這組執行緒再同時執行。

public CyclicBarrier(int parties, Runnable barrierAction) {}public CyclicBarrier(int parties) {}

引數parties指讓多少個執行緒或者任務等待至某個狀態;引數barrierAction為當這些執行緒都達到某個狀態時會執行的內容。

public class CyclicBarrierTest {    // 請求的數量    private static final int threadCount = 10;    // 需要同步的執行緒數量    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);    public static void main(String[] args) throws InterruptedException {        // 建立執行緒池        ExecutorService threadPool = Executors.newFixedThreadPool(10);        for (int i = 0; i < threadCount; i++) {            final int threadNum = i;            Thread.sleep(1000);            threadPool.execute(() -> {                try {                    test(threadNum);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                } catch (BrokenBarrierException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            });        }        threadPool.shutdown();    }    public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {        System.out.println("threadnum:" + threadnum + "is ready");        try {            /**等待60秒,保證子執行緒完全執行結束*/            cyclicBarrier.await(60, TimeUnit.SECONDS);        } catch (Exception e) {            System.out.println("-----CyclicBarrierException------");        }        System.out.println("threadnum:" + threadnum + "is finish");    }}

執行結果如下,可以看出CyclicBarrier是可以重用的:

threadnum:0is readythreadnum:1is readythreadnum:2is readythreadnum:3is readythreadnum:4is readythreadnum:4is finishthreadnum:3is finishthreadnum:2is finishthreadnum:1is finishthreadnum:0is finishthreadnum:5is readythreadnum:6is ready...

當四個執行緒都到達barrier狀態後,會從四個執行緒中選擇一個執行緒去執行Runnable。

CyclicBarrier和CountDownLatch區別

CyclicBarrier 和 CountDownLatch 都能夠實現執行緒之間的等待。

CountDownLatch用於某個執行緒等待其他執行緒執行完任務再執行。CyclicBarrier用於一組執行緒互相等待到某個狀態,然後這組執行緒再同時執行。

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置,可用於處理更為複雜的業務場景。

Semaphore

Semaphore類似於鎖,它用於控制同時訪問特定資源的執行緒數量,控制併發執行緒數。

public class SemaphoreDemo {    public static void main(String[] args) {        final int N = 7;        Semaphore s = new Semaphore(3);        for(int i = 0; i < N; i++) {            new Worker(s, i).start();        }    }    static class Worker extends Thread {        private Semaphore s;        private int num;        public Worker(Semaphore s, int num) {            this.s = s;            this.num = num;        }        @Override        public void run() {            try {                s.acquire();                System.out.println("worker" + num +  " using the machine");                Thread.sleep(1000);                System.out.println("worker" + num +  " finished the task");                s.release();            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}

執行結果如下,可以看出並非按照執行緒訪問順序獲取資源的鎖,即

worker0 using the machineworker1 using the machineworker2 using the machineworker2 finished the taskworker0 finished the taskworker3 using the machineworker4 using the machineworker1 finished the taskworker6 using the machineworker4 finished the taskworker3 finished the taskworker6 finished the taskworker5 using the machineworker5 finished the task

原子類

基本型別原子類

使用原子的方式更新基本型別

  • AtomicInteger:整型原子類
  • AtomicLong:長整型原子類
  • AtomicBoolean :布林型原子類

AtomicInteger 類常用的方法:

public final int get() //獲取當前的值public final int getAndSet(int newValue)//獲取當前的值,並設定新的值public final int getAndIncrement()//獲取當前的值,並自增public final int getAndDecrement() //獲取當前的值,並自減public final int getAndAdd(int delta) //獲取當前的值,並加上預期的值boolean compareAndSet(int expect, int update) //如果輸入的數值等於預期值,則以原子方式將該值設定為輸入值(update)public final void lazySet(int newValue)//最終設定為newValue,使用 lazySet 設定之後可能導致其他執行緒在之後的一小段時間內還是可以讀到舊的值。

AtomicInteger 類主要利用 CAS (compare and swap) 保證原子操作,從而避免加鎖的高開銷。

陣列型別原子類

使用原子的方式更新數組裡的某個元素

  • AtomicIntegerArray:整形陣列原子類
  • AtomicLongArray:長整形陣列原子類
  • AtomicReferenceArray :引用型別陣列原子類

AtomicIntegerArray 類常用方法:

public final int get(int i) //獲取 index=i 位置元素的值public final int getAndSet(int i, int newValue)//返回 index=i 位置的當前的值,並將其設定為新值:newValuepublic final int getAndIncrement(int i)//獲取 index=i 位置元素的值,並讓該位置的元素自增public final int getAndDecrement(int i) //獲取 index=i 位置元素的值,並讓該位置的元素自減public final int getAndAdd(int i, int delta) //獲取 index=i 位置元素的值,並加上預期的值boolean compareAndSet(int i, int expect, int update) //如果輸入的數值等於預期值,則以原子方式將 index=i 位置的元素值設定為輸入值(update)public final void lazySet(int i, int newValue)//最終 將index=i 位置的元素設定為newValue,使用 lazySet 設定之後可能導致其他執行緒在之後的一小段時間內還是可以讀到舊的值。

引用型別原子類

  • AtomicReference:引用型別原子類
  • AtomicStampedReference:帶有版本號的引用型別原子類。該類將整數值與引用關聯起來,可用於解決原子的更新資料和資料的版本號,可以解決使用 CAS 進行原子更新時可能出現的 ABA 問題。
  • AtomicMarkableReference :原子更新帶有標記的引用型別。該類將 boolean 標記與引用關聯起來

AQS

AQS定義了一套多執行緒訪問共享資源的同步器框架,許多併發工具的實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch。

原理

AQS使用一個volatile的int型別的成員變數state來表示同步狀態,通過CAS修改同步狀態的值。

private volatile int state;//共享變數,使用volatile修飾保證執行緒可見性

同步器依賴內部的同步佇列(一個FIFO雙向佇列)來完成同步狀態的管理,當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態(獨佔或共享 )構造成為一個節點(Node)並將其加入同步佇列並進行自旋,當同步狀態釋放時,會把首節中的後繼節點對應的執行緒喚醒,使其再次嘗試獲取同步狀態。

Condition

任意一個Java物件,都擁有一組監視器方法(定義在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,使用這些方法的前提是已經獲取物件的鎖,和 synchronized 配合使用。Condition介面也提供了類似Object的監視器方法,與Lock配合可以實現等待/通知模式。Condition是依賴Lock物件。

Lock lock = new ReentrantLock();Condition condition = lock.newCondition();public void conditionWait() throws InterruptedException {    lock.lock();    try {            condition.await();    } finally {            lock.unlock();    }}public void conditionSignal() throws InterruptedException {    lock.lock();    try {            condition.signal();    } finally {            lock.unlock();    }}

一般將Condition物件作為成員變數。當呼叫await()方法後,當前執行緒會釋放鎖進入等待佇列。其他執行緒呼叫Condition物件的signal()方法,喚醒等待佇列首節點的執行緒。

實現原理

每個Condition物件都包含著一個等待佇列,如果一個執行緒成功獲取了鎖之後呼叫了Condition.await()方法,那麼該執行緒將會釋放同步狀態、喚醒同步佇列中的後繼節點,然後構造成節點加入等待佇列。只有當執行緒再次獲取Condition相關聯的鎖之後,才能從await()方法返回。

圖片來源:Java併發程式設計的藝術

在Object的監視器模型上,一個物件擁有一個同步佇列和等待佇列。Lock通過AQS實現,AQS可以有多個Condition,所以Lock擁有一個同步佇列和多個等待佇列。

圖片來源:Java併發程式設計的藝術

執行緒獲取了鎖之後,呼叫Condition的signal()方法,會將等待佇列的隊首節點移到同步佇列中,然後該節點的執行緒會嘗試去獲取同步狀態。成功獲取同步狀態之後,執行緒將await()方法返回。

圖片來源:Java併發程式設計的藝術

其他

Daemon Thread

在Java中有兩類執行緒:

  • User Thread(使用者執行緒)
  • Daemon Thread(守護執行緒)

只要當前JVM例項中尚存在任何一個非守護執行緒沒有結束,守護執行緒就全部工作;只有當最後一個非守護執行緒結束時,守護執行緒隨著JVM一同結束工作。

Daemon的作用是為其他執行緒的執行提供便利服務,守護執行緒最典型的應用就是垃圾收集。

將執行緒轉換為守護執行緒可以通過呼叫Thread物件的setDaemon(true)方法來實現。

參考資料

執行緒中斷

synchronized實現原理

指令重排導致單例模式失效

本文已經收錄到github倉庫,此倉庫用於分享Java相關知識總結,包括Java基礎、MySQL、Spring Boot、MyBatis、Redis、RabbitMQ、計算機網路、資料結構與演算法等等,歡迎大家提pr和star!

github地址:https://github.com/Tyson0314/Java-learning

如果github訪問不了,可以訪問gitee倉庫。

gitee地址:https://gitee.com/tysondai/Java-learning