1. 程式人生 > >java多執行緒任務佇列模型

java多執行緒任務佇列模型

此篇文章將從任務佇列的設計;任務排程的方式(序列和並行)。程式碼很簡單,主要是設計的思想。

任務佇列 final class PendingPostQueue {     // 含有頭、尾指標的連結串列結構實現佇列     private PendingPost head;     private PendingPost tail;

    // 入佇列     synchronized void enqueue(PendingPost pendingPost) {         if (pendingPost == null) {             throw new NullPointerException("null cannot be enqueued");         }         if (tail != null) {             tail.next = pendingPost;             tail = pendingPost;         } else if (head == null) {             head = tail = pendingPost;         } else {             throw new IllegalStateException("Head present, but no tail");         }         notifyAll();     }

    // 出佇列     synchronized PendingPost poll() {         PendingPost pendingPost = head;         if (head != null) {             head = head.next;             if (head == null) {                 tail = null;             }         }         return pendingPost;     }

    // 等待最大時長; 如果此時有入佇列的操作(notifyAll),直接出佇列     synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {         if (head == null) {             wait(maxMillisToWait);         }         return poll();     } } 上面的程式碼很簡單,基本上一看就能明白;下面主要分析,這樣設計的優點:

使用頭、尾指標的連結串列結構實現佇列;入佇列通過操作尾指標,出佇列通過操作頭指標的方式達到時間複雜度都是O(1). 增加出佇列延遲的功能,方式在空佇列的時候,持續獲取或直接返回空;增加一段時間間隔等待其他執行緒的入佇列的操作(儘可能處理儘量多的任務。) 任務排程:序列執行 序列的任務排程,基本上是單執行緒模型。因為基本上是下一個任務的執行需要等到上一個任務執行完成。  程式碼如下:

// 當前任務排程類(序列) final class BackgroundPoster implements Runnable {     // 任務佇列     private final PendingPostQueue queue;     // 當前執行緒是否在正在執行     // volatile: 保證單個變數的讀寫操作是執行緒安全(通過cpu實現CAS)     private volatile boolean executorRunning;

    BackgroundPoster() {         queue = new PendingPostQueue();     }

    public void enqueue(String id, Object event) {         PendingPost pendingPost = PendingPost.obtainPendingPost(id, event); // 建立任務         synchronized (this) {             queue.enqueue(pendingPost); // 入佇列             // 如果當前沒有正在執行的任務,開啟任務             if (!executorRunning) {                 executorRunning = true;                 ThreadUtils.getExecutorService().execute(this);             }         }     }

    @Override     public void run() {         try {             try {                 while (true) {                     // 從任務佇列中獲取任務;設定一分鐘時間間隔,防止在1000分鐘內有新任務入佇列                     PendingPost pendingPost = queue.poll(1000);                     if (pendingPost == null) {                         synchronized (this) {                             // 雙層檢驗                             pendingPost = queue.poll();                             if (pendingPost == null) {                                 // 執行標誌置為false                                 executorRunning = false;                                 return; // 如果沒有任務了,將會結束此次迴圈,也就相當於停止了當前執行緒(也正因為此,上面的wait(1000)才很重要)                             }                         }                     }                     // 執行任務                     invokePost(pendingPost);                 }             } catch (InterruptedException e) {                 Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);             }         } finally {             executorRunning = false;         }     }

} 上面的程式碼也不難,對照我寫的註釋看起來會很簡單。原理也很簡單:

任務的執行是在一個子執行緒(通過執行緒池開啟的)中 任務的排程是通過操作任務佇列實現的,通過迴圈依次呼叫佇列中的任務。 wait(1000)的作用,最大化使用執行緒資源;防止佇列中剛沒有任務了就停止執行緒(具體分析在註釋中) 任務排程:並行執行 並行排程任務,就需要多執行緒排程了。  具體程式碼實現如下:

class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;

    AsyncPoster() {         queue = new PendingPostQueue();     }

    public void enqueue(String id, Object event) {         PendingPost pendingPost = PendingPost.obtainPendingPost(id, event);         queue.enqueue(pendingPost);         ThreadUtils.getExecutorService().execute(this);     }

    @Override     public void run() {         PendingPost pendingPost = queue.poll();         if(pendingPost == null) {             throw new IllegalStateException("No pending post available");         }         invokePost(pendingPost);     } } 上面的程式碼更簡單,就是每一個任務開啟一個執行緒去執行。  但是如果仔細檢視程式碼會發現:  這裡根本就沒有必要使用任務佇列,直接開啟執行緒去執行任務不就行了嗎?這裡任務佇列的作用是用來傳遞資料。

任務排程:Android主執行緒排程 我們經常會遇到:回撥在主執行緒中執行。由於主執行緒只有一個,也就相當於上面的序列執行。而Android有自己的Handler訊息機制幫我們封裝好了,下面就基於這個來實現。

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;     // 主執行緒執行最大時長(防止阻塞主執行緒)     private final int maxMillisInsideHandleMessage;     // 正在執行的標誌(同序列執行)     private boolean handlerActive;     // 引數looper決定了當前任務所執行的執行緒,這裡傳遞Looper.mainLooper()就會將當前任務執行在主執行緒中     HandlerPoster(Looper looper, int maxMillisInsideHandleMessage) {         super(looper);         this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;         queue = new PendingPostQueue();     }

    void enqueue(String id, Object event) {         PendingPost pendingPost = PendingPost.obtainPendingPost(id, event);         synchronized (this) {             queue.enqueue(pendingPost);             if (!handlerActive) {                 handlerActive = true;                 // 傳送訊息                 if (!sendMessage(obtainMessage())) {                     throw new EventBusException("Could not send handler message");                 }             }         }     }

    @Override     public void handleMessage(Message msg) {         boolean rescheduled = false;         try {             long started = SystemClock.uptimeMillis();             while (true) {                 PendingPost pendingPost = queue.poll();                 if (pendingPost == null) {                     synchronized (this) {                         // Check again, this time in synchronized                         pendingPost = queue.poll();                         if (pendingPost == null) {                             handlerActive = false;                             return;                         }                     }                 }                 invokePost(pendingPost);                 long timeInMethod = SystemClock.uptimeMillis() - started;                 // 如果在主執行緒中執行的時間超過最大時間,停止當前操作,重新發送訊息;防止祖冊主執行緒                 if (timeInMethod >= maxMillisInsideHandleMessage) {                     if (!sendMessage(obtainMessage())) {                         throw new EventBusException("Could not send handler message");                     }                     //重置執行,也就是還處於執行狀態。                     rescheduled = true;                     return;                 }             }         } finally {             // 執行狀態由rescheduled決定             handlerActive = rescheduled;         }     } } 程式碼也不難,原理基本和序列排程相同;唯一不同,因為是在主執行緒中,需要對執行緒阻塞的問題進行考慮。 ---------------------  作者:qiaoba_gogo  來源:CSDN  原文:https://blog.csdn.net/u010014658/article/details/77925567