log4j非同步那些事(1)--AsyncAppender
簡介Logger和Appender的非同步化配置和基本原理
前面的部落格裡,我簡單介紹過了Log4j2的簡單配置和其中基本元件LogManager,LoggerContext以及Configuration的機制和流程。而還有兩個關鍵的元件Logger和Appender,他們是對msg做真正處理的關鍵元件,在眾多型別的Logger和Appender中,我主要想把目光集中在其中比較特定的,非同步的Logger和Appender。如果大家平時只是簡單使用Log4j,可能對非同步的Logger和Appender有些分不清,或者並未配置過這類的非同步化元件。我將用兩篇文章分別介紹非同步Appender和非同步Logger。
Logger和Appender的關係
前面的部落格中,我介紹過,記錄日誌的程式碼,這裡為了講述方便,我重新把程式碼放上來
Logger logger = LogManager.getLogger(loggerName); logger.info(msg);
這篇文章主要聚焦於非同步性的實現,所以對於獲取logger和呼叫info操作的通用流程我直接用兩張流程圖給出:


瞭解了通用流程後,我將分別介紹非同步Appender和非同步Logger的原理
非同步Appender AsyncAppender
配置
我們以官方文件的兩個配置為例來介紹非同步Appender的配置,配置檔案如下
<Configuration status="warn" name="MyApp" packages=""> <Appenders> <File name="MyFile" fileName="logs/app.log"> <PatternLayout> <Pattern>%d %p %c{1.} [%t] %m%n</Pattern> </PatternLayout> </File> <Async name="Async"> <AppenderRef ref="MyFile"/> </Async> </Appenders> <Loggers> <Root level="error"> <AppenderRef ref="Async"/> </Root> </Loggers> </Configuration>
<Configuration name="LinkedTransferQueueExample"> <Appenders> <List name="List"/> <Async name="Async" bufferSize="262144"> <AppenderRef ref="List"/> <LinkedTransferQueue/> </Async> </Appenders> <Loggers> <Root> <AppenderRef ref="Async"/> </Root> </Loggers> </Configuration>
配置中配置了兩個Appender,一個為FileAppender另一個則是非同步Appender,使用 <Async>
標籤進行宣告,在AsyncAppender中引用了FileAppender,那麼存在疑問,一般的Appender都會有一個明確的輸出位置,而對於這個非同步Appender都需要引用一個其他的Appender才將msg最終輸出。下面的介紹將會徹底解開這個疑問。
架構
下圖展示了AsyncAppender的架構

AsyncAppender的核心部件是一個阻塞佇列,logger將資料通過append方法放入到阻塞佇列中,隨後後臺執行緒從佇列中取出資料然後進行後續的操作
機制
上文簡單介紹了架構,下面從原始碼角度來詳細闡述AsyncAppender的流程
成員變數
public final class AsyncAppender extends AbstractAppender { private static final int DEFAULT_QUEUE_SIZE = 128; private static final LogEvent SHUTDOWN = new AbstractLogEvent() { }; private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1); private final BlockingQueue<LogEvent> queue; private final int queueSize; private final boolean blocking; private final long shutdownTimeout; private final Configuration config; private final AppenderRef[] appenderRefs; private final String errorRef; private final boolean includeLocation; private AppenderControl errorAppender; private AsyncThread thread; private AsyncQueueFullPolicy asyncQueueFullPolicy; }
可以看到其中的一些關鍵屬性有一個阻塞佇列queue,一個後臺執行緒thread,一個AppenderRef的陣列appenderRefs以及一個關鍵屬性blocking。對於一個任何一個Appender物件,我們都應該關注他的append()而方法,對於一個後臺執行緒,重要的方法則是run方法,對於AsyncAppender,這兩個方法剛好對應了AsyncAppender的兩個核心步驟,即放入訊息以及處理訊息,下面將分別說明。
append()方法
廢話不多說,線上程式碼:
@Override public void append(final LogEvent logEvent) { if (!isStarted()) { throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); } if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters } final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation); if (!transfer(memento)) { if (blocking) { // delegate to the event router (which may discard, enqueue and block, or log in current thread) final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel()); route.logMessage(this, memento); } else { error("Appender " + getName() + " is unable to write primary appenders. queue is full"); logToErrorAppenderIfNecessary(false, memento); } } }
可以看到這個appende方法流程並不複雜,只有以下兩步:
1、建立LogEvent的複製物件memento 2、將event放入佇列
雖然只有簡單兩步,也需要注意一個邊界情況,那就是當阻塞佇列滿時Appender的處理,這裡我首先給出流程,然後結合程式碼進行簡要說明。

如流程圖所示,首先會判斷使用者是否設定了blocking選項,如果未選擇blocking選項,則Appender直接會將msg放入errorAppender中,如果使用者沒有配置這些Appender,則會直接丟棄這些訊息,如果設定了這個屬性,則會按照一定的策略來處理這些訊息。策略可以分為3種,他們分別為:
1、Default---等待直到佇列有空閒,退化為同步操作 2、Discard---按照日誌級別丟棄一部分日誌 3、使用者自定義(需要實現AsyncQueueFullPolicy介面)
run()方法
當使用append方法將訊息放入阻塞佇列後,後臺的執行緒將會一步的進行處理,這也就是非同步執行緒的run方法的功能所在,首先簡單看其資料結構
private class AsyncThread extends Log4jThread { private volatile boolean shutdown = false; private final List<AppenderControl> appenders; private final BlockingQueue<LogEvent> queue; }
AsyncThread這是一個內部類,其中包含一個外部Appender類的阻塞佇列,還有對應的AsyncAppender所引用的Appender。接下來我們詳細看其中的run方法
public void run() { while (!shutdown) { LogEvent event; try { event = queue.take(); if (event == SHUTDOWN) { shutdown = true; continue; } } catch (final InterruptedException ex) { break; // LOG4J2-830 } event.setEndOfBatch(queue.isEmpty()); final boolean success = callAppenders(event); if (!success && errorAppender != null) { try { errorAppender.callAppender(event); } catch (final Exception ex) { // Silently accept the error. } } } // Process any remaining items in the queue. LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", queue.size()); int count = 0; int ignored = 0; while (!queue.isEmpty()) { try { final LogEvent event = queue.take(); if (event instanceof Log4jLogEvent) { final Log4jLogEvent logEvent = (Log4jLogEvent) event; logEvent.setEndOfBatch(queue.isEmpty()); callAppenders(logEvent); count++; } else { ignored++; LOGGER.trace("Ignoring event of class {}", event.getClass().getName()); } } catch (final InterruptedException ex) { // May have been interrupted to shut down. // Here we ignore interrupts and try to process all remaining events. } } LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored); }
可以看到,非同步執行緒的邏輯比較簡單,該執行緒會一直嘗試從阻塞佇列中獲取logEvent資料,如果能夠成功獲取資料,則會呼叫AppenderRef所引用Appender的append方法,通過這個方法,我們可以看到,實際上,AsyncAppender可以看做一箇中轉站,其作用僅僅將訊息的處理非同步化,當訊息放入阻塞佇列後,info方法就能返回成功,這樣能夠大幅提高日誌記錄的吞吐,同時,使用者可以自行權衡效能與日誌收集質量上進行權衡(設定blocking選項),此外,使用者還可以設定不同型別的阻塞佇列已到達更好的日誌記錄吞吐。
再回配置
最後,讓我們整體來看AsyncAppender所支援的所有配置項以及其中每個配置項的作用
名稱 | 型別 | 描述 | 預設值 |
---|---|---|---|
AppenderRef | String | 引用的Appender | |
blocking | boolean | 是否阻塞等待(這裡指佇列滿後的處理) | true |
shutdownTimeout | integer | appender關閉時等待的超時時間 | 0(立刻關閉) |
bufferSize | integer | 阻塞佇列的最大容量 | 1024 |
errorRef | String | 佇列滿後如果不阻塞時配置的errorAppender | |
filter | Filter | 過濾器 | |
name | String | 名稱 | |
ignoreExceptions | boolean | 用於決定是否需要記錄在日誌事件處理過程中出現的異常 | true |
BlockingQueueFactory | BlockingQueueFactory | Buffer的種類(預設ArrayBlockingQueue,能夠支援DisruptorBlockingQueue,JCToolsBlockingQueue,LinkedTransferQueue) | ArrayBlockingQueueFactory |
至此,我結合原始碼簡單介紹了AsyncAppnder的使用配置以及基本原理,在下一篇文章中,我將介紹另一個非同步化元件AsyncLogger
謝謝你請我吃糖果