1. 程式人生 > >多執行緒高併發程式設計(1) -- 基礎及詳解

多執行緒高併發程式設計(1) -- 基礎及詳解

背景:

  程序和執行緒的區別:

  程序的記憶體大小為:堆記憶體+執行緒數量*棧記憶體,即執行緒數量 =( 最大地址空間[MaxProcessMemory] - JVM堆記憶體 - 系統保留記憶體[ReservedOsMemory] )/ ThreadStackSize(XSS),從中可以看出,執行緒的數量隨棧記憶體的增多而減少。

  執行緒是程式執行的一個路徑,每一個執行緒都有自己的區域性變量表、程式計數器(指向正在執行的指令指標)以及各自的生命週期。當啟動了一個Java虛擬機器(JVM)時,從作業系統開始就會建立一個新的程序(JVM程序),JVM程序將會派生或者建立很多執行緒。

  • 一個執行緒的建立肯定是由另一個執行緒完成的;
  • 被建立執行緒的父執行緒是建立它的執行緒;

  執行緒會帶來額外的開銷,如CPU排程時間、併發控制開銷等;每個執行緒在自己的工作記憶體互動,載入和儲存主記憶體控制不當會造成資料不一致。

一.執行緒建立方式:

  • 構造Thread類:實現執行緒的執行單元run有兩種方式,分別是下面

    • 繼承Thread,重寫run方法:Thread實現了Runnable介面,使用start開啟執行緒,start開啟後執行緒會加入排程器,然後呼叫run方法,start會呼叫start0本地方法跟OS進行互動執行;下面是start原始碼解析

      /**
       * Causes this thread to begin execution; the Java Virtual Machine
       * calls the <code>run</code> method of this thread.
       * 開啟執行緒,JVM會呼叫run方法【start使用了模板方法】
       * <p>
       * It is never legal to start a thread more than once.
       * 不能兩次啟動執行緒,否則報IllegalThreadStateException異常
       * In particular, a thread may not be restarted once it has completed
       * execution.
       * 一個執行緒生命週期結束,也就是到了TERMINATED狀態,再次呼叫start方法是不允許的,
       * 也就是TERMINATED狀態沒法回到RUNNABLE/RUNNING狀態。
       *
       * @exception  IllegalThreadStateException  if the thread was already
       *               started.
       * @see        #run()
       * @see        #stop()
       */
      public synchronized void start() {//執行緒安全的
          /**
           * This method is not invoked for the main method thread or "system"
           * group threads created/set up by the VM. Any new functionality added
           * to this method in the future may have to also be added to the VM.
           * 這個方法不會被主執行緒呼叫或通過虛擬機器系統執行緒組建立起來。未來任何新增到該方法裡的新功能可能需要加入到虛擬機器中
           *
           * A zero status value corresponds to state "NEW".
      * 執行緒被構造後的new狀態,threadStatus的屬性值是0 */ if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented.
      * 通知執行緒組新執行緒將要啟動,以便它可以新增到執行緒組列表並且執行緒組沒有開始計數*/ group.add(this);//加入執行緒組 boolean started = false; try { start0();//呼叫本地方法 started = true; } finally { try { if (!started) {//啟動失敗 group.threadStartFailed(this);//執行緒啟動失敗,從組中移除該執行緒 } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } }
      void add(Thread t) {
      synchronized (this) {
      if (destroyed) {//執行緒組狀態校驗
      throw new IllegalThreadStateException();
      }
      if (threads == null) {
      threads = new Thread[4];//初始化長度為4的執行緒組
      } else if (nthreads == threads.length) {
      threads = Arrays.copyOf(threads, nthreads * 2);//陣列滿了就擴容2倍
      }
      threads[nthreads] = t;//當前執行緒新增到執行緒組中

      // This is done last so it doesn't matter in case the
      // thread is killed
      nthreads++;//執行緒數+1

      // The thread is now a fully fledged member of the group, even
      // though it may, or may not, have been started yet. It will prevent
      // the group from being destroyed so the unstarted Threads count is
      // decremented.
      nUnstartedThreads--;//未啟動執行緒數-1
      }
      }
      private native void start0();//本地方法呼叫重寫的run方法
      void threadStartFailed(Thread t) {
      synchronized(this) {
      remove(t);//移除當前執行緒
      nUnstartedThreads++;//沒有啟動的執行緒數量+1
      }
      }

      //=======================測試============================
      Thread t = new Thread(){
        @Override
        public void run(){
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
      };
      t.start();
      t.start();//不能兩次啟動,第二次啟動是不允許的,報IllegalThreadStateException,此時該執行緒是處於執行狀態
      //=======================測試============================== Thread t = new Thread(){ @Override public void run(){ try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }; t.start(); TimeUnit.SECONDS.sleep(10);//設定休眠時間,上面的執行緒的生命週期已經終止,下面再次啟動報IllegalThreadStateException t.start();
    • 實現Runnable介面,重寫run方法並且將Runnable例項用作構造Thread的引數【單繼承有侷限性,推薦使用介面】:將執行緒的控制(start)和業務邏輯(run)的執行徹底分離開來,使用的是策略模式;Thread的run方法是不能共享的,但Runnbale的run方法可以共享,使用同一個Runnable的例項構造不同的Thread例項;把實現類物件(實現Runnable介面的類的例項化)放入代理類物件(Thread構造方法)中,使用的是代理模式;下面是靜態代理的程式碼解釋:

      public class StaticProxy {
          public static void main(String[] args) {
              new Weeding(new Me()).happyMarry();
      //        new Thread(物件).start();類似
          }
      }
      
      interface Marry {
          void happyMarry();
      }
      //真實角色
      class Me implements Marry {
          @Override
          public void happyMarry() {
              System.out.println("me will marry!");
          }
      }
      //代理物件
      class Weeding implements Marry{
          //真實角色
          private Marry marry;
          public Weeding(Marry marry){
              this.marry=marry;
          }
          @Override
          public void happyMarry() {
              System.out.println("start");
              marry.happyMarry();
              System.out.println("end");
          }
      }
  • 實現Callable介面,重寫call方法,Future獲取返回值:Callable能接受一個泛型,然後在call方法中返回一個指定型別的值;

    public interface Callable<V> {
        V call() throws Exception;
    } 
    //執行緒池佇列開啟執行緒,不會產生髒讀資料
    //使用步驟:
    //1.建立目標物件new
    //2.建立執行服務執行緒池
    //3.提交執行submit
    //4.獲取結構get
    //5.關閉服務shutdownNow
    public class MyThread implements Callable {
        private static int count = 20;
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            MyThread m1 = new MyThread();
            MyThread m2 = new MyThread();
            MyThread m3 = new MyThread();
            MyThread m4 = new MyThread();
            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(2,
                    new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build());
            Future submit = service.submit(m1);
            Future submit1 = service.submit(m2);
            Future submit2 = service.submit(m3);
            Future submit3 = service.submit(m4);
            System.out.println(submit.get());
            System.out.println(submit1.get());
            System.out.println(submit2.get());
            System.out.println(submit3.get());
            service.shutdown();
    
        }
    
        @Override
        public Object call() throws Exception {
            count--;
            return count;
        }
    }
  • 匿名內部類;

            new Thread(){//相當於繼承Thread的方式
                public void run(){
                    System.out.println("thread1 start ... ");
                }
            }.start();
    
    
            new Thread(new Runnable() {//相當於實現Runnable介面的方式
                @Override
                public void run() {
                    System.out.println("thread2 start .... ");
                }
            }).start();
  • 定時器(Timer);

            Timer timer = new Timer();//建立時間器
            timer.schedule(new TimerTask() {//使用schedule,引數為定時器任務並重寫run方法
                @Override
                public void run() {
                    System.out.println("timer task is run");
                }
            }, 0, 1000);
  • 執行緒池(內部使用佇列,所以加入執行緒池的執行緒是順序執行):使用execute和重寫Runnbale的run方法;

    ScheduledExecutorService service = new ScheduledThreadPoolExecutor(2,
                    new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build());
            service.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("run test");
                }
            });
  • lambda表示式;

            new Thread(()-> {
                for(int i = 1 ; i<10 ; i++){
                    System.out.println("It is a lambda function!");
                }
    
            }).start();
  • Spring方式(@Async註解);

        @Test
        public void test() {
            run();
        }
        @Async
        public void run(){
            System.out.println("Async Test");
        }

二.執行緒生命週期

  • new新生狀態:當用new建立一個Thread物件時,此時它並不處於執行狀態,因為沒有呼叫star啟動該執行緒,那麼執行緒的狀態為new狀態,也就是說,它只是Thread物件的狀態,在沒有start之前,該執行緒是不存在的;

  • runnable就緒狀態:執行緒物件進入runnable就緒狀態必須呼叫start方法,那麼此時才是真正地在JVM程序中建立了一個執行緒;就緒狀態不會直接進入阻塞狀態和死亡狀態,即使是線上程的執行邏輯中呼叫wait、sleep或其他block的IO操作等,也必須先獲得CPU的排程執行權才可以,嚴格來說,就緒狀態的執行緒只能意外終止或進入執行狀態;

  • running執行狀態:一旦CPU通過輪詢或其他方式從任務可執行佇列中選中了執行緒,此時它才能真正地執行自己的邏輯程式碼;一個正在running狀態的執行緒事實上也是一個runnable的,但是反過來則不成立;

 

  • sleep:使當前執行緒進入指定毫秒級的休眠,暫停執行,但不會放棄monitor鎖的所有權,即不會釋放鎖資源;使用TimeUnit來替代Thread.sleep,省去了時間單位的換算步驟;

  • yield:屬於一種啟發式的方法,其會提醒排程器我願意放棄當前的CPU資源,如果CPU的資源不緊張,則會忽略這種提醒;yield只是一個提示(hint),CPU排程器並不會擔保每次都能滿足yield提示;

  • sleep和yield的區別:

    • sleep會導致當前執行緒暫停指定的時間,沒有CPU時間片的消耗;

    • yield只是對CPU排程器的一個提示,如果CPU排程器沒有忽略這個提示,它會導致執行緒上下文的切換;

    • sleep會使執行緒短暫block,會在給定的時間內釋放CPU資源;

    • yield會使running狀態的執行緒進入runnable狀態(如果CPU排程器沒有忽略這個提示的話);

    • sleep幾乎百分之百地完成了給定時間的休眠,但yield的提示並不能一定擔保;

    • 一個執行緒sleep另一個執行緒interrupt會捕獲到中斷訊號,而yield則不會;

  • join:join某個執行緒A,會使當前執行緒B進入等待,直到執行緒A結束生命週期;可以使用join來達到執行緒順序執行的效果;

  • wait:表示執行緒一直等待,直到其他執行緒通知,與sleep不同的是它會釋放鎖;呼叫wait會加入wait set中;

  • notify:喚醒一個處於等待狀態的執行緒;

  • notifyAll:喚醒同一個物件上所有呼叫wait方法的執行緒,優先順序高的執行緒優先排程;

  • synchronized:同步,內建鎖、互斥鎖,鎖定共享資源,JVM指令是monitor enter和monitor exit;synchronized的指令嚴格遵守java happens-before規則,一個monitor exit指令之前必定要有一個monitor enter;

    • 鎖資訊存在物件頭中:

      • Mark Word

        • 執行緒id

        • Epoch

        • 物件的分代年齡資訊

        • 是否是偏向鎖

        • 鎖標誌位

      • Class Metadata Address