1. 程式人生 > >java多執行緒訊息佇列的實現

java多執行緒訊息佇列的實現

1、定義一個佇列快取池:

 //static修飾的成員變數和成員方法獨立於該類的任何物件。也就是說,它不依賴類特定的例項,被類的所有例項共享

private static List<Queue> queueCache = new LinkedList<Queue>(); 

2、定義佇列緩衝池最大訊息數,如果達到該值,那麼佇列檢入將等待檢出低於該值時繼續進行。

private Integer offerMaxQueue = 2000;

3、定義檢出執行緒,如果佇列緩衝池沒有訊息,那麼檢出執行緒會執行緒等待中

new Thread(){
public void run(){
while
(true){ String ip = null; try { synchronized (queueCache) { Integer size = queueCache.size(); if(size==0){ //佇列快取池沒有訊息,等待。。。。 queueCache.wait(); } Queue queue = queueCache.remove(0); if(isIpLock(queueStr)){//假若這個是一個多應用的分散式系統,那麼這個判斷應該是分散式鎖,這裡說的鎖不是執行緒停止,而是跳過該訊息,滯後處理 queueCache.add(queue);該queue重新加入佇列緩衝池,滯後處理, continue
; }else{ ;//這裡是處理該訊息的操作。 } size = queueCache.size(); if(size<offerMaxQueue&&size>=0){ queueCache.notifyAll();//在佇列快取池不超過最大值的前提下,假若檢入正在等待中,那麼那麼讓他們排隊檢入。 } } } catch (Exception e) { e.printStackTrace(); }finally{ try {//檢出該訊息佇列的鎖 unIpLock(queueStr); } catch (Execption e) {//捕獲異常,不能讓執行緒掛掉 e.printStackTrace(); } } } }.start();

4、檢入佇列

synchronized (queueCache) {
while(true){
Integer size = queueCache.size();
if(size>=offerMaxQueue){
try {
queueCache.wait();
continue;//繼續執行等待中的檢入任務。
} catch (InterruptedException e) {
e.printStackTrace();
}
 }//IF

if(size<=offerMaxQueue&&size>0){
queueCache.notifyAll();
}
break;//檢入完畢
}//while
}

5、鎖方法實現

/**
 * 鎖
 * @param ip
 * @return
 * @throws 
 */
public Boolean isLock(String queueStr) {
return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
}
//解鎖
public void unIpLock(String queueStr) {
if(ip!=null){
this.redisManager.del(queueStr+"_lock");
//			lock.unlock();
}
}