1. 程式人生 > >實戰Java高併發程式設計(3.2 執行緒池)

實戰Java高併發程式設計(3.2 執行緒池)

1.Executor

jdk提供了一套Executor框架,本質上是一個執行緒池。

  1. newFixedThreadPool()方法:該方法返回一個固定數量的執行緒池。該執行緒池中的執行緒數量始終不變,當有一個新任務提交時,執行緒池中若有空閒執行緒,則立即執行,若沒有,則任務會暫存在一個任務佇列中,待有執行緒空閒時,便處理在任務佇列中的任務。
  2. newSingleThreadExecutor():該方法返回一個只有一個執行緒的執行緒池。若有多餘一個任務被提交到該執行緒池,任務會被儲存到一個任務佇列中,待執行緒空閒,按先入先出的順序執行佇列中的任務。
  3. newCachedThreadPool():該方法返回一個可根據實際情況調整執行緒數量的執行緒池。執行緒池的執行緒數量不確定,但若有空閒執行緒可以複用,則會優先使用可複用的執行緒。若所有執行緒均在工作,又有新的任務提交,則會建立新的執行緒處理任務。所有執行緒在使用完畢後,將返回執行緒池進行復用。
  4. newSingleThreadScheduledExecutor():該方法返回一個ScheduledExecutorService物件,執行緒池大小為1。ScheduledExecutorService介面在ExecutorService介面之上擴充套件了在給定時間執行某任務的功能,如在某個固定的延時之後執行,或者週期性的執行某個任務。
  5. newScheduledThreadPool():該方法也返回一個ScheduledExecutorService物件,但該執行緒池可以指定執行緒數量。 
public class Test {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            es.submit(myTask);
        }
    }
}

       

2.拒絕策略     

當任務數量超過系統實際承載能力時,用拒絕策略進行補救。

  1. AbortPolicy():該策略會直接丟擲異常,阻止系統正常工作。
  2. CallerRunsPolicy():只要執行緒池未關閉,該策略直接在呼叫者的執行緒中,運行當前被丟棄的任務。
  3. DiscardOldestPolicy():該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試在次提交當前任務.
  4. DiscardPolicy():該策略默默地丟棄無法處理的任務,不予任何處理。
public class Test {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException{
        MyTask myTask = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L,
                TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+" is discard");
            }
        });
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            es.submit(myTask);
            Thread.sleep(10);
        }
    }
}

                                                                        在執行幾個任務後,拒絕策略就開始生效了。在實際應用中,可以利用這一點來分析系統的負載和任務丟失情況。

3.擴充套件執行緒池      

 ThreadPoolExecutor是一個可擴充套件的執行緒池。

public class Test {
    public static class MyTask implements Runnable {

        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId() + ",Task name = " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L,
                TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準備執行:" + ((MyTask) r).name);
            }

            @Override
            protected void afterExecute(Runnable r,Throwable t){
                System.out.println("執行完成:"+((MyTask) r).name);
            }

            @Override
            protected void terminated(){
                System.out.println("執行緒退出");
            }
        };
        for (int i = 0; i < 5; i++) {
            MyTask myTask = new MyTask("TASK-GEYM-"+i);
            es.execute(myTask);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

       

所有任務的前中後時間點以及任務名字都可以捕捉。便於應用程式除錯。

4.線上程池中捕獲異常   

改造ThreadPoolExecutor方法,使其能丟擲異常,以便於查詢錯誤的地方。

public class Test extends ThreadPoolExecutor {

    public Test(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                    throw e;
                }
            }
        };
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pools = new Test(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        for (int i = 0; i < 5; i++) {
            pools.execute(new DivTask(100,i));
        }
    }

    private static class DivTask implements Runnable {
        int a, b;

        public DivTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            double re = a/b;
            System.out.println(re);
        }
    }
}

    

5.Fork/Join框架 

假如要處理1000個數據,但不具備同時處理的能力,先處理10個,在分階段處理100次,最後將結果合併。某一執行緒處理完後可以幫助其他未完成的執行緒處理。

public class Test extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public Test(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        boolean canConpute = (end - start) < THRESHOLD;
        if (canConpute) {
            for (long i = start; i < end; i++) {
                sum += i;
            }
        }else {
//            分成100個任務
            long step = (start+end)/100;
            ArrayList<Test> subTasks = new ArrayList<>();
            long pos = start;
            for (int i = 0; i < 100; i++) {
                long lastOne = pos+step;
                if (lastOne>end){
                    lastOne = end;
                }
                Test subTask = new Test(pos,lastOne);
                pos+=step+1;
                subTasks.add(subTask);
                subTask.fork();
            }
            for (Test t : subTasks){
                sum+=t.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Test test = new Test(0,200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(test);
        try {
            long res = result.get();
            System.out.println("sum = "+res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}