Sending log4j2 messages to Kafka
title: Sending logs to Kafka with a custom log4j2 appender
tags: log4j2,kafka
To feed each project team's logs into the company's big-data platform, without the teams having to change anything themselves, I did some research and found that log4j2 ships with built-in support for sending logs to kafka. Delighted, I dug into log4j's implementation source, and discovered that the default implementation sends synchronously: if the kafka service goes down, it blocks the service's normal log output. So, using the original source as a reference, I made some modifications.
The log4j logging workflow
log4j2 offers a significant performance improvement over log4j; the official site already documents and benchmarks this, so I will not repeat it here. Still, to use it well it is worth understanding the internal workflow. Below is a class diagram from the official log4j documentation:
Applications using the Log4j 2 API will request a Logger with a specific name from the LogManager. The LogManager will locate the appropriate LoggerContext and then obtain the Logger from it. If the Logger must be created it will be associated with the LoggerConfig that contains either a) the same name as the Logger, b) the name of a parent package, or c) the root LoggerConfig. LoggerConfig objects are created from Logger declarations in the configuration. The LoggerConfig is associated with the Appenders that actually deliver the LogEvents.
The official documentation explains how these classes relate to one another, so I will not describe each class's role in detail here. Today's focus is the Appender class, because it decides where the logs are delivered.
- Appender
The ability to selectively enable or disable logging requests based on their logger is only part of the picture. Log4j allows logging requests to print to multiple destinations. In log4j speak, an output destination is called an Appender. Currently, appenders exist for the console, files, remote socket servers, Apache Flume, JMS, remote UNIX Syslog daemons, and various database APIs. See the section on Appenders for more details on the various types available. More than one Appender can be attached to a Logger.
Core configuration
The diagram above shows the core classes log4j2 uses to send logs to Kafka. The most important one is KafkaAppender; the other classes manage the connection to the kafka service.
- Core of KafkaAppender
```java
@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {

    private static final long serialVersionUID = 1L;

    @PluginFactory
    public static KafkaAppender createAppender(
            @PluginElement("Layout") final Layout<? extends Serializable> layout,
            @PluginElement("Filter") final Filter filter,
            @Required(message = "No name provided for KafkaAppender")
            @PluginAttribute("name") final String name,
            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
            @Required(message = "No topic provided for KafkaAppender")
            @PluginAttribute("topic") final String topic,
            @PluginElement("Properties") final Property[] properties) {
        final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
    }

    private final KafkaManager manager;

    private KafkaAppender(final String name, final Layout<? extends Serializable> layout,
            final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
        super(name, filter, layout, ignoreExceptions);
        this.manager = manager;
    }

    @Override
    public void append(final LogEvent event) {
        if (event.getLoggerName().startsWith("org.apache.kafka")) {
            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
        } else {
            try {
                if (getLayout() != null) {
                    manager.send(getLayout().toByteArray(event));
                } else {
                    manager.send(event.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8));
                }
            } catch (final Exception e) {
                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
            }
        }
    }

    @Override
    public void start() {
        super.start();
        manager.startup();
    }

    @Override
    public void stop() {
        super.stop();
        manager.release();
    }
}
```
- A minimal log4j2.xml configuration
```xml
<?xml version="1.0" encoding="UTF-8"?>
...
  <Appenders>
    <Kafka name="Kafka" topic="log-test">
      <PatternLayout pattern="%date %message"/>
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Root level="DEBUG">
      <AppenderRef ref="Kafka"/>
    </Root>
    <Logger name="org.apache.kafka" level="INFO" /> <!-- avoid recursive logging -->
  </Loggers>
```
The name attribute of @Plugin corresponds to the Kafka tag in the xml configuration file. The tag name can be customized: to use a MyKafka tag, for example, change the name attribute of @Plugin to MyKafka and configure it as follows:

```xml
<MyKafka name="Kafka" topic="log-test">
```
Custom configuration
Sometimes the attributes we need are not supported by the default KafkaAppender, so some rewriting is required. Fortunately this is fairly easy: we only need to read the extra values from the Properties kafkaProps passed into the constructor. To meet our project's requirements, I defined two attributes: platform and serviceName.
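As a minimal sketch of the idea (without the log4j dependency), extra attributes such as platform and serviceName can be merged into the map of producer properties that the manager is built from. The key names `custom.platform` and `custom.serviceName` here are illustrative assumptions, not part of the article's actual code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: merging custom appender attributes (platform, serviceName) into the
// kafka producer properties so the manager can later stamp them onto messages.
// This simulates the idea only; the real appender reads them via @PluginAttribute.
public class KafkaAppenderConfigSketch {

    public static Map<String, String> buildProducerConfig(
            String platform, String serviceName, Map<String, String> kafkaProps) {
        Map<String, String> config = new HashMap<>(kafkaProps);
        // Custom attributes travel alongside the standard kafka properties.
        config.put("custom.platform", platform);
        config.put("custom.serviceName", serviceName);
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        Map<String, String> merged =
                buildProducerConfig("bigdata", "order-service", props);
        System.out.println(merged.get("custom.platform"));
        System.out.println(merged.get("bootstrap.servers"));
    }
}
```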
The KafkaAppender source shows that it sends messages synchronously, blocking the calling thread. In my tests, once the kafka service went down, the project's normal log output was blocked as well. That is not something I wanted, so I modified the appender to a certain extent.
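The blocking comes from waiting on the Future that the producer's send returns. The stand-in below uses only java.util.concurrent (CompletableFuture substitutes for the real kafka producer, and fakeSend is a hypothetical helper) to show why calling get() stalls the logging thread while registering a callback does not:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: synchronous vs asynchronous send. fakeSend() stands in for
// KafkaProducer.send(record), which also returns a Future; it is NOT the
// real kafka API.
public class SendModes {

    static CompletableFuture<String> fakeSend(String msg, long brokerDelayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(brokerDelayMs); // simulate broker round-trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ack:" + msg;
        });
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        // Synchronous: get() parks the caller until the "broker" responds.
        String ack = fakeSend("hello", 200).get();
        long syncMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        // Asynchronous: register a callback and return immediately.
        fakeSend("hello", 200).thenAccept(a -> { /* handle ack or error */ });
        long asyncMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println(ack);
        System.out.println("sync waited for broker: " + (syncMs >= 200));
        System.out.println("async returned early: " + (asyncMs < 200));
    }
}
```

If the broker never responds, the synchronous path holds the logging thread hostage, which is exactly the failure mode described above.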
Features:

- kafka service stays healthy

  The ideal case: messages flow continuously to the kafka broker.

- kafka service goes down, then recovers after a while

  From the moment the kafka service dies, all subsequent messages are pushed into a ConcurrentLinkedQueue. The queue is consumed continuously, and its messages are written out to a local file. Once a heartbeat check detects that the kafka broker has recovered, the local file is read back and its contents are sent to the broker. Note that at this point a large number of messages get instantiated as ProducerRecord objects and heap usage climbs sharply, so I throttle the replay by briefly blocking the thread.

- kafka service stays down

  All messages are written to the local file.
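The degradation path above can be sketched with a ConcurrentLinkedQueue buffer. This is a simplified illustration under assumptions of my own (class and method names are invented; a List stands in for the local file so the sketch stays self-contained), not the article's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the fallback: while the broker is down, log lines accumulate in a
// ConcurrentLinkedQueue; a background drain step flushes them to local storage.
public class FallbackBuffer {

    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private volatile boolean brokerUp = false;             // toggled by a heartbeat check
    private final List<String> localFile = new ArrayList<>(); // stand-in for the file

    public void append(String line) {
        if (brokerUp) {
            // normal path: hand the line to the kafka producer (omitted here)
        } else {
            pending.offer(line);                           // degrade: buffer in memory
        }
    }

    /** Called by a background consumer; moves buffered lines to local storage. */
    public int drainToLocalFile() {
        int drained = 0;
        for (String line; (line = pending.poll()) != null; drained++) {
            localFile.add(line);
        }
        return drained;
    }

    public List<String> localFileContents() {
        return localFile;
    }

    public static void main(String[] args) {
        FallbackBuffer buffer = new FallbackBuffer();
        buffer.append("order created");
        buffer.append("order paid");
        System.out.println("drained: " + buffer.drainToLocalFile());
        System.out.println(buffer.localFileContents());
    }
}
```

On recovery, the article's design replays the local file into ProducerRecord objects in batches, sleeping between batches to keep heap usage in check.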
Source code: click here