1. 程式人生 > >Logstash5.2.x升級至6.5.x

Logstash5.2.x升級至6.5.x

第1章 環境說明:

現有架構為elk+kafka+filebeatelk各元件為5.2.x版本

[[email protected] ~]# rpm -qa |grep logstash

logstash-5.2.2-1.noarch

[[email protected] ~]# java -version

java version "1.8.0_181"

Java(TM) SE Runtime Environment (build 1.8.0_181-b13)

Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

由於logstash5.x版本不支援獨立的pipeline,需要大量的if-else判斷,配置檔案管理起來也比較複雜,而新的pipeline配置相對獨立,可以針對每個業務的日誌型別來進行管理

這裡只升級了logstash元件,驗證是可以和es 5.x版本配置使用的,沒有問題

第2章 配置檔案除錯(這裡以nginx日誌為例)

2.1主配置檔案:

cat /etc/logstash/pipelines.yml

- pipeline.id: nginx_access

 path.config: "/etc/logstash/conf.d/nginx_access.yml"

2.2管道檔案

cat /etc/logstash/conf.d/nginx_access.yml

input {

  kafka {

     bootstrap_servers => "127.0.0.1:9020"

     group_id          => "logstash"

     consumer_threads  => 5

     topics            => "nginx_access"

     codec             => "json"

   }

}

filter {

   grok {

     patterns_dir   => [ "/etc/logstash/patterns.d/" ]

     match          => {

        message => ["%{WPT_NGX_COMM}"]

      }

   }

   mutate {

     split          => ["request" , "?"]

     add_field      => {

        "uri_path"  => "%{[request][0]}"

        "uri_query" => "%{[request][1]}"

      }

      remove_field  => ["request"]

     convert        => {

         "response"              => "integer"

         "body_bytes_sent"       => "integer"

        "request_time"          => "float"

        "upstream_response_time" => "float"

      }  

   }

   useragent {

     source         => "user_agent"

     lru_cache_size => 5000

   }

   date {

     timezone     => "Asia/Shanghai"

     match        => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z", "UNIX", "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss" ]

      target      => "@timestamp"

     remove_field => "timestamp"

   }

}

output {

  elasticsearch {

      hosts => ["http://127.0.0.1:9200"]

      index => "%{type}-%{+YYYY.MM.dd}"

   }

}

除錯期間可以使用./bin/logstash –r 來檢查配置

第3章 logstash升級至6.5.4

3.1停止服務:

systemctl stop logstash.service

3.2解除安裝舊版本logstash

rpm –e logstash-5.2.2-1.noarch

3.3安裝新版logstash

yum localinstall logstash-6.5.4.rpm

配置檔案我本地打包好了上傳到伺服器上

pipeline檔案中,我只開啟了一個管道進行驗證服務是否有問題,並且在output中,同時讓資料打到檔案一份,用來驗證資料解析是否正常,最後啟動logstash服務即可

output {

  elasticsearch {

      hosts => ["http://127.0.0.1:9200"]

      index => "%{type}-%{+YYYY.MM.dd}"

   }

   file {

      path => "/tmp/test.log"

   }

日誌中沒有報錯,並且logstash工作也是正常的,但是有警告,網上查了一下,可能是與kafka版本不匹配而導致

[2018-12-26T14:39:05,424][WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-7

       at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_121]

       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_121]

       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_121]

       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_121]

       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_121]

       at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_121]

       at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) [kafka-clients-2.0.1.jar:?]

       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) [kafka-clients-2.0.1.jar:?]

       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650) [kafka-clients-2.0.1.jar:?]

       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630) [kafka-clients-2.0.1.jar:?]

       at sun.reflect.GeneratedConstructorAccessor47.newInstance(Unknown Source) [?:1.8.0_121]

       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [?:1.8.0_121]

       at java.lang.reflect.Constructor.newInstance(Constructor.java:423) [?:1.8.0_121]