1. 程式人生 > >多執行緒(一)

多執行緒(一)

多執行緒是指的是這個程式(一個程序)執行時產生了不止一個執行緒。 並行與併發區別: 並行:多個cpu例項或者多臺機器同時執行一段處理邏輯,是真正的同時。 併發:通過cpu排程演算法,讓使用者看上去同時執行,實際上從cpu操作層面不是真正的同時。併發往往在場景中有公用的資源,那麼針對這個公用的資源往往產生瓶頸,我們會用TPS或者QPS來反應這個系統的處理能力。

Java執行緒具有五種基本狀態

新建狀態(New):當執行緒物件對建立後,即進入了新建狀態,如:Thread t = new MyThread(); 就緒狀態(Runnable):當呼叫執行緒物件的start()方法(t.start();),執行緒即進入就緒狀態。處於就緒狀態的執行緒,只是說明此執行緒已經做好了準備,隨時等待CPU排程執行,並不是說執行了t.start()此執行緒立即就會執行; 執行狀態(Running):當CPU開始排程處於就緒狀態的執行緒時,此時執行緒才得以真正執行,即進入到執行狀態。注:就緒狀態是進入到執行狀態的唯一入口,也就是說,執行緒要想進入執行狀態執行,首先必須處於就緒狀態中; 阻塞狀態(Blocked):處於執行狀態中的執行緒由於某種原因,暫時放棄對CPU的使用權,停止執行,此時進入阻塞狀態,直到其進入到就緒狀態,才有機會再次被CPU呼叫以進入到執行狀態。根據阻塞產生的原因不同,阻塞狀態又可以分為三種: 1.等待阻塞:執行狀態中的執行緒執行wait()方法,使本執行緒進入到等待阻塞狀態。 2.同步阻塞 -- 執行緒在獲取synchronized同步鎖失敗(因為鎖被其它執行緒所佔用),它會進入同步阻塞狀態; 3.其他阻塞 -- 通過呼叫執行緒的sleep()或join()或發出了I/O請求時,執行緒會進入到阻塞狀態。當sleep()狀態超時、join()等待執行緒終止或者超時、或者I/O處理完畢時,執行緒重新轉入就緒狀態。 死亡狀態(Dead):執行緒執行完了或者因異常退出了run()方法,該執行緒結束生命週期。

執行緒實現的兩種方式: 1、繼承Thread類

2、實現Runnable介面

public class Thread1 extends Thread {
private String name;

public Thread1(String name) {
this.name = name;
}

public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + "執行 : " + i);
}
}

public static void main(String[] args) {
  Thread1 mTh1 = new Thread1("A");
  Thread1 mTh2 = new Thread1("B");
  mTh1.start();
  mTh2.start();
}

}
public class Thread2 implements Runnable{
private String name;
 
public Thread2(String name) {
this.name=name;
}
 
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + "執行 : " + i);
try {
Thread.sleep((int) Math.random() * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
 
}
public static void main(String args[]) {
 
  Thread2 th1 =new Thread2("A");
  new Thread(th1).start();
  Thread2 th2 =new Thread2("B");
  new Thread(th2).start();
}
}

PS:Runnable沒有start()方法,所以需要new Thread來進行實現。Thread類也是實現Runnable介面

join()使用方法

public class Thread1 extends Thread {
    private String name;

    public Thread1(String name) {
        this.name = name;
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + "執行緒執行開始!");
        for (int i = 0; i < 5; i++) {
            System.out.println("子執行緒" + name + "執行 : " + i);
            if (i == 2) {
                yield();
            }
        }
        System.out.println(Thread.currentThread().getName() + "執行緒執行結束!");
    }

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + "主執行緒執行開始!");
        Thread1 mTh1 = new Thread1("A");
        Thread1 mTh2 = new Thread1("B");
        mTh1.start();
        mTh2.start();
        try {
            mTh1.join();//其他執行緒等待該執行緒執行完畢
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            mTh2.join(); //其他執行緒等待該執行緒執行完畢
} catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "主執行緒執行結束!"); } }

ps: join()的作用是“等待該執行緒終止”,是指的主執行緒等待子執行緒的終止。子執行緒在呼叫了join()方法後面的程式碼,只有等到子執行緒結束了才能執行。不使用join()方法,主執行緒會比子執行緒結束。

執行緒池:ExecutorService 

ExecutorService 執行緒池是為解決迴圈new Thread()的問題,提供定時執行、定期執行、單執行緒、併發數控制等功能。

ExecutorService 提供了四種方法。

newCachedThreadPool是建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。

執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。

public static void main(String args[]) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            cachedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println(index);
                }
            });
        }
    }

newFixedThreadPool是建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。所以每一次併發執行3個執行緒。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(index);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }
    }

newScheduledThreadPool建立一個定長執行緒池,支援定時及週期性任務執行。表示延遲3秒執行。

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        System.out.println("delay 3 seconds");
        for (int i = 0; i < 10; i++) {
            final int index = i;
            scheduledThreadPool.schedule(new Runnable() {

                @Override
                public void run() {
                    System.out.println(index);

                }
            }, 3, TimeUnit.SECONDS);
        }
    }
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        System.out.println("delay 3 seconds");
        for (int i = 0; i < 10; i++) {
            final int index = i;
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    System.out.println(index);

                }
            }, 1, 3, TimeUnit.SECONDS);
        }
    }

表示延遲1秒後每3秒執行一次。

建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {

                @Override
                public void run() {
                        System.out.println(index);
                   
                }
            });
        }
    }

Callable、Future和FutureTask

Callable是介面,能夠返回執行結果,Future是介面,FutureTask實現方法。

public class FutureExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Date date1 = new Date();
        //同時可以併發執行5個執行緒
        ExecutorService pool = Executors.newFixedThreadPool(5);
        //建立結果集
        List<Future> list = new ArrayList<Future>();
        for (int i = 0; i < 100; i++) {
            Callable c = new MyCallable(i + " ");
            //執行執行緒提交
            Future f = pool.submit(c);
            list.add(f);
        }
        pool.shutdown();
        //將結果放回list,然後遍歷
        for (Future f : list) {
            System.out.println(">>>" + f.get().toString());
        }

        Date date2 = new Date();
        System.out.println((date2.getTime() - date1.getTime()) );
    }
}

class MyCallable implements Callable<Object> {
    private String taskNum;

    MyCallable(String taskNum) {
        this.taskNum = taskNum;
    }

    public Object call() throws Exception {
        System.out.println(">>>" + taskNum);
        Date dateTmp1 = new Date();
        Thread.sleep(1000);
        Date dateTmp2 = new Date();
        long time = dateTmp2.getTime() - dateTmp1.getTime();
        System.out.println(">>>" + taskNum);
        return taskNum + time;
    }
}
public class FutureExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Date date1 = new Date();
        //同時可以併發執行5個執行緒
        ExecutorService pool = Executors.newFixedThreadPool(5);
        //建立結果集
//        List<Future> list = new ArrayList<Future>();
        for (int i = 0; i < 100; i++) {
            Callable c = new MyCallable(i + " ");
            //執行執行緒提交
            Future f = pool.submit(c);
            System.out.println(">>>" + f.get().toString());//返回結果線上程裡面呼叫
        }
        pool.shutdown();
//        //將結果放回list,然後遍歷
//        for (Future f : list) {
//            System.out.println(">>>" + f.get().toString());
//        }

        Date date2 = new Date();
        System.out.println((date2.getTime() - date1.getTime()) );
    }
}

class MyCallable implements Callable<Object> {
    private String taskNum;

    MyCallable(String taskNum) {
        this.taskNum = taskNum;
    }

    public Object call() throws Exception {
        System.out.println(">>>" + taskNum);
        Date dateTmp1 = new Date();
        Thread.sleep(1000);
        Date dateTmp2 = new Date();
        long time = dateTmp2.getTime() - dateTmp1.getTime();
        System.out.println(">>>" + taskNum);
        return taskNum + time;
    }
}
public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Date date1 = new Date();
        //同時可以併發執行5個執行緒
        ExecutorService pool = Executors.newFixedThreadPool(100);
        //建立結果集
//        List<Future> list = new ArrayList<Future>();
        for (int i = 0; i < 100; i++) {
            Callable c = new MyCallable(i + " ");
            //執行執行緒提交
//            Future f = pool.submit(c);
             //建立FutureTask  
            FutureTask<Object> futureTask=new FutureTask<Object>(c);  
            pool.submit(futureTask);
            System.out.println(">>>" + futureTask.get().toString());//返回結果線上程裡面呼叫
        }
        pool.shutdown();
//        //將結果放回list,然後遍歷
//        for (Future f : list) {
//            System.out.println(">>>" + f.get().toString());
//        }

        Date date2 = new Date();
        System.out.println((date2.getTime() - date1.getTime()) );
    }
}

class MyCallable implements Callable<Object> {
    private String taskNum;

    MyCallable(String taskNum) {
        this.taskNum = taskNum;
    }

    public Object call() throws Exception {
        System.out.println(">>>" + taskNum);
        Date dateTmp1 = new Date();
        Thread.sleep(1000);
        Date dateTmp2 = new Date();
        long time = dateTmp2.getTime() - dateTmp1.getTime();
        System.out.println(">>>" + taskNum);
        return taskNum + time;
    }
}