Android開發經驗談:併發程式設計(執行緒與執行緒池)
一、執行緒
在Android開發中,你不可能都在主執行緒中開發,畢竟要聯網,下載資料,儲存資料等操作,當然這就離不開執行緒。
(當然你可以在Android4.0以前的手機裡在主執行緒請求網路,我最早開發的時候,用的手機比較古老。。。)
在Android中你可以隨意建立執行緒,於是就會造成執行緒不可控,記憶體洩漏,建立執行緒消耗資源,執行緒太多了消耗資源等問題。
具體執行緒怎麼建立我就不在文章裡描述了,畢竟這主要將併發程式設計。。。。
大家知道執行緒不可控就好了。。。於是就需要對執行緒進行控制,防止一系列問題出現,這就用到了如下要講的東西。
二、執行緒池
執行緒池:顧名思義,就是放執行緒的大池子。
如何建立一個執行緒池?
先說說幾個系統的執行緒池:
- FixedThreadPool 建立定長執行緒的執行緒池
- CachedThreadPool 需要的時候建立新的執行緒,超時執行緒銷燬
- SingleThreadPool 單個執行緒的執行緒池
- ScheduledThreadPool 可以定時的執行緒池,建立週期性的任務
這幾個執行緒池不做多餘闡述,因為這些執行緒池的原理都與我下面要講的有關。。。。
···························································································
如何自定義執行緒池(先來了解幾個必須知道的引數):
corePoolSize:
核心執行緒池大小,執行緒池中主要工作的執行緒的多少。
maximumPoolSize:
執行緒池最大執行緒數。
keepAliveTime:
空閒執行緒可保持的時間是多久,如果你啟用了allowCoreThreadTimeOut方法,你的執行緒池裡的空閒執行緒在這個時間段後會自動銷燬,如果沒啟用,則只要不超過corePoolSize,空閒執行緒也不會銷燬。
Unit:
keepAliveTime的時間單位
workQueue:
阻塞佇列,當任務達到corePoolSize,就會被放入這個佇列
常見幾種BlockingQueue實現
- ArrayBlockingQueue : 有界的陣列佇列
- LinkedBlockingQueue : 可支援有界/無界的佇列,使用連結串列實現
- PriorityBlockingQueue : 優先佇列,可以針對任務排序
- SynchronousQueue : 佇列長度為1的佇列,和Array有點區別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連線上來poll task。
threadFactory:
執行緒工廠,主要用來建立執行緒;
handler:
表示當拒絕處理任務時的策略,也就是引數maximumPoolSize達到後丟棄處理的方法。有以下四種取值:
ThreadPoolExecutor.AbortPolicy :丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy :也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy :丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy :由呼叫執行緒處理該任務
使用者也可以實現介面RejectedExecutionHandler定製自己的策略。
程式碼展示:
//執行緒工廠 public class TaskThreadFactoryimplements ThreadFactory { private final AtomicInteger mThreadNumber = new AtomicInteger(1); private final String mNamePrefix; TaskThreadFactory(String name) { mNamePrefix = name + "#"; } public Thread newThread(Runnable r) { Thread t = new Thread(r,mNamePrefix + mThreadNumber.getAndIncrement()); //if (t.isDaemon()) //t.setDaemon(false); // //if (t.getPriority() != Thread.NORM_PRIORITY) //t.setPriority(Thread.NORM_PRIORITY); return t; } } //重寫runnable public class PRunnable implements Runnable { public static final int HIGH = 1;//優先順序高 public static final int NORMAL = 2;//優先順序中等 public static final int LOW = 3;//優先順序低 @IntDef({HIGH,NORMAL,LOW}) @Retention(RetentionPolicy.SOURCE) public @interface Priority{} public final int priority; private final Runnable runnable; public int serial; public PRunnable(Runnable runnable){ this(NORMAL,runnable); } public PRunnable(@Priority int priority,Runnable runnable){ this.priority = priority; this.runnable = runnable; } @Override public void run() { if (runnable != null) { runnable.run(); } } /** * 執行緒佇列方式 先進先出 * @param r1 * @param r2 * @return */ public static final int compareFIFO(PRunnable r1, PRunnable r2) { int result = r1.priority-r2.priority; return result==0?r1.serial-r2.serial:result; } /** * 執行緒佇列方式 後進先出 * @param r1 * @param r2 * @return */ public static final int compareLIFO(PRunnable r1, PRunnable r2) { int result = r1.priority-r2.priority; return result==0?r2.serial-r1.serial:result; } } //執行緒池實現 public class TaskExecutor implements Executor { private final static int QUEUE_INIT_CAPACITY = 20; private static final int CORE = 3; private static final int MAX = 5; private static final int TIMEOUT = 30 * 1000; private AtomicInteger SERIAL = new AtomicInteger(0);//主要獲取新增任務 public static class Config { public int core; public int max; public int timeout; public boolean allowCoreTimeOut; public boolean fifo; public Config(int core, int max, int timeout, boolean allowCoreTimeOut,boolean fifo) { this.core = core; this.max = max; this.timeout = timeout; this.allowCoreTimeOut = allowCoreTimeOut; this.fifo = fifo; } } public static Config defaultConfig = new Config(CORE, MAX, TIMEOUT, true,true); private final String name; private final Config config; private ExecutorService service; public TaskExecutor(String name) { this(name, defaultConfig); } public TaskExecutor(String name, Config config) { this(name, config, true); } public TaskExecutor(String name, Config config, boolean startup) { this.name = name; this.config = config; if (startup) { startup(); } } public void startup() { synchronized (this) { if (service != null && !service.isShutdown()) { return; } service = createExecutor(config); } } public void shutdown() { ExecutorService executor = null; synchronized (this) { // 交換變數 if (service != null) { executor = service; service = null; } } if (executor != null) { // 停止執行緒 if (!executor.isShutdown()) { executor.shutdown(); } // 回收變數 executor = null; } } private void executeRunnable(PRunnable runnable) { synchronized (this) { if (service == null || service.isShutdown()) { return; } runnable.serial = SERIAL.getAndIncrement(); service.execute(runnable); } } @Override public void execute(Runnable runnable) { if (runnable instanceof PRunnable) { executeRunnable((PRunnable) runnable); }else{ executeRunnable(new PRunnable(runnable)); } } public Future<?> submit(Runnable runnable) { synchronized (this) { if (service == null || service.isShutdown()) { return null; } if (runnable instanceof PRunnable) { ((PRunnable) runnable).serial = SERIAL.getAndIncrement(); return service.submit(runnable); }else{ PRunnable pRunnable = new PRunnable(runnable); pRunnable.serial = SERIAL.getAndIncrement(); return service.submit(pRunnable); } } } public void execute(Runnable runnable, @PRunnable.Priority int priority) { executeRunnable(new PRunnable(priority,runnable)); } private ExecutorService createExecutor(Config config) { ThreadPoolExecutor service = new ThreadPoolExecutor(config.core, config.max, config.timeout, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(QUEUE_INIT_CAPACITY, config.fifo ? mQueueFIFOComparator : mQueueLIFOComparator), new TaskThreadFactory(name), new ThreadPoolExecutor.DiscardPolicy()); allowCoreThreadTimeOut(service, config.allowCoreTimeOut); return service; } public boolean isBusy() { synchronized (this) { if (service == null || service.isShutdown()) { return false; } if(service instanceof ThreadPoolExecutor){ ThreadPoolExecutor tService = (ThreadPoolExecutor) service; return tService.getActiveCount() >= tService.getCorePoolSize(); } return false; } } private static final void allowCoreThreadTimeOut(ThreadPoolExecutor service, boolean value) { if (Build.VERSION.SDK_INT >= 9) { allowCoreThreadTimeOut9(service, value); } } @TargetApi(9) private static final void allowCoreThreadTimeOut9(ThreadPoolExecutor service, boolean value) { service.allowCoreThreadTimeOut(value); } Comparator<Runnable> mQueueFIFOComparator = new Comparator<Runnable>() { @Override public int compare(Runnable lhs, Runnable rhs) { PRunnable r1 = (PRunnable) lhs; PRunnable r2 = (PRunnable) rhs; return PRunnable.compareFIFO(r1, r2); } }; Comparator<Runnable> mQueueLIFOComparator = new Comparator<Runnable>() { @Override public int compare(Runnable lhs, Runnable rhs) { PRunnable r1 = (PRunnable) lhs; PRunnable r2 = (PRunnable) rhs; return PRunnable.compareLIFO(r1, r2); } }; }
image.gif