1. 程式人生 > >基於flume的日誌系統

基於flume的日誌系統

思路

  1. 日誌統一輸出至kafka
  2. flume agent充當kafka消費者,將日誌輸出至elasticsearch
  3. kibana負責展示日誌資訊

準備工作

  1. flume 1.8 kafka 1.1.0 elasticsearch&kibana 6.5.4
  2. 專案中一般使用log4j等日誌框架,需自定義JsonLayout
  3. flume支援的elasticsearch較低,需自定義flume es sink
  4. elasticsearch預設使用utc時間,日誌時間需保持一致

JsonLayout

只需要在自定義的JsonLayout中構造一個PatternLayout幫助format日誌訊息即可 日誌格式內容如下

public class JsonLogBean {
    private String system;
    private String ip;
    private String message;
    private String level;
    private String time;
    
    //get set 略
    public JsonLogBean(){}

    public JsonLogBean(String system, String ip,String message, String level, String time) {
        this.system = system;
        this.ip = ip;
        this.message = message;
        this.level = level;
        this.time = time;
    }

}

log4j1

public class JsonPartternLayout extends PatternLayout{

    private String system;

    //PatternLayout 預設將異常交給WriterAppender處理 這裡改為false
    public boolean ignoresThrowable() {
        return false;
    }

    private static String ip;

    private static SimpleDateFormat utcFormater;

    static {
        utcFormater = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));//時區定義並進行時間獲取
        try {
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            //ignore
        }
    }


    @Override
    public String format(LoggingEvent event) {
        StringBuilder sb = new StringBuilder();
        sb.append(super.format(event));
        String[] s = event.getThrowableStrRep();
        if (s != null) {
            int len = s.length;
            for (int i = 0; i < len; i++) {
                sb.append(s[i]);
                sb.append("\n");
            }
        }
        String time = utcFormater.format(new Date(event.getTimeStamp()));
        return JSON.toJSONString(new JsonLogBean(system, ip, sb.toString(), event.getLevel().toString(), time)) + "\n";

    }

    public String getSystem() {
        return system;
    }

    public void setSystem(String system) {
        this.system = system;
    }
}

log4j2

@Plugin(name = "JsonPartternLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class JsonPartternLayout extends AbstractStringLayout {

    private PatternLayout patternLayout;

    private String system;

    private static String ip;

    private static SimpleDateFormat utcFormater;

    static {
        utcFormater = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));//時區定義並進行時間獲取
        try {
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            //ignore
        }

    }

    public JsonPartternLayout(final Configuration config, final RegexReplacement replace, final String eventPattern,
             final PatternSelector patternSelector, final Charset charset, final boolean alwaysWriteExceptions,
             final boolean disableAnsi, final boolean noConsoleNoAnsi, final String headerPattern,
             final String footerPattern, final String system) {
        super(config, charset,
                newSerializerBuilder()
                        .setConfiguration(config)
                        .setReplace(replace)
                        .setPatternSelector(patternSelector)
                        .setAlwaysWriteExceptions(alwaysWriteExceptions)
                        .setDisableAnsi(disableAnsi)
                        .setNoConsoleNoAnsi(noConsoleNoAnsi)
                        .setPattern(headerPattern)
                        .build(),
                newSerializerBuilder()
                        .setConfiguration(config)
                        .setReplace(replace)
                        .setPatternSelector(patternSelector)
                        .setAlwaysWriteExceptions(alwaysWriteExceptions)
                        .setDisableAnsi(disableAnsi)
                        .setNoConsoleNoAnsi(noConsoleNoAnsi)
                        .setPattern(footerPattern)
                        .build());

        this.patternLayout = PatternLayout.newBuilder()
                .withPattern(eventPattern)
                .withPatternSelector(patternSelector)
                .withConfiguration(config)
                .withRegexReplacement(replace)
                .withCharset(charset)
                .withDisableAnsi(disableAnsi)
                .withAlwaysWriteExceptions(alwaysWriteExceptions)
                .withNoConsoleNoAnsi(noConsoleNoAnsi)
                .withHeader(headerPattern)
                .withFooter(footerPattern)
                .build();
        this.system = system;
    }

    /**
     *
     * @param event
     * @return
     */
    public String toSerializable(LogEvent event) {
        String msg = this.patternLayout.toSerializable(event);
        String time = utcFormater.format(new Date(event.getTimeMillis()));
        return JSON.toJSONString(new JsonLogBean(system, ip,msg, event.getLevel().name(), time)) + "\n";
    }


    @PluginBuilderFactory
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     *
     */
    public static class Builder implements org.apache.logging.log4j.core.util.Builder<JsonPartternLayout> {

        @PluginBuilderAttribute
        private String pattern = DEFAULT_CONVERSION_PATTERN;

        @PluginElement("PatternSelector")
        private PatternSelector patternSelector;

        @PluginConfiguration
        private Configuration configuration;

        @PluginElement("Replace")
        private RegexReplacement regexReplacement;

        // LOG4J2-783 use platform default by default
        @PluginBuilderAttribute
        private Charset charset = Charset.defaultCharset();

        @PluginBuilderAttribute
        private boolean alwaysWriteExceptions = true;

        @PluginBuilderAttribute
        private boolean disableAnsi = !useAnsiEscapeCodes();

        @PluginBuilderAttribute
        private boolean noConsoleNoAnsi;

        @PluginBuilderAttribute
        private String header;

        @PluginBuilderAttribute
        private String footer;

        @PluginBuilderAttribute
        private String system;

        private Builder() {
        }

        private boolean useAnsiEscapeCodes() {
            PropertiesUtil propertiesUtil = PropertiesUtil.getProperties();
            boolean isPlatformSupportsAnsi = !propertiesUtil.isOsWindows();
            boolean isJansiRequested = !propertiesUtil.getBooleanProperty("log4j.skipJansi", true);
            return isPlatformSupportsAnsi || isJansiRequested;
        }


        public JsonPartternLayout build() {
            // fall back to DefaultConfiguration
            if (configuration == null) {
                configuration = new DefaultConfiguration();
            }
            return new JsonPartternLayout(configuration, regexReplacement, pattern, patternSelector, charset,
                    alwaysWriteExceptions, disableAnsi, noConsoleNoAnsi, header, footer, system);
        }
    }

}

Es Sink

這裡我們直接在flume-ng-elasticsearch-sink的原始碼上做修改,為避免衝突我改了相關的包名 原始碼見這裡 pom配置如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-high-eslog-sink</artifactId>
    <version>1.8</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <dependencies>

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.5.4</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.5.4</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>11.0.2</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>

    </dependencies>
</project>
        

flume-ng-elasticsearch-sink的原始碼還是比較少的,以下幾個比較重要的改動點

將elasticsearch lib下的jar包copy到flume lib下 以下是我copy到flume的相關jar包 如果有jar缺失或衝突檢視下flume的日誌很容易就能夠解決

[email protected] 1 zhanghuan  staff    10M  1  4 22:26 elasticsearch-6.5.4.jar
[email protected] 1 zhanghuan  staff    16K  1  4 22:26 elasticsearch-cli-6.5.4.jar
[email protected] 1 zhanghuan  staff    36K  1  4 22:26 elasticsearch-core-6.5.4.jar
[email protected] 1 zhanghuan  staff    12K  1  4 22:26 elasticsearch-launchers-6.5.4.jar
[email protected] 1 zhanghuan  staff    11K  1  4 22:26 elasticsearch-secure-sm-6.5.4.jar
[email protected] 1 zhanghuan  staff   110K  1  4 22:26 elasticsearch-x-content-6.5.4.jar
[email protected] 1 zhanghuan  staff   1.6M  1  4 22:37 lucene-analyzers-common-7.5.0.jar
[email protected] 1 zhanghuan  staff    98K  1  4 22:37 lucene-backward-codecs-7.5.0.jar
[email protected] 1 zhanghuan  staff   2.9M  1  4 22:37 lucene-core-7.5.0.jar
[email protected] 1 zhanghuan  staff    85K  1  4 22:37 lucene-grouping-7.5.0.jar
[email protected] 1 zhanghuan  staff   202K  1  4 22:37 lucene-highlighter-7.5.0.jar
[email protected] 1 zhanghuan  staff   143K  1  4 22:37 lucene-join-7.5.0.jar
[email protected] 1 zhanghuan  staff    50K  1  4 22:37 lucene-memory-7.5.0.jar
[email protected] 1 zhanghuan  staff    93K  1  4 22:37 lucene-misc-7.5.0.jar
[email protected] 1 zhanghuan  staff   259K  1  4 22:37 lucene-queries-7.5.0.jar
[email protected] 1 zhanghuan  staff   373K  1  4 22:37 lucene-queryparser-7.5.0.jar
[email protected] 1 zhanghuan  staff   259K  1  4 22:37 lucene-sandbox-7.5.0.jar
[email protected] 1 zhanghuan  staff    14K  1  4 22:37 lucene-spatial-7.5.0.jar
[email protected] 1 zhanghuan  staff   231K  1  4 22:37 lucene-spatial-extras-7.5.0.jar
[email protected] 1 zhanghuan  staff   295K  1  4 22:37 lucene-spatial3d-7.5.0.jar
[email protected] 1 zhanghuan  staff   240K  1  4 22:37 lucene-suggest-7.5.0.jar
-rw-r--r--  1 zhanghuan  staff   7.4K  1  7 23:04 transport-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff    77K  1  7 23:08 transport-netty4-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff   107K  1  7 23:10 reindex-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff    72K  1  7 23:11 percolator-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff    60K  1  7 23:12 lang-mustache-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff    75K  1  7 23:13 parent-join-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff   1.1M  1  7 23:16 hppc-0.7.1.jar
-rw-r--r--  1 zhanghuan  staff   258K  1  7 23:18 log4j-api-2.11.1.jar
-rw-r--r--  1 zhanghuan  staff   1.5M  1  7 23:19 log4j-core-2.11.1.jar
-rw-r--r--  1 zhanghuan  staff    50K  1  8 19:29 t-digest-3.2.jar
[email protected] 1 zhanghuan  staff   3.6M  1  8 19:38 netty-all-4.1.25.Final.jar
[email protected] 1 zhanghuan  staff   276K  1  8 19:42 jackson-core-2.8.11.jar
[email protected] 1 zhanghuan  staff    50K  1  8 19:42 jackson-dataformat-cbor-2.8.11.jar
[email protected] 1 zhanghuan  staff    72K  1  8 19:42 jackson-dataformat-smile-2.8.11.jar
[email protected] 1 zhanghuan  staff    40K  1  8 19:42 jackson-dataformat-yaml-2.8.11.jar
-rw-r--r--  1 zhanghuan  staff    28K  1 10 21:33 flume-ng-high-eslog-sink-1.8.jar
[email protected] 1 zhanghuan  staff    62K  1 12 19:35 log4j-1.2-api-2.11.1.jar

測試

這裡我們使用log4j2 配置如下

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="com.mine.log">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}[%t] %-5p %c %msg%xEx%n" />
        </Console>

        <Kafka name="Kafka" topic="test1">
            <JsonPartternLayout system = "mlog" pattern = "[%t] %c %msg%xEx%n"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
    </Appenders>

    <Loggers>
        <Root level="debug">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
    </Loggers>
</Configuration>

測試程式碼

public class LogTest {

    private static final Logger logger = LogManager.getLogger(LogTest.class);

    @org.junit.Test
    public void test() {
        logger.info("輸出資訊……");
        logger.trace("隨意列印……");
        logger.debug("除錯資訊……");
        logger.warn( "警告資訊……");
        try {
            new Thread(new Runnable() {
                public void run() {
                    logger.warn("test……");
                }
            }).start();
            LogTest.class.getClass().forName("123");
        } catch (Exception e) {
            logger.error("處理業務邏輯的時候發生一個錯誤……", e);
        }
    }
}

flume配置

#Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

#Describe/configure the source
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.channels = channel1
agent.sources.r1.batchSize = 5000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = localhost:9092
agent.sources.r1.kafka.topics = test1
agent.sources.r1.kafka.consumer.group.id = custom.g.id

#Describe the sink
agent.sinks.k1.type = org.apache.flume.sink.hielasticsearch.ElasticSearchSink
agent.sinks.k1.hostNames = 127.0.0.1:9300
agent.sinks.k1.indexName = log_index
agent.sinks.k1.indexType = log_table
#agent.sinks.k1.clusterName = log_cluster
agent.sinks.k1.batchSize = 500
agent.sinks.k1.ttl = 5d
agent.sinks.k1.serializer = org.apache.flume.sink.hielasticsearch.ElasticSearchDynamicSerializer

#Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

啟動kafka flume elasticsearch kibana即可 配置相關基本下載即用我就略過了哈 日誌展示如下