1. 程式人生 > >併發與多執行緒

併發與多執行緒

併發與多執行緒

基本概念

併發與並行

  1. 併發:指兩個或多個事件在同一時間間隔內發生 。當有多個執行緒在操作時,如果系統只有一個CPU,則它根本不可能真正同時進行一個以上的執行緒,它只能把CPU執行時間劃分成若干個時間段,再將時間 段分配給各個執行緒執行,在一個時間段的執行緒程式碼執行時,其它執行緒處於掛起狀。這種方式稱之為併發(Concurrent)
  2. 並行:指兩個或者多個事件在同一時刻發生 。當系統有一個以上CPU時,則執行緒的操作有可能非併發。當一個CPU執行一個執行緒時,另一個CPU可以執行另一個執行緒,兩個執行緒互不搶佔CPU資源,可以同時進行,這種方式稱之為並行(Parallel)

程序與執行緒

  1. 一個程式可能有多個程序,一個程序由多個執行緒和共享資源組成
  2. 程序:擁有資源的基本單位
  3. 執行緒:獨立排程分派的基本單位

執行緒

建立執行緒

Thread

  1. 繼承 Thread 類(Thread 實現了 Runnable 介面)

  2. 重寫 run 方法

  3. start() 方法啟動執行緒

Runnable

  1. 實現 Runnable 介面
  2. 重寫 run 方法
  3. new Thread(Runnable target),new Thread(Runnable target,String name)

多個 Thread 例項共用一個 Runnable,這些執行緒的 run 方法相同,可以共享相同的資料

但是存線上程同步問題

public class RunnableTest implements Runnable
{
    private int ticket = 10;
    public void run()
    {
        while (true)
        {
            if (ticket > 0)
            {
                System.out.println(Thread.currentThread().getName() + "售出" + ticket + "號票");
                ticket--;
            }
            else System.exit(0);
        }
    }
    public static void main(String[] args)
    {
        RunnableTest rt = new RunnableTest();
        Thread t1 = new Thread(rt, "1號視窗");
        Thread t2 = new Thread(rt, "2號視窗");
        t1.start();
        t2.start();
    }
}

print

1號視窗售出10號票
1號視窗售出9號票
1號視窗售出8號票
1號視窗售出7號票
2號視窗售出7號票
2號視窗售出5號票
1號視窗售出6號票
2號視窗售出4號票
1號視窗售出3號票
2號視窗售出2號票
1號視窗售出1號票

匿名類

匿名類可以方便的訪問方法的區域性變數,但是必須宣告為 final,因為匿名類和普通區域性變數生命週期不一致

jdk7 中已不再需要顯示宣告為 final,實際上被虛擬機器自動隱式聲明瞭

public static void main(String[] args)
{
    new Thread( )
    {
        public void run( )
        {
            //內容
        }
    }.start( );
    new Thread(new Runnable( )
    {
        public void run( )
        {
            //內容
        }
	}).start( );
}

Callable

  1. 建立 Callable 的實現類,並衝寫 call() 方法,該方法為執行緒執行體,並且該方法有返回值

  2. 建立 Callable 實現類的例項,並用 FutuerTask 類來包裝 Callable 物件,該 FutuerTask 封裝了 Callable 物件 call() 方法的返回值

  3. 例項化 FutuerTask 類,引數為 FutuerTask 介面實現類的物件來啟動執行緒

  4. 通過 FutuerTask 類的物件的 get() 方法來獲取執行緒結束後的返回值

    public class CallableTest implements Callable<Integer>
    {
        //重寫執行體 call( )
        public Integer call( ) throws Exception
        {
            int i = 0;
            for (; i < 10; i++)
            {
               //
            }
            return i;
        }
        public static void main(String[] args)
        {
            Callable call = new CallableTest( );
            FutureTask<Integer> f = new FutureTask<Integer>(call);
            Thread t = new Thread(f);
            t.start( );
            //得到返回值
            try
            {
                System.out.println("返回值:" + f.get( ));
            }
            catch (Exception e)
            {
                e.printStackTrace( );
            }
        }
    }
    

    print

    返回值:10
    

執行緒方法

  1. 執行緒執行體:run()

  2. 啟動執行緒:start()

  3. Thread 類方法

    方法 描述
    public final void setName(String name) 改變執行緒名稱
    public final void setPriority(int priority) 設定優先順序
    public final void setDaemon(boolean on) 設為守護執行緒,當只剩下守護執行緒時自動結束
    public final boolean isAlive() 測試執行緒是否處於活動狀態
    public static void yield() 暫停當前執行緒(回到就緒狀態)
    public static void sleep(long millisec) 進入休眠狀態
    public final void join() 暫停當前執行緒,等待呼叫該方法執行緒執行完畢
    public final void join(long millisec) 暫停當前執行緒指定時間
    public static Thread currentThread() 返回對當前正在執行的執行緒物件的引用

執行緒狀態

  1. 就緒狀態:

    • start() 方法進入就緒狀態,等待虛擬機器排程
    • 執行狀態呼叫 yield 方法會進入就緒狀態
    • lock 池中的執行緒獲得鎖後進入就緒狀態
  2. 執行狀態:就緒狀態經過執行緒排程進去執行狀態

  3. 阻塞狀態:

    • 休眠:呼叫 sleep 方法
    • 物件 wait 池:呼叫 wait 或 join 方法,被 notify 後進入 lock 池
    • 物件 lock 池:未獲得鎖
  4. 死亡狀態:run 方法執行完畢

    graph TB T(新執行緒)--start方法-->A(就緒狀態) A--執行緒排程-->B(執行狀態) B--yield方法-->A B--sleep方法-->D(阻塞:休眠) B--wait或join方法-->E(阻塞:wait池) B--未獲得鎖-->F(阻塞:lock池) B--run方法執行完-->C(死亡狀態) D--時間到-->A E--notify方法-->F F--獲得鎖-->A

執行緒同步

保證程式原子性、可見性、有序性的過程

阻塞同步

基於加鎖爭用的悲觀併發策略

synchronized

  1. synchronized 含義

    • 使用 synchronized 可以鎖住某一物件, 當其他執行緒也想鎖住該物件以執行某段程式碼時,必須等待已經持有鎖的執行緒釋放鎖

    • 釋放鎖的方式有互斥程式碼執行完畢、丟擲異常、鎖物件呼叫 wait 方法

  2. 不同的使用方式代表不同的鎖粒度

    • 修飾普通方法 = synchronized(this)
    • 修飾靜態方法 = synchronized(X.class)
    • 修飾程式碼塊(物件 extends Object)

ReentrantLock

  1. 建立 Lock 鎖

    ReentrantLock 實現了 Lock 介面, Lock lock = new ReentrantLock()

  2. Lock 含義

    • 使用 lock() 方法表示當前執行緒佔有 lock 物件

    • 釋放該物件要顯示掉用 unlock() 方法 ,多在 finally 塊中進行釋放

  3. trylock 方法

    • synchronized 會一直等待鎖,而 Lock 提供了 trylock 方法,在指定時間內試圖佔用
    • 使用 trylock, 釋放鎖時要判斷,若佔用失敗,unlock 會丟擲異常
  4. Lock 的執行緒互動

    • 通過 lock 物件得到一個 Condition 物件,Condition condition = lock.newCondition()

    • 呼叫這個Condition物件的:await,signal,signalAll 方法

  5. 示例

    public class LockTest
    {
        public static void log(String msg)//日誌方法
        {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date( );
            String dateStr = sdf.format(date);
            System.out.println(dateStr + " " + Thread.currentThread( ).getName( ) + " " + msg);
        }
        public static void main(String[] args)
        {
            Lock lock = new ReentrantLock( );
            new Thread("t1")
            {
                public void run( )
                {
                    boolean flag = false;
                    try
                    {
                        log("執行緒已啟動");
                        log("嘗試佔有lock");
                        flag = lock.tryLock(1, TimeUnit.SECONDS);
                        if (flag)
                        {
                            log("成功佔有lock");
                            log("執行3秒業務操作");
                            Thread.sleep(3000);
                        }
                        else
                        {
                            log("經過1秒鐘嘗試,佔有lock失敗,放棄佔有");
                        }
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace( );
                    }
                    finally
                    {
                        if (flag)
                        {
                            log("釋放lock");
                            lock.unlock( );
                        }
                    }
                    log("執行緒結束");
                }
            }.start( );
            try
            {
                //先讓 t1 先執行兩秒
                Thread.sleep(2000);
            }
            catch (InterruptedException e1)
            {
                e1.printStackTrace( );
            }
            new Thread("t2")
            {
                public void run( )
                {
                    boolean flag = false;
                    try
                    {
                        log("執行緒啟動");
                        log("嘗試佔有lock");
    
                        flag = lock.tryLock(1, TimeUnit.SECONDS);
                        if (flag)
                        {
                            log("成功佔有lock");
                            log("執行3秒的業務操作");
                            Thread.sleep(3000);
                        }
                        else
                        {
                            log("經過1秒鐘的嘗試,佔有lock失敗,放棄佔有");
                        }
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace( );
                    }
                    finally
                    {
                        if (flag)
                        {
                            log("釋放lock");
                            lock.unlock( );
                        }
                    }
                    log("執行緒結束");
                }
            }.start( );
        }
    }
    

    print

    2019-11-07 15:50:01 t1 執行緒已啟動
    2019-11-07 15:50:01 t1 嘗試佔有lock
    2019-11-07 15:50:01 t1 成功佔有lock
    2019-11-07 15:50:01 t1 執行3秒業務操作
    2019-11-07 15:50:03 t2 執行緒啟動
    2019-11-07 15:50:03 t2 嘗試佔有lock
    2019-11-07 15:50:04 t2 經過1秒鐘的嘗試,佔有lock失敗,放棄佔有
    2019-11-07 15:50:04 t2 執行緒結束
    2019-11-07 15:50:04 t1 釋放lock
    2019-11-07 15:50:04 t1 執行緒結束
    
  6. synchronized 和 Lock 區別

    • synchronized 是關鍵字,Lock 是介面, synchronized是內建的語言實現,Lock是程式碼層面的實現
    • synchronized 執行完畢自動釋放鎖,Lock 需要顯示 unlock()
    • synchronized 會一直等待,嘗試佔用鎖,Lock 可以使用 trylock,在一段時間內嘗試佔用,時間到佔用失敗則放棄

非阻塞同步

非阻塞同步是一種基於衝突檢測和資料更新的樂觀併發策略

actomic 類

  1. 原子操作

    • 原子操作是不可中斷的操作,必須一次性執行完成
    • 賦值操作是原子操作,但 a++ 不是原子操作, 而是取值、加一、賦值三個步驟
    • 一個執行緒取 i 的值後,還沒來得及加一,第二個執行緒也來取值,就產生了執行緒安全問題
  2. actomic 類的使用

    • jdk6 以後,新增包 java.util.concurrent.atomic,裡面有各種原子類,比如 AtomicInteger
    • AtomicInteger 提供了各種自增,自減等方法,這些方法都是原子性的。換句話說,自增方法 incrementAndGet 是執行緒安全的
    • 10000 個執行緒做 value 加一的操作,用 a++ 方式得出不準確的結果,用原子類 AtomicInteger 的 addAndGet() 方法得出正確結果
    public class ThreadTest
    {
        static int value1 = 0;
        static AtomicInteger value2 = new AtomicInteger(0);//原子整型類
        public static void main(String[] args)
        {
            for (int i = 0; i < 100000; i++)
            {
                new Thread( )
                {
                    public void run( )
                    {
                        value1++;
                    }
                }.start( );
                new Thread( )
                {
                    public void run( )
                    {
                        value2.addAndGet(1);//value++的原子操作
                    }
                }.start( );
            }
            while (Thread.activeCount( ) > 2)
            {
                Thread.yield( );
            }
            System.out.println(value1);
            System.out.println(value2);
        }
    }
    

    print

    99996
    100000
    

無同步方案

如果一個方法不涉及共享資料,那麼他天生就是執行緒安全的

可重入程式碼

可以在程式碼執行的任何時刻中斷它,轉而去執行另外一段程式碼,在控制權返回之後,原來的程式不會出現任何的錯誤

  1. 一個方法返回結果是可以預測的,輸入了相同的資料,就能返回相同的結果,那這個方法就具有可重入性,也就是執行緒安全的

  2. 棧封閉是一種可重用程式碼

    多個執行緒訪問同一個方法的區域性變數時,不會出現執行緒安全問題,因為區域性變數儲存在虛擬機器棧中,屬於執行緒的私有區域,所以不會出現執行緒安全性

    public class ThreadTest
    {
        static void add( )
        {
            int value = 0;
            for (int i = 0; i < 1000; i++)
            {
                value++;
            }
            System.out.println(value);
        }
    
        public static void main(String[] args)
        {
            ExecutorService threadPool = Executors.newCachedThreadPool( );
            threadPool.execute(( ) -> add( ));
            threadPool.execute(( ) -> add( ));
            threadPool.shutdown( );
        }
    }
    

    print

    1000
    1000
    

執行緒本地儲存

  1. 把共享資料的可見範圍限制在同一個執行緒之內,即便無同步也能做到避免資料爭用

  2. 使用 java.lang.ThreadLocal 類來實現執行緒本地儲存功能

    • ThreadLocal 變數是一個不同執行緒可以擁有不同值的變數,所有的執行緒可以共享一個ThreadLocal物件
    • 任意一個執行緒的 ThreadLocal 值發生變化,不會影響其他的執行緒
    • 用set()和get()方法對ThreadLocal變數進行賦值和檢視其值
    public class ThreadLocalDemo
    {
        public static void main(String[] args)
        {
            ThreadLocal threadLocal1 = new ThreadLocal( );
            Thread t1 = new Thread(( ) ->
            {
                threadLocal1.set(1);
                try
                {
                    Thread.sleep(3000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace( );
                }
                System.out.println(threadLocal1.get( ));
            });
            Thread t2 = new Thread(( ) -> threadLocal1.set(2));
            t1.start( );
            t2.start( );
        }
    }
    

    print

    1
    
  3. ThreadLocal 原理

    • 每個執行緒都有一個 ThreadLocal.ThreadLocalMap 物件,呼叫 threadLocal1.set(T value) 方法時,將 threadLoacl1 和 value 鍵值對存入 map
    • ThreadLocalMap 底層資料結構可能導致記憶體洩露,儘可能在使用 ThreadLocal 後呼叫 remove()方法

死鎖

死鎖條件

  1. 互斥條件
  2. 請求與保持條件
  3. 不可剝奪條件
  4. 迴圈等待條件(環路條件)

Java死鎖示例

public static void main(String[] args)
{
    Object o1 = new Object( );
    Object o2 = new Object( );

    Thread t1 = new Thread( )
    {
        public void run( )
        {
            synchronized (o1)//佔有 o1
            {
                System.out.println("t1 已佔有 O1");
                try
                {
                    Thread.sleep(1000);//停頓1000毫秒,另一個執行緒有足夠的時間佔有 o1
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace( );
                }
                System.out.println("t1 試圖佔有 o2");
                System.out.println("t1 等待中");
                synchronized (o2)
                {
                    System.out.println("t1 已佔有 O2");
                }
            }
        }
    };
    Thread t2 = new Thread( )
    {
        public void run( )
        {
            synchronized (o2)  //佔有 o2
            {
                System.out.println("t2 已佔有 o2");
                try
                {
                    Thread.sleep(1000);//停頓1000毫秒,另一個執行緒有足夠的時間佔有 o2
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace( );
                }
                System.out.println("t2 試圖佔有 o1");
                System.out.println("t2 等待中");
                synchronized (o1)
                {
                    System.out.println("t2 已佔有 O1");
                }
            }
        }
    };
    t1.start( );
    t2.start( );
}

print

t1 已佔有 O1
t2 已佔有 o2
t1 試圖佔有 o2
t1 等待中
t2 試圖佔有 o1
t2 等待中

執行緒通訊

  1. Object 類方法

    方法 描述
    wait() 執行緒進入等待池
    notify() 喚醒等待當前執行緒鎖的執行緒
    notifyAll() 喚醒所有執行緒,優先順序高的優先喚醒

    為什麼這些方法設定在 Object 物件上?

    表面上看,因為任何物件都可以加鎖

    底層上說,java 多執行緒同步的 Object Monitor 機制,每個物件上都設定有類似於集合的資料結構,儲存當前獲得鎖的執行緒、等待獲得鎖的執行緒(lock set)、等待被喚醒的執行緒(wait set)

  2. 生產者消費者模型

    • sleep 方法,讓出 cpu,但不放下鎖
    • wait 方法,進入鎖物件的等待池,放下鎖
public class ProducerAndConsumer
{
    public static void main(String[] args)
    {
        Goods goods = new Goods();
        Thread producer = new Thread()//生產者執行緒
        {
            public void run()
            {
                while (true) goods.put();
            }
        };
        Thread consumer = new Thread()//消費者執行緒
        {
            public void run()
            {
                while (true) goods.take();
            }
        };
        consumer.start();
        producer.start();
    }
}
class Goods//商品類
{
    int num = 0;//商品數目
    int space = 10;//空位總數
    public synchronized void put()
    {
        if (num < space)//有空位可放,可以生產
        {
            num++;
            System.out.println("放入一個商品,現有" + num + "個商品," + (space - num) + "個空位");
            notify();//喚醒等待該鎖的執行緒
        }
        else//無空位可放,等待空位
        {
            try
            {
                System.out.println("沒有空位可放,等待拿出");
                wait();//進入該鎖物件的等待池
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
    public synchronized void take()
    {
        if (num > 0)//有商品可拿
        {
            num--;
            System.out.println("拿出一個商品,現有" + num + "個商品," + (space - num) + "個空位");
            notify();//喚醒等待該鎖的執行緒
        }
        else///等待生產產品
        {
            try
            {
                System.out.println("沒有商品可拿,等待放入");
                wait();//進入該鎖物件的等待池
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
}

print

沒有商品可拿,等待放入
放入一個商品,現有1個商品,9個空位
放入一個商品,現有2個商品,8個空位
拿出一個商品,現有1個商品,9個空位
放入一個商品,現有2個商品,8個空位
放入一個商品,現有3個商品,7個空位
放入一個商品,現有4個商品,6個空位
拿出一個商品,現有3個商品,7個空位
放入一個商品,現有4個商品,6個空位
···

執行緒池

執行緒的啟動和結束都是比較消耗時間和佔用資源的,如果在系統中用到了很多的執行緒,大量的啟動和結束動作會嚴重影響效能

執行緒池很像生產者消費者模式,消費的物件是一個一個的能夠執行的任務

  1. 設計思路

    • 準備任務容器,可用 List,存放任務
    • 執行緒池類構造方法中建立多個執行者執行緒
    • 任務容器為空時,所有執行緒 wait
    • 當外部執行緒向任務容器加入任務,就會有執行者執行緒被 notify
    • 執行任務完畢後,沒有接到新任務,就回歸等待狀態
  2. 實現一個執行緒池

    public class ThreadPool
    {
        int poolSize;// 執行緒池大小
        LinkedList<Runnable> tasks = new LinkedList<Runnable>();// 任務容器
        public ThreadPool(int poolSize)
        {
            this.poolSize = poolSize;
            synchronized (tasks)//啟動 poolSize 個任務執行者執行緒
            {
                for (int i = 0; i < poolSize; i++)
                {
                    new ExecuteThread("執行者執行緒 " + i).start();
                }
            }
        }
        public void add(Runnable r)//新增任務
        {
            synchronized (tasks)
            {
                tasks.add(r);
                System.out.println("加入新任務");
                tasks.notifyAll();// 喚醒等待的任務執行者執行緒
            }
        }
        class ExecuteThread extends Thread//等待執行任務的執行緒
        {
            Runnable task;
            public ExecuteThread(String name)
            {
                super(name);
            }
            public void run()
            {
                System.out.println("啟動:" + this.getName());
                while (true)
                {
                    synchronized (tasks)
                    {
                        while (tasks.isEmpty())
                        {
                            try
                            {
                                tasks.wait();
                            }
                            catch (InterruptedException e)
                            {
                                e.printStackTrace();
                            }
                        }
                        task = tasks.removeLast();
                        tasks.notifyAll(); // 允許新增任務的執行緒可以繼續新增任務
                    }
                    System.out.println(this.getName() + " 接到任務");
                    task.run();//執行任務
                }
            }
        }
        public static void main(String[] args)
        {
            ThreadPool pool = new ThreadPool(3);
            for (int i = 0; i < 5; i++)
            {
                Runnable task = new Runnable()//建立任務
                {
                    public void run()//任務內容
                    {
                        System.out.println(Thread.currentThread().getName()+" 執行任務");
                    }
                };
                pool.add(task);//加入任務
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
    

    print

    main 加入新任務
    啟動:執行者執行緒 0
    執行者執行緒 0 接到任務
    執行者執行緒 0 執行任務
    啟動:執行者執行緒 1
    啟動:執行者執行緒 2
    main 加入新任務
    執行者執行緒 2 接到任務
    執行者執行緒 2 執行任務
    main 加入新任務
    執行者執行緒 2 接到任務
    執行者執行緒 2 執行任務
    
  3. java 執行緒池類

    • 預設執行緒池類 ThreadPoolExecutor 在 java.util.concurrent 包下

      ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
      /*
      第一個引數 int 型別, 10 表示這個執行緒池初始化了 10 個執行緒在裡面工作
      第二個引數 int 型別, 15 表示如果 10 個執行緒不夠用了,就會自動增加到最多 15個 執行緒
      第三個引數 60 結合第四個引數 TimeUnit.SECONDS,表示經過 60 秒,多出來的執行緒還沒有接到任務,就會回收,最後保持池子裡就 10 個
      第五個引數 BlockingQueue 型別,new LinkedBlockingQueue() 用來放任務的集合
      */
      
    • execute() 方法新增新任務

      public class TestThread 
      {   
          public static void main(String[] args) throws InterruptedException 
          {
              ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
              threadPool.execute(new Runnable()
              {//新增任務
                  public void run() 
                  {
                      System.out.println("執行任務");
                  }    
              });
          }
      }
      
  4. java 中幾種執行緒池

    java 執行緒池的頂級介面是 Executor ,子介面是 ExecutorService ,子介面使用更廣泛

    Executors 類提供了一系列工廠方法用於建立執行緒池,返回的執行緒池實現了 ExecutorService 介面

    • newCachedThreadPool有緩衝的執行緒池,執行緒數 JVM 控制,有執行緒可使用時不會建立新執行緒
    • newFixedThreadPool,固定大小的執行緒池,任務量超過執行緒數時,任務存入等待佇列
    • newScheduledThreadPool,建立一個執行緒池,可安排在給定延遲後執行命令或者定期地執行
    • newSingleThreadExecutor,只有一個執行緒,順序執行多個任務,若意外終止,則會新建立一個
    ExecutorService threadPool = null;
    threadPool = Executors.newCachedThreadPool();//緩衝執行緒池
    threadPool = Executors.newFixedThreadPool(3);//固定大小的執行緒池
    threadPool = Executors.newScheduledThreadPool(2);//定時任務執行緒池
    threadPool = Executors.newSingleThreadExecutor();//單執行緒的執行緒池
    threadPool = new ThreadPoolExecutor(···);//預設執行緒池,多個可控引數
    

執行緒安全類

  1. StringBuffer:內部方法用 synchronized 修飾
  2. Vetort:繼承於 AbstractList
  3. Stack:繼承於 Vector
  4. HashTable:繼承於 Dictionary,實現了 Map 介面
  5. Property:繼承於 HashTable,實現了 Map 介面
  6. concurrentHashMap:分段加鎖機制