1. 程式人生 > >Java知識總結----佇列的使用

Java知識總結----佇列的使用

      今天跟大家來看看如何在專案中使用佇列。首先我們要知道使用佇列的目的是什麼?一般情況下,如果是一些及時訊息的處理,並且處理時間很短的情況下是不需要使用佇列的,直接阻塞式的方法呼叫就可以了。但是,如果在訊息處理的時候特別費時間,這個時候如果有新的訊息來了,就只能處於阻塞狀態,造成使用者等待。這個時候在專案中引入佇列是十分有必要的。當我們接受到訊息後,先把訊息放到佇列中,然後再用新的執行緒進行處理,這個時候就不會有訊息的阻塞了。下面就跟大家介紹兩種佇列的使用,一種是基於記憶體的,一種是基於資料庫的。

     首先,我們來看看基於記憶體的佇列。在Java的併發包中已經提供了BlockingQueue的實現,比較常用的有ArrayBlockingQueue和LinkedBlockingQueue,前者是以陣列的形式儲存,後者是以Node節點的連結串列形式儲存。至於陣列和連結串列的區別這裡就不多說了。

BlockingQueue 佇列常用的操作方法:

      1.往佇列中新增元素: add(), put(), offer()

      2.從佇列中取出或者刪除元素: remove() element()  peek()   pool()  take()

每個方法的說明如下:

      offer()方法往佇列新增元素如果佇列已滿直接返回false,佇列未滿則直接插入並返回true;

      add()方法是對offer()方法的簡單封裝.如果佇列已滿,丟擲異常new IllegalStateException("Queue full");

       put()方法往佇列裡插入元素,如果佇列已經滿,則會一直等待直到佇列為空插入新元素,或者執行緒被中斷丟擲異常.

       remove()方法直接刪除隊頭的元素:

       peek()方法直接取出隊頭的元素,並不刪除.

       element()方法對peek方法進行簡單封裝,如果隊頭元素存在則取出並不刪除,如果不存在丟擲異常NoSuchElementException()

       pool()方法取出並刪除隊頭的元素,當佇列為空,返回null;

       take()方法取出並刪除隊頭的元素,當佇列為空,則會一直等待直到佇列有新元素可以取出,或者執行緒被中斷丟擲異常

offer()方法一般跟pool()方法相對應, put()方法一般跟take()方法相對應.日常開發過程中offer()與pool()方法用的相對比較頻繁.

下面用一個例子來看看是怎麼使用的。

  1. import java.util.concurrent.BlockingQueue;  
  2. import java.util.concurrent.Executors;  
  3. import java.util.concurrent.LinkedBlockingQueue;  
  4. import java.util.concurrent.ScheduledExecutorService;  
  5. import java.util.concurrent.TimeUnit;  
  6. publicclass UserTask {  
  7.     //佇列大小
  8.     privatefinalint QUEUE_LENGTH = 10000*10;  
  9.     //基於記憶體的阻塞佇列
  10.     private BlockingQueue<String> queue = new LinkedBlockingQueue<String>(QUEUE_LENGTH);  
  11.     //建立計劃任務執行器
  12.     private ScheduledExecutorService es = Executors.newScheduledThreadPool(1);  
  13.     /** 
  14.      * 建構函式,執行execute方法 
  15.      */
  16.     public UserTask() {  
  17.         execute();  
  18.     }  
  19.     /** 
  20.      * 新增資訊至佇列中 
  21.      * @param content 
  22.      */
  23.     publicvoid addQueue(String content) {  
  24.         queue.add(content);  
  25.     }  
  26.     /** 
  27.      * 初始化執行 
  28.      */
  29.     publicvoid execute() {  
  30.         //每一分鐘執行一次
  31.         es.scheduleWithFixedDelay(new Runnable(){  
  32.             publicvoid run() {  
  33.                 try {  
  34.                     String content = queue.take();  
  35.                     //處理佇列中的資訊。。。。。
  36.                     System.out.println(content);  
  37.                 } catch (InterruptedException e) {  
  38.                     e.printStackTrace();  
  39.                 }  
  40.             }  
  41.         }, 01, TimeUnit.MINUTES);  
  42.     }  
  43. }  
 
        以上呢,就是基於記憶體的佇列的介紹,基於記憶體的佇列,佇列的大小依賴於JVM記憶體的大小,一般如果是記憶體佔用不大且處理相對較為及時的都可以採用此種方法。如果你在佇列處理的時候需要有失敗重試機制,那麼用此種佇列就不是特別合適了。下面就說說基於資料庫的佇列。

       基於資料庫的佇列,很好理解,就是接收到訊息之後,把訊息存入資料庫中,設定消費時間、重試次數等,再用新的執行緒從資料庫中讀取資訊,進行處理。首先來看看資料庫的設計。

欄位 型別 說明
queue_id bigint 佇列ID,唯一標識
create_time bigint 建立時間
type int 業務型別
status int 處理狀態位 : 1:有效可處理(active) 3:臨時被佔用 (locked) 5:處理完畢 標記刪除(deleted)
consume_status int 消費狀態:1:未消費  2:消費成功 3:消費失敗,等待下次消費 4:作廢
update_time bigint 更新時間
locker varchar 佔用標籤
last_consume_time bigint 最後一次消費時間
next_consume_time bigint 可消費開始時間
consume_count int 消費次數
json_data text 資料資訊 json格式


程式碼示例如下:

  1. /** 
  2.      * 批量獲取 可以消費的訊息 
  3.      * 先使用一個時間戳將被消費的訊息鎖定,然後再使用這個時間戳去查詢鎖定的資料。 
  4.      * @param count 
  5.      * @return 
  6.      */
  7.     public List<Queue> findActiveQueueNew(int count) {  
  8.         //先去更新資料
  9.         String locker = String.valueOf(System.currentTimeMillis())+random.nextInt(10000);  
  10.         int lockCount = 0;  
  11.         try {  
  12.                         //將status為1的更新為3,設定locker,先鎖定訊息
  13.             lockCount = queueDAO.updateActiveQueue(PayConstants.QUEUE_STATUS_LOCKED,  
  14.                     PayConstants.QUEUE_STATUS_ACTIVE, count, locker);  
  15.         } catch (Exception e) {  
  16.             logger.error(  
  17.                     "QueueDomainRepository.findActiveQueueNew error occured!"
  18.                             + e.getMessage(), e);  
  19.             thrownew TuanRuntimeException(  
  20.                     PayConstants.SERVICE_DATABASE_FALIURE,  
  21.                     "QueueDomainRepository.findActiveQueue error occured!", e);  
  22.         }  
  23.         //如果鎖定的數量為0,則無需再去查詢
  24.         if(lockCount == 0){  
  25.             returnnull;  
  26.         }  
  27.         //休息一會在再詢,防止資料已經被更改
  28.         try {  
  29.             Thread.sleep(1);  
  30.         } catch (Exception e) {  
  31.             logger.error("QueueDomainRepository.findActiveQueue error sleep occured!"
  32.                     + e.getMessage(), e);  
  33.         }  
  34.         List<Queue> activeList = null;  
  35.         try {  
  36.             activeList = queueDAO.getByLocker(locker);  
  37.         } catch (Exception e) {  
  38.             logger.error("QueueDomainRepository.findActiveQueue error occured!"
  39.                     + e.getMessage(), e);  
  40.             thrownew TuanRuntimeException(  
  41.                     PayConstants.SERVICE_DATABASE_FALIURE,  
  42.                     "QueueDomainRepository.findActiveQueue error occured!",e);  
  43.         }  
  44.         return activeList;  
  45.     }  

獲取到訊息之後,還需要再判斷訊息是否合法,如是否達到最大消費次數,訊息是否已被成功消費,等,判斷程式碼如下: