1. 程式人生 > >《Java並發編程實踐》筆記4——線程池高級

《Java並發編程實踐》筆記4——線程池高級

iss 資源 有一個 RM mic getclass term 泄露 bsp

作者:chjttony

1.估算線程池最優大小:

Ncpu = CPU的數量 = Runtime.getRuntime().availableProcessors();

Ucpu = 目標CPU的使用率, 0 <= Ucpu <= 1;

W/C = 等待時間與計算時間的比率;

為了保持處理器達到期望的使用率,最優的線程池大小等於:

Nthreads = Ncpu * Ucpu * (1+ W/C);

2.配置ThreadPoolExecutor:

ThreadPoolExecutor是Executors中工廠方法newCachedThreadPool、newFixedThreadPool和newScheduledThreadExecutor返回的ExecutorService接口的基本實現,提供了默認的執行策略。

ThreadPoolExecutor還允許通過構造方法定制執行策略,常用構造方法如下:

public ThreadPoolExecutor(int corePoolSize,   
                            int maximumPoolSize,   
                            long keepAliveTime,  
                            TimeUnit unit,   
                            BlockingQueue<Runnable> workQueue,  
                            ThreadFactory threadFactory,      
                            RejectedExecutionHandler handler){......}  

ThreadPoolExecutor配置執行策略的參數如下:

(1).核心池大小:

核心池大小是目標線程池大小,線程池實現試圖維護池的大小,即使沒有任務執行,池的大小也等於核心池的大小,並且直到工作隊列充滿前池都不會創建更多的線程。

如果核心池大小設置為0,則在工作隊列填滿之前線程池不會開始工作;若有一個有限的線程池和一個有限的工作隊列,同時又希望所有的線程在沒有任務的情況下銷毀,則可以將核心線程池大小設置為0來激活這個特性,newCachedThreadPool的核心池大小就是0。

(2).最大池大小:

最大池大小是可同時活動的線程數的上限,若空閑時,會銷毀超出核心池大小的多余線程。

newCachedThreadPool將最大池設置為Integer.MAX_VALUE。

(3).存活時間:

即空閑線程超時時間,若一個線程已經閑置的時間超過了存活時間,它將成為一個被回收的候選者,若當前的池大小已經超過了核心池大小,這個線程會被終止掉。

newFixedThreadPool為請求的線程池設置了核心池和最大池的大小,而且存活時間永遠不會超時。

newCachedThreadPool默認的超時時間為1分鐘。

(4).任務阻塞隊列:

當請求到達速度超了線程池處理速度時,線程池將尚未開始執行的任務放入任務阻塞隊列中等待。

任務阻塞隊列通常有以下3中:

A.無限隊列:

newFixedThreadPool和newSingleThreadExecutor默認使用一個無限的LinkedBlockingQueue阻塞隊列,若所有工作者線程都處於忙碌,任務將會在隊列中等候,若任務持續快速地到達,隊列也會無限制地增加。

B.有限隊列:

為了避免資源被耗盡,線程池也經常使用有限隊列,如ArrayBlockingQueue或有界的LinkedBlockingQueue;當隊列滿時,新來的任務會使用飽和策略處理。

C.同步移交:

同步移交完全繞開隊列,直接將任務從生產者移交給工作者線程。

同步移交適合線程池無限或者可以接受任務被拒絕的情況,newCachedThreadPool就使用同步移交方式。

註意:只有任務彼此獨立時,使用有限線程池或有限工作隊列才是合理的,若任務之間相互依賴,有限的線程池或工作隊列就可能引起線程的饑餓死鎖,而使用無限的線程池配置(newCachedThreadPool)可以避免任務相互依賴引起的線程饑餓死鎖。

(5).飽和策略:

飽和策略用於規定有界隊列充滿或線程池被關閉時對新提交任務的處理方案。

JDK提供了如下4中飽和策略:

A.中止(abort):

默認的飽和策略,會拋出未檢查的拒絕執行異常RejectedExecutionException,調用者捕獲該異常做適當處理。

B.遺棄(discard):

丟棄最新提交的任務。

C.遺棄最舊的(discard-oldest):

丟棄本應該接下來就要執行的任務,並嘗試去重新提交新任務,若是優先級隊列,則丟棄優先級最高的任務,因此不能混合使用遺棄最舊的飽和策略和優先級隊列。

D.調用者運行(caller-runs):

既不會丟最新提交的任務,也不會拋出異常,會把最新提交的任務推回到調用者,由生產者線程執行,一方面給工作者線程時間來追趕進度,另一方面減慢了生產者提交新任務的速度。

通過調節核心池大小和存活時間,可以促進線程池歸還空閑線程占用的資源,飽和策略用於應對過載處理。

3.定制線程工廠:

線程池通過線程工廠來創建線程,默認的線程池工廠會創建一個新的,非後臺的線程,沒有特殊配置。

線程池工廠可以運行定制線程的配置信息,例如:為線程池指定UncaughtExceptionHandler,用於捕獲線程拋出的未檢查異常以防止線程泄露;實例化一個定制的線程類實例,用來執行調試日誌線程;給線程指定名稱等等。

線程池工廠只有一個newThread方法,在線程池需要創建一個新線程時使用,例子代碼如下:

public class MyThreadFactory implements ThreadFactory{  
    private final String poolName;  
      
    public MyThreadFactory(String poolName){  
        this.poolName = poolName;  
    }  
      
    public Thread newThread(Runnable runnable){  
        return new MyAppThread(runnable, poolName);  
    }  
}  
  
public class MyAppThread extends Thread{  
    public static final String DEFAULT_NAME = “MyAppThread”;  
    private static volatile boolean debug = false;  
    private static final AtomicInteger created = new AtomicInteger();  
    private static final AtomicInteger alive = new AtomicInteger();  
    private static final Logger log = Logger.getLogger(MyAppThread.class.getClassName());  
      
    public MyAppThread(Runnable r){  
        this(r, DEFAULT_NAME);  
    }  
  
    public MyAppThread(Runnable r, String name){  
        super(runnable, name + “-” + created.incrementAndGet());  
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){  
            public void uncaughtException(Thread t, Throwable e){  
                log.log(Level.SEVER, “Uncaught in thread ” + t.getName(), e);  
            }  
        });  
    }  
      
    public void run(){  
        if(debug){  
            log.log(Level.FINE, “Created ” + t.getName());  
        }  
        try{  
            alive.incrementAndGet();  
            super.run();  
        }finally{  
            alive.decrementAndGet();  
            if(debug){  
                log.log(Level.FINE, “Exiting ” + t.getName());  
            }  
        }  
    }  
  
    public static int getThreadsCreated(){  
        return created.get();  
    }  
  
    public static int getThreadsAlive(){  
        return alive.get();  
    }  
  
    public static boolean isDebug(){  
            return debug;  
    }  
  
    public static void setDebug(boolean debug){  
        this.debug = debug;  
    }  
}  

4.擴展ThreadPoolExecutor:

ThreadPoolExecutor提供了以下3個生命周期的鉤子方法讓子類擴展:

(1).beforeExecute:

任務執行前,線程會調用該方法,可以用來添加日誌、監控或者信息收集統計。

若beforeExcute方法拋出了RuntimeException,線程的任務將不被執行,afterExecute方法也不會被調用。

(2).afterExecute:

任務執行結束後,線程會調用該方法,可以用來添加日誌、監控或者信息收集統計。

無論任務正常返回或者拋出異常(拋出Error不能被調用),該方法都會被調用。

(3).terminate:

線程池完成關閉動作後調用,可以用來釋放資源、發出通知、記錄日誌或者完成統計信息等。

一個擴展ThreadPoolExecutor的例子代碼如下:

public class TimingThreadPool extends ThreadPoolExecutor{  
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();  
    private final Logger log = Logger.getLogger(TimingThreadPool.class.getClassName());  
    private final AtomicLong numTasks = new AtomicLong();  
    private final AtomicLong totalTime = new AtomicLong();  
  
    protected void beforeExecute(Thread t, Runnable r){  
        super.beforeExecute(t, r);  
        log.fine(String.format(“Thread %s: start %s”, t, r));  
        startTime.set(System.nanoTime());  
    }  
  
    protected void afterExecute(Runnable r, Throwable t){  
        try{  
            long endTime = System.nanoTime();  
            long taskTime = endTime - startTime.get();  
            numTasks.incrementAndGet();  
            totalTime.addAndGet(taskTime);  
            log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));  
        }finally{  
            super.afterExecute(r, t);  
        }  
    }  
  
    protected void terminated(){  
        try{  
            log.info(String.format("Terminated: avg time=%dns",   
                    totalTime.get() / numTasks.get()));  
        }finally{  
            super.terminated();  
        }  
    }  
}  

5.GUI單線程化:

幾乎所有的GUI工具集都實現為單線程化子系統,即所有GUI的活動都被限制在一個單獨的線程中。早期的GUI應用程序GUI事件在主事件循環中進行處理;現代的GUI框架使用了一個專門的事件派發線程(EDT)來處理GUI事件。

GUI被設計為單線程化的原因為:

(1).用戶與應用程序的事件存在鎖順序死鎖問題:

GUI程序中,事件是以冒泡方式進行傳遞的,以用戶點擊修改界面組件背景顏色為例:

A.用戶發起的動作以冒泡方式從操作系統傳遞給應用程序:

首先,操作系統檢測到一次鼠標點擊;

其次,操作系統工具集把它轉化為鼠標點擊事件;

最後,操作系統將點擊事件轉發給應用程序GUI對象的監聽器;


技術分享圖片

B.應用程序發起的動作會以冒泡方式傳回操作系統:

首先,應用程序GUI監聽器發起改變組件背景顏色動作;

其次,GUI工具集把事件動作轉發給特定組件類;

最後,組件類把發事件轉發給操作系統進行渲染;

技術分享圖片

上述兩種動作以完全相反的順訪問相同的GUI對象,為了保證每個對象的線程安全,若每一層都使用一個鎖加鎖的話,這一系列的鎖順序不一致會導致鎖順序死鎖問題,而如果每一層都共用一個鎖的話,那就跟單線程沒區別了。

(2).MVC模式的死鎖問題:

目前MVC(模型-視圖-控制器)模式在GUI開發中普遍流行,它把用戶的交互分撥到模型、視圖和控制器之間的協作中,極大地簡化了GUI應用程序的實現,但是也再次面臨不一致的鎖順序死鎖問題:

控制器調用模型,模型通知視圖已經發生了一些事情:

技術分享圖片

控制器同樣可以調用視圖,視圖調用模型來查詢模型狀態:

技術分享圖片

鎖順序死鎖問題讓多線程的GUI程序極不穩定,bug眾多且難以調試,因此GUI工具集都是單線程化。

《Java並發編程實踐》筆記4——線程池高級