執行緒池之工作佇列
ArrayBlockingQueue
採用陣列來實現,並採用可重入鎖ReentrantLock來做併發控制,無論是新增還是讀取,都先要獲得鎖才能進行操作 可看出進行讀寫操作都使用了ReentrantLock,ArrayBlockingQueue需要為其指定容量
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
} public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
SynchronousQueue
由於SynchronousQueue原始碼比較複雜,裡面大量的Cas操作,SynchronousQueue沒有容器,所以裡面是裝不了任務的,當一個生產者執行緒生產一個任務的 時候,如果沒有對應的消費者消費,那麼該生產者會一直阻塞,知道有消費者消費為止。
圖示: 如下程式碼,如果我們將消費者執行緒註釋掉執行,那麼生產者哪裡將會一直阻塞
package thread.customthreadpool; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; /**
* 測試SynchronousQueue
*/
public class SynchronousQueueTest { private static final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); private static final ExecutorService service = Executors.newCachedThreadPool(); public static void main(String[] args) {
/**
* Provider
*/
service.submit(() -> {
try {
synchronousQueue.put("liu");
}catch (Exception e){
e.printStackTrace();
}
System.out.println("Consumer finished spending");
}); /**
* Consumer
*/
service.submit(() ->{
try {
synchronousQueue.take();
}catch (Exception e){
e.printStackTrace();
}
System.out.println("take over");
});
}
}
LinkedBlockingDeque
LinkedBlockingDeque是一個雙向佇列,底層使用單鏈表實現,任何一段都可進行元素的讀寫操作,在初始化LinkedBlockingDeque的時候, 我們可以指定容量,也可不指定,如果不指定,則容量為Integer.MAX_VALUE,
注:Deque是雙端佇列,而Queue是單端佇列,雙端意思是兩端都可以進行讀寫操作,而單端則只能從一端進,一端出(FIFO)
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
package thread.customthreadpool;
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingDequeTest { private static final LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>(); public static void main(String[] args) throws InterruptedException {
deque.put(1);
deque.put(2);
deque.put(3);
deque.put(4);
deque.put(5);
System.out.println(deque);
System.out.println("deque size "+deque.size());
deque.take();
deque.take();
deque.take();
deque.take();
deque.take();
System.out.println(deque);
System.out.println("deque size "+deque.size());
}
}
LinkedBlockingQueue
底層基於單向連表實現,是一個單向佇列,具有先進先出(FIFO)特點,使用了ReentrantLock來做併發控制,讀寫操作都上鎖
private final ReentrantLock putLock = new ReentrantLock();
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
DelayDeque
DelayDeque是一個無界佇列,新增進DelayDeque的元素會經過compareTo方法計算,然後按照時間 進行排序,排在隊頭的元素是最早到期的,越往後到期時間越長,DelayDeque只能接受Delayed介面型別 如圖所示,佇列裡的元素並不是按照先進先出的規則,而是按照過期時間
示例
package thread.customthreadpool.delayDeque; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; public class MyDelayed implements Delayed { private final String taskName ;
private final long nowTime = System.currentTimeMillis();
private final long expireTime ; public MyDelayed(String taskName,long expireTime) {
this.taskName = taskName;
this.expireTime = expireTime;
} @Override
public long getDelay(TimeUnit unit) {
return unit.convert((nowTime+expireTime) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
} @Override
public int compareTo(Delayed o) {
MyDelayed myDelayed = (MyDelayed) o;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
} @Override
public String toString() {
return "MyDelayed{" +
"taskName='" + taskName + '\'' +
", nowTime=" + nowTime +
", expireTime=" + expireTime +
'}';
}
}
package thread.customthreadpool.delayDeque; import java.util.concurrent.*; public class MyDelayQueue { private static final DelayQueue<MyDelayed> delayQueue = new DelayQueue<>(); private static final ExecutorService service = Executors.newCachedThreadPool(); public static void main(String[] args) throws InterruptedException {
service.submit(() -> {
delayQueue.put(new MyDelayed("A-Task",5000));
delayQueue.put(new MyDelayed("B-Task",4000));
delayQueue.put(new MyDelayed("C-Task",3000));
delayQueue.put(new MyDelayed("D-Task",2000));
delayQueue.put(new MyDelayed("E-Task",1000));
});
while (true){
System.out.println(delayQueue.take());
}
}
}
result
應用場景
1.美團外賣訂單:當我們下單後沒付款 ,30分鐘後將自動取消訂單
2.快取,對於某些任務,需要在特定的時間清理;
and so on
LinkedTransferQueue
當消費執行緒從佇列中取元素時,如果佇列為空,那麼生成一個為null的節點,消費者執行緒就一直等待,此時如果生產者執行緒發現佇列中有一個null節點, 它就不入隊了,而是將元素填充到這個null節點並喚醒消費者執行緒,然後消費者執行緒取走元素。
LinkedTransferQueue是 SynchronousQueue 和 LinkedBlockingQueue 的整合,效能比較高,因為沒有鎖操作, SynchronousQueue不能儲存元素,而LinkedTransferQueue能儲存元素,
PriorityBlockingQueue
PriorityBlockingQueue是一個無界的阻塞佇列,同時是一個支援優先順序的佇列,讀寫操作都是基於ReentrantLock, 內部使用堆演算法保證每次出隊都是優先順序最高的元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}