一、業務背景

目前移動端的使用場景中會用到大量的訊息推送,push訊息可以幫助運營人員更高效地實現運營目標(比如給使用者推送營銷活動或者提醒APP新功能)。

對於推送系統來說需要具備以下兩個特性:

  • 訊息秒級送到使用者,無延時,支援每秒百萬推送,單機百萬長連線。

  • 支援通知、文字、自定義訊息透傳等展現形式。正是由於以上原因,對於系統的開發和維護帶來了挑戰。下圖是推送系統的簡單描述(API->推送模組->手機)。

二、問題背景

推送系統中長連線叢集在穩定性測試、壓力測試階執行一段時間後隨機會出現一個程序掛掉的情況,概率較小(頻率為一個月左右發生一次),這會影響部分客戶端訊息送到的時效。

推送系統中的長連線節點(Broker系統)是基於Netty開發,此節點維護了服務端和手機終端的長連線,線上問題出現後,新增Netty記憶體洩露監控引數進行問題排查,觀察多天但並未排查出問題。

由於長連線節點是Netty開發,為便於讀者理解,下面簡單介紹一下Netty。

三、 Netty介紹

Netty是一個高效能、非同步事件驅動的NIO框架,基於Java NIO提供的API實現。它提供了對TCP、UDP和檔案傳輸的支援,作為當前最流行的NIO框架,Netty在網際網路領域、大資料分散式計算領域、遊戲行業、通訊行業等獲得了廣泛的應用,HBase,Hadoop,Bees,Dubbo等開源元件也基於Netty的NIO框架構建。

四、問題分析

4.1 猜想

最初猜想是長連線數導致的,但經過排查日誌、分析程式碼,發現並不是此原因造成。

長連線數:39萬,如下圖:

每個channel位元組大小1456, 按40萬長連線計算,不致於產生記憶體過大現象。

4.2 檢視GC日誌

檢視GC日誌,發現程序掛掉之前頻繁full GC(頻率5分鐘一次),但記憶體並未降低,懷疑堆外記憶體洩露。

4.3 分析heap記憶體情況

ChannelOutboundBuffer物件佔將近5G記憶體,洩露原因基本可以確定:ChannelOutboundBuffer的entry數過多導致,檢視ChannelOutboundBuffer的原始碼可以分析出,是ChannelOutboundBuffer中的資料。

沒有寫出去,導致一直積壓;ChannelOutboundBuffer內部是一個連結串列結構。

4.4 從上圖分析資料未寫出去,為什麼會出現這種情況?

程式碼中實際有判斷連線是否可用的情況(Channel.isActive),並且會對超時的連線進行關閉。從歷史經驗來看,這種情況發生在連線半開啟(客戶端異常關閉)的情況比較多---雙方不進行資料通訊無問題。

按上述猜想,測試環境進行重現和測試。

1)模擬客戶端叢集,並與長連線伺服器建立連線,設定客戶端節點的防火牆,模擬伺服器與客戶端網路異常的場景(即要模擬Channel.isActive呼叫成功,但資料實際傳送不出去的情況)。

2)調小堆外記憶體,持續傳送測試訊息給之前的客戶端。訊息大小(1K左右)。

3)按照128M記憶體來計算,實際上呼叫9W多次就會出現。

五、問題解決

5.1 啟用autoRead機制

當channel不可寫時,關閉autoRead;

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!ctx.channel().isWritable()) {
Channel channel = ctx.channel();
ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel);
String clientId = "";
if (channelInfo != null) {
clientId = channelInfo.getClientId();
} LOGGER.info("channel is unwritable, turn off autoread, clientId:{}", clientId);
channel.config().setAutoRead(false);
}
}

當資料可寫時開啟autoRead;

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
{
Channel channel = ctx.channel();
ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel);
String clientId = "";
if (channelInfo != null) {
clientId = channelInfo.getClientId();
}
if (channel.isWritable()) {
LOGGER.info("channel is writable again, turn on autoread, clientId:{}", clientId);
channel.config().setAutoRead(true);
}
}

說明:

autoRead的作用是更精確的速率控制,如果開啟的時候Netty就會幫我們註冊讀事件。當註冊了讀事件後,如果網路可讀,則Netty就會從channel讀取資料。那如果autoread關掉後,則Netty會不註冊讀事件。

這樣即使是對端傳送資料過來了也不會觸發讀事件,從而也不會從channel讀取到資料。當recv_buffer滿時,也就不會再接收資料。

5.2 設定高低水位

serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 8 * 1024 * 1024));

注:高低水位配合後面的isWritable使用

5.3 增加channel.isWritable()的判斷

channel是否可用除了校驗channel.isActive()還需要加上channel.isWrite()的判斷,isActive只是保證連線是否啟用,而是否可寫由isWrite來決定。

private void writeBackMessage(ChannelHandlerContext ctx, MqttMessage message) {
Channel channel = ctx.channel();
//增加channel.isWritable()的判斷
if (channel.isActive() && channel.isWritable()) {
ChannelFuture cf = channel.writeAndFlush(message);
if (cf.isDone() && cf.cause() != null) {
LOGGER.error("channelWrite error!", cf.cause());
ctx.close();
}
}
}

注:isWritable可以來控制ChannelOutboundBuffer,不讓其無限制膨脹。其機制就是利用設定好的channel高低水位來進行判斷。

5.4 問題驗證

修改後再進行測試,傳送到27W次也並不報錯;

六、解決思路分析

一般Netty資料處理流程如下:將讀取的資料交由業務執行緒處理,處理完成再發送出去(整個過程是非同步的),Netty為了提高網路的吞吐量,在業務層與socket之間增加了一個ChannelOutboundBuffer。

在呼叫channel.write的時候,所有寫出的資料其實並沒有寫到socket,而是先寫到ChannelOutboundBuffer。當呼叫channel.flush的時候才真正的向socket寫出。因為這中間有一個buffer,就存在速率匹配了,而且這個buffer還是無界的(連結串列),也就是你如果沒有控制channel.write的速度,會有大量的資料在這個buffer裡堆積,如果又碰到socket寫不出資料的時候(isActive此時判斷無效)或者寫得慢的情況。

很有可能的結果就是資源耗盡,而且如果ChannelOutboundBuffer存放的是DirectByteBuffer,這會讓問題更加難排查。

流程可抽象如下:

從上面的分析可以看出,步驟一寫太快(快到處理不過來)或者下游傳送不出資料都會造成問題,這實際是一個速率匹配問題。

七、Netty原始碼說明

超過高水位

當ChannelOutboundBuffer的容量超過高水位設定閾值後,isWritable()返回false,設定channel不可寫(setUnwritable),並且觸發fireChannelWritabilityChanged()。

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
} long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

低於低水位

當ChannelOutboundBuffer的容量低於低水位設定閾值後,isWritable()返回true,設定channel可寫,並且觸發fireChannelWritabilityChanged()。

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
} long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

八、總結

當ChannelOutboundBuffer的容量超過高水位設定閾值後,isWritable()返回false,表明訊息產生堆積,需要降低寫入速度。

當ChannelOutboundBuffer的容量低於低水位設定閾值後,isWritable()返回true,表明訊息過少,需要提高寫入速度。通過以上三個步驟修改後,部署線上觀察半年未發生問題出現。

​作者:vivo網際網路伺服器團隊-Zhang Lin