Netty原始碼分析第8章(高效能工具類FastThreadLocal和Recycler)---->第6節: 異執行緒回收物件
Netty原始碼分析第八章: 高效能工具類FastThreadLocal和Recycler
第六節: 異執行緒回收物件
異執行緒回收物件, 就是建立物件和回收物件不在同一條執行緒的情況下, 物件回收的邏輯
我們之前小節簡單介紹過, 異執行緒回收物件, 是不會放在當前執行緒的stack中的, 而是放在一個WeakOrderQueue的資料結構中, 回顧我們之前的一個圖:
8-6-1
相關的邏輯, 我們跟到原始碼中:
首先從回收物件的入口方法開始, DefualtHandle的recycle方法:
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}
這部分我們並不陌生, 跟到push方法中:
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (thread == currentThread) {
pushNow(item);
} else {
pushLater(item, currentThread);
}
}
上一小節分析過, 同線程會走到pushNow, 有關具體邏輯也進行了分析
如果不是同線程, 則會走到pushLater方法, 傳入handle物件和當前執行緒物件
跟到pushLater方法中:
private void pushLater(DefaultHandle<?> item, Thread thread) {
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
if (delayedRecycled.size() >= maxDelayedQueues) {
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
return;
}
queue.add(item);
}
首先通過DELAYED_RECYCLED.get()獲取一個delayedRecycled物件
我們跟到DELAYED_RECYCLED中:
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
return new WeakHashMap<Stack<?>, WeakOrderQueue>();
}
};
};
這裡我們看到DELAYED_RECYCLED是一個FastThreadLocal物件, initialValue方法建立一個WeakHashMap物件, WeakHashMap是一個map, key為stack, value為我們剛才提到過的WeakOrderQueue
從中我們可以分析到, 每個執行緒都維護了一個WeakHashMap物件
WeakHashMap中的元素, 是一個stack和WeakOrderQueue的對映, 說明了不同的stack, 對應不同的WeakOrderQueue
這裡的對映關係可以舉個例子說明:
比如執行緒1建立了一個物件, 線上程3進行了回收, 執行緒2建立了一個物件, 同樣也線上程3進行了回收, 那麼執行緒3對應的WeakHashMap中就會有兩個元素:
執行緒1的stack和執行緒2的WeakOrderQueue, 執行緒2和stack和執行緒2的WeakOrderQueue
我們回到pushLater方法中:
繼續往下看:
WeakOrderQueue queue = delayedRecycled.get(this)
拿到了當前執行緒的WeakHashMap物件delayedRecycled之後, 然後通過delayedRecycled建立物件的執行緒的stack, 拿到WeakOrderQueue
這裡的this, 就是建立物件的那個執行緒所屬的stack, 這個stack是繫結在handle中的, 建立handle物件時候進行的繫結
假設當前執行緒是執行緒2, 建立handle的執行緒是執行緒1, 這裡通過handle的stack拿到執行緒1的WeakOrderQueue
if (queue == null) 說明執行緒2沒有回收過執行緒1的物件, 則進入if塊的邏輯:
首先看判斷 if (delayedRecycled.size() >= maxDelayedQueues)
delayedRecycled.size() 表示當前執行緒回收其他建立物件的執行緒的執行緒個數, 也就是有幾個其他的執行緒在當前執行緒回收物件
maxDelayedQueues表示最多能回收的執行緒個數, 這裡如果朝超過這個值, 就表示當前執行緒不能在回收其他執行緒的物件了
通過 delayedRecycled.put(this, WeakOrderQueue.DUMMY) 標記, 建立物件的執行緒的stack, 所對應的WeakOrderQueue不可用, DUMMY我們可以理解為不可用
如果沒有超過maxDelayedQueues, 則通過if判斷中的 WeakOrderQueue.allocate(this, thread) 這種方式建立一個WeakOrderQueue
allocate傳入this, 也就是建立物件的執行緒對應的stack, 假設是執行緒1, thread就是當前執行緒, 假設是執行緒2
跟到allocate方法中:
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
? new WeakOrderQueue(stack, thread) : null;
}
reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)表示執行緒1的stack還能不能分配LINK_CAPACITY個元素, 如果可以, 則直接通過new的方式建立一個WeakOrderQueue物件
再跟到reserveSpace方法中:
private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
assert space >= 0;
for (;;) {
int available = availableSharedCapacity.get();
if (available < space) {
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - space)) {
return true;
}
}
}
引數availableSharedCapacity表示執行緒1的stack允許外部執行緒給其快取多少個物件, 之前我們分析過是16384, space預設是16
方法中通過一個cas操作, 將16384減去16, 表示stack可以給其他執行緒快取的物件數為16384-16
而這16個元素, 將由執行緒2快取
回到pushLater方法中:
建立之後通過 delayedRecycled.put(this, queue) 將stack和WeakOrderQueue進行關聯
最後通過queue.add(item), 將建立的WeakOrderQueue新增一個handle
講解WeakOrderQueue之前, 我們首先了解下WeakOrderQueue的資料結構
WeakOrderQueue維護了多個link, link之間是通過連結串列進行連線, 每個link可以盛放16個handle,
我們剛才分析過, 在reserveSpace方法中將 stack.availableSharedCapacity-16 , 其實就表示了先分配16個空間放在link裡, 下次回收的時候, 如果這16空間沒有填滿, 則可以繼續往裡盛放
如果16個空間都已填滿, 則通過繼續新增link的方式繼續分配16個空間用於盛放handle
WeakOrderQueue和WeakOrderQueue之間也是通過連結串列進行關聯
可以根據下圖理解上述邏輯:
8-6-2
根據以上思路, 我們跟到WeakOrderQueue的構造方法中:
private WeakOrderQueue(Stack<?> stack, Thread thread) {
head = tail = new Link();
owner = new WeakReference<Thread>(thread);
synchronized (stack) {
next = stack.head;
stack.head = this;
}
availableSharedCapacity = stack.availableSharedCapacity;
}
這裡有個head和tail, 都指向一個link物件, 這裡我們可以分析到, 其實在WeakOrderQueue中維護了一個連結串列, head分別代表頭結點和尾節點, 初始狀態下, 頭結點和尾節點都指向同一個節點
簡單看下link的類的定義:
private static final class Link extends AtomicInteger {
private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
private int readIndex;
private Link next;
}
每次建立一個Link, 都會建立一個DefaultHandle型別的陣列用於盛放DefaultHandle物件, 預設大小是16個
readIndex是一個讀指標, 我們之後小節會進行分析
next節點則指向下一個link
回到WeakOrderQueue的構造方法中:
owner是對向前執行緒進行一個包裝, 代表了當前執行緒
接下來在一個同步塊中, 將當前建立的WeakOrderQueue插入到stack指向的第一個WeakOrderQueue, 也就是stack的head屬性, 指向我們建立的WeakOrderQueue, 如圖所示
8-6-3
如果執行緒2建立一個和stack關聯的WeakOrderQueue, stack的head節點就就會指向執行緒2建立WeakOrderQueue
如果之後執行緒3也建立了一個和stack關聯的WeakOrderQueue, stack的head節點就會指向新建立的執行緒3的WeakOrderQueue
然後執行緒3的WeakOrderQueue再指向執行緒2的WeakOrderQueue
也就是無論哪個執行緒建立一個和同一個stack關聯的WeakOrderQueue的時候, 都插入到stack指向的WeakOrderQueue列表的頭部
這樣就可以將stack和其他執行緒釋放物件的容器WeakOrderQueue進行繫結
回到pushLater方法中:
private void pushLater(DefaultHandle<?> item, Thread thread) {
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
if (delayedRecycled.size() >= maxDelayedQueues) {
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
return;
}
queue.add(item);
}
根據之前分析的WeakOrderQueue的資料結構, 我們分析最後一步, 也就是WeakOrderQueue的add方法
我們跟進WeakOrderQueue的add方法:
void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id;
Link tail = this.tail;
int writeIndex;
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
return;
}
this.tail = tail = tail.next = new Link();
writeIndex = tail.get();
}
tail.elements[writeIndex] = handle;
handle.stack = null;
tail.lazySet(writeIndex + 1);
}
首先, 看 handle.lastRecycledId = id
lastRecycledId表示handle上次回收的id, 而id表示WeakOrderQueue的id, weakOrderQueue每次建立的時候, 會為自增一個唯一的id
Link tail = this.tail 表示拿到當前WeakOrderQueue的中指向最後一個link的指標, 也就是尾指標
再看 if ((writeIndex = tail.get()) == LINK_CAPACITY)
tail.get()表示獲取當前link中已經填充元素的個數, 如果等於16, 說明元素已經填充滿
然後通過eserveSpace方法判斷當前WeakOrderQueue是否還能快取stack的物件, eserveSpace方法我們剛才已經分析過, 會根據stack的屬性availableSharedCapacity-16的方式判斷還能否快取stack的物件, 如果不能再快取stack的物件, 則返回
如果還能繼續快取, 則在建立一個link, 並將尾節點指向新建立的link, 並且原來尾節點的next的節點指向新建立的link
然後拿到當前link的writeIndex, 也就是寫指標, 如果是新建立的link中沒有元素, writeIndex為0
然後將尾部的link的elements屬性, 也就是一個DefaultHandle型別的陣列, 通過陣列下標的方式將第writeIndex個節點賦值為要回收的handle
然後將handle的stack屬性設定為null, 表示當前handle不是通過stack進行回收的
最後將tail節點的元素個數進行+1, 表示下一次將從writeIndex+1的位置往裡寫
以上就是異執行緒回收物件的邏輯