1. 程式人生 > >logstash常用配置檔案總結

logstash常用配置檔案總結

目錄

logstash配置檔案示例

輸入input

filebeat

標準輸入(螢幕)

讀取檔案(File)

讀取網路資料(TCP)

讀取mysql

處理filter

基礎知識

if使用

Grok 正則捕獲

時間處理(Date)

資料修改(Mutate)(重要)

變數增減

型別轉換

字串處理

gsub(替換)

split(分割)

join(合併)

merge(合併陣列)

rename

update

replace

執行次序

輸出output

mysql寫入

標準輸出(Stdout)

儲存成檔案(File)


logstash配置檔案示例
 

input {
    beats {
        port => "5011"
    }
}
filter {
        
        if  "include" in [tags]  {
            mutate{
                split => ["message","123"]
                add_field => {
                    "beforeinclude" => "%{[message][0]}"
                }
            }
            mutate{
                gsub => ["beforeinclude", ",", ""]
            }
            mutate{
                add_field => {
                    "StandardizedMessage" => "%{beforeinclude}123%{[message][1]}"
                }
            }
        }
        
        if  "include" in [tags] or "exclude" in [tags]{
            #分割“
            mutate{
                add_field => {
                    "StandardizedMessage2" => "%{StandardizedMessage}"
                }                
            }
            mutate{
                #貌似不能用雙引號分割
                gsub => ["StandardizedMessage2", "\"", "||"]
            }
            mutate{
                split => ["StandardizedMessage2","||"]    
                add_field => {
                    "PAGE" => "%{[StandardizedMessage2][3]}"
                }
                                    
            }    
            mutate{
                split => ["StandardizedMessage"," "]
                
                add_field => {
                    "URL" => "%{[StandardizedMessage][7]}"
                }                
                add_field => {
                    "INFO" => "%{[StandardizedMessage][0]}"
                }                            
            }
            mutate{
                gsub => ["INFO", "\"", ""]
            }                                
            truncate {
                fields => [ "URL" ]
                length_bytes => 1000
            }
            grok {
                match => { "message" => "%{HTTPDATE:requesttimestamp}" }
            }
            mutate{
                add_field => {
                    "tem1" => "%{requesttimestamp}"
                }
                remove_field => [ "requesttimestamp" ]
            }
            date{
                match=>["tem1","dd/MMM/yyyy:HH:mm:ss Z"]
                target=>"logdatetime"
            }
            mutate{
                remove_field => [ "tem1" ]
            }
            ruby{
                code => "event.set('tem2', (event.get('logdatetime').time.localtime + 8*60*60))"
            }
            mutate{
                add_field => {
                    "tmp3" => "%{tem2}"
                }
                remove_field => [ "tem2" ]
            }
            mutate{
                split => ["tmp3", "."]
                add_field => {
                    "ACCESS_TIME" => "%{[tmp3][0]}"
                }
                remove_field => [ "tmp3" ]
            }
            mutate{
                gsub => ["ACCESS_TIME", "T", " "]
            }
        }
        
}
output{
    
    if "exclude" in [tags] or "include" in [tags] {
        jdbc {
           driver_jar_path => "/home/elk/jar/jdbc/mysql-connector-java-5.1.46-bin.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://192.168.2.123:3306/cm?user=123&password=123"
          statement => [ "INSERT INTO buff  (URL,ACCESS_TIME) VALUES(?, ?)", "URL","ACCESS_TIME"]
        }
    }
}

可以參考Logstash最佳實踐
http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html

logstash的配置檔案總共分為三個部分 輸入input  處理filter  輸出output


輸入input

filebeat


從filebeat中傳輸過來資料(埠為filebeat中配置的埠)

input {
    beats {
        port => "5011"
    }
}

標準輸入(螢幕)
 

input{stdin{}}

讀取檔案(File)
 

input
    file {
        path => ["/var/log/*.log", "/var/log/message"]
        type => "system"
        start_position => "beginning"
    }
}

有一些比較有用的配置項,可以用來指定 FileWatch 庫的行為:
discover_interval
logstash 每隔多久去檢查一次被監聽的 path 下是否有新檔案。預設值是 15 秒。
exclude
不想被監聽的檔案可以排除出去,這裡跟 path 一樣支援 glob 展開。
sincedb_path
如果你不想用預設的 $HOME/.sincedb(Windows 平臺上在 C:\Windows\System32\config\systemprofile\.sincedb),可以通過這個配置定義 sincedb 檔案到其他位置。
sincedb_write_interval
logstash 每隔多久寫一次 sincedb 檔案,預設是 15 秒。
stat_interval
logstash 每隔多久檢查一次被監聽檔案狀態(是否有更新),預設是 1 秒。
start_position
logstash 從什麼位置開始讀取檔案資料,預設是結束位置,也就是說 logstash 程序會以類似 tail -F 的形式執行。如果你是要匯入原有資料,把這個設定改成 "beginning",logstash 程序就從頭開始讀取,有點類似 cat,但是讀到最後一行不會終止,而是繼續變成 tail -F。


讀取網路資料(TCP)
 

input {
    tcp {
        port => 8888
        mode => "server"
        ssl_enable => false
    }
}

讀取mysql
 

input {
 stdin { }
    jdbc {
        jdbc_connection_string => "jdbc:mysql://localhost:3306/database"
 
        jdbc_user => "root"
 
        jdbc_password => "root"
 
        jdbc_driver_library => "C:/Program Files (x86)/MySQL/Connector.J 5.1/mysql-connector-java-5.1.40-bin.jar"
 
        jdbc_driver_class => "com.mysql.jdbc.Driver"
         
        jdbc_paging_enabled => "true"
         
        jdbc_page_size => "50000"<br>
        statement => "SELECT * FROM session"<br>
        schedule => "* * * * *"
    }
 }

 

處理filter

基礎知識

變數message為logstash 從input得到的一整條字串。
可以用"%{AAA}"代表變數名AAA這個變數的值
"%{[AAA][3]}"代表變數名AAA這個陣列變數(通常用split分割後)的第四個變數的值(陣列從0開始)
filter 區段之內,是順序執行的。

if使用

第一種用法是if 標籤
如果這一次進入的流是job-exclude標籤或job-include標籤,則執行大括號內的操作
if  "job-exclude" in [tags] or "job-include" in [tags]{
    mutate{
        split => ["message","123"]
        add_field => {
            "beforeexclude" => "%{[message][0]}"
        }
    }
    
}
第二種用法是變數的表示式
 if "123" in [Name] {
        mutate { remove_field => [ "Name" ] }
 }
 如果Name變數中有123這個字串,則。。。

 

Grok 正則捕獲


在grok中,支援以正則表示式的方式提取所需要的資訊,其中,正則表示式又分兩種,一種是內建的正則表示式(可以滿足我們大部分的要求),一種是自定義的正則表示式,形式分別如下:
# 內建的正則表示式方式,下面的寫法表示從輸入的訊息中提取IP欄位,並命名為sip
%{IP:sip}
# 自定義的正則表示式,開始與終止符分別為(?與?),下面的寫法表示獲取除,以外的字元,並命名為log_type
(?<log_type>[^,]+?)

以一個具體的例子來說明grok用法,假定需要解析的訊息內容樣本如下:
日誌型別:僵屍網路日誌, 源IP:192.168.101.251, 源埠:63726, 目的IP:124.127.48.41, 目的埠:1390, 攻擊型別:異常流量, 嚴重級別:低, 系統動作:被記錄, URL:-\n
為了快速判斷我們的正則表示式是否正確,不妨進行線上測試,地址為(線上測試地址)[http://grokdebug.herokuapp.com/],最後的結果為:
^日誌型別:(?<log_type>[^,]+?), 源IP:%{IP:sip}, 源埠:%{NUMBER:sport:int}, 目的IP:%{IP:dip}, 目的埠:%{NUMBER:dport:int}, 攻擊型別:(?<att_type>[^,]+?), 嚴重級別:(?<slevel>[^,]{1,}?), 系統動作:(?<sys_act>[^,]{1,}?), URL:(?<url>.+)$
在上面的表示式中,為了提高正則表示式的解析效率,我們需要進行整行匹配,於是添加了正則表示式的開始與結尾字元“^$”,此外為了便於統計與分析,我們還需要對相關型別進行轉換(如將字串轉換為整數),例如我們所需要的埠為整數,表示式為%{NUMBER:dport:int}。需要注意的是,grok也就支援兩種資料型別轉換,分別為float與int。

例子

grok {
    match => { "message" => "%{HTTPDATE:requesttimestamp}" }
}


將message中匹配httpdate格式的東西放入變數requesttimestamp

         

時間處理(Date)

filter {
    grok {
        match => ["message", "%{HTTPDATE:logdate}"]
    }
    date {
        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}


注意:時區偏移量只需要用一個字母 Z 即可。
將變數匹配為相應格式
具體格式:
http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html


資料修改(Mutate)(重要)

filters/mutate 外掛是 Logstash 另一個重要外掛。它提供了豐富的基礎型別資料處理能力。

變數增減

mutate{
    add_field => {
        "StandardizedMessage2" => "%{StandardizedMessage}"
    }                
}


新建變數名為StandardizedMessage2,它的值為名字為StandardizedMessage這個變數的值

mutate{
    remove_field => [ "tem1" ]
}


刪除tem1這個變數

型別轉換

型別轉換是 filters/mutate 外掛最初誕生時的唯一功能。
可以設定的轉換型別包括:"integer","float" 和 "string"。示例如下:

filter {
    mutate {
        convert => ["request_time", "float"]
    }
}


注意:mutate 除了轉換簡單的字元值,還支援對陣列型別的欄位進行轉換,即將 ["1","2"] 轉換成 [1,2]。但不支援對雜湊型別的欄位做類似處理。有這方面需求的可以採用稍後講述的 filters/ruby 外掛完成。


字串處理

gsub(替換)

僅對字串型別欄位有效
例如

gsub => ["beforeinclude", ",", "|"]
將beforeinclude這個變數的逗號換為|

gsub => ["urlparams", "[\\?#]", "_"]


split(分割)

filter {
    mutate {
        split => ["message", "|"]
    }
}


隨意輸入一串以|分割的字元,比如 "123|321|adfd|dfjld*=123",可以看到如下輸出:
{
    "message" => [
        [0] "123",
        [1] "321",
        [2] "adfd",
        [3] "dfjld*=123"
    ],
    "@version" => "1",
    "@timestamp" => "2014-08-20T15:58:23.120Z",
    "host" => "raochenlindeMacBook-Air.local"
}
注意split分割是永久性的,如果要用兩種東西分割,請先用變數A=這個變數B,再分割變數a和b
具體使用如
 

mutate{
    split => ["message","123"]
    add_field => {
        "before" => "%{[message][0]}"
    }
}

join(合併)

僅對陣列型別欄位有效
我們在之前已經用 split 割切的基礎再 join 回去。配置改成:

filter {
    mutate {
        split => ["message", "|"]
    }
    mutate {
        join => ["message", ","]
    }
}


12|23 改為12,34

merge(合併陣列)

合併兩個陣列或者雜湊欄位。把後面的變數合併到前面的那個
依然在之前 split 的基礎上繼續:

filter {
    mutate {
        split => ["message", "|"]
    }
    mutate {
        merge => ["message", "message"]
    }
}


把後面的變數合併到前面的那個

我們會看到輸出:

{
       "message" => [
        [0] "123",
        [1] "321",
        [2] "adfd",
        [3] "dfjld*=123",
        [4] "123",
        [5] "321",
        [6] "adfd",
        [7] "dfjld*=123"
    ],
      "@version" => "1",
    "@timestamp" => "2014-08-20T16:05:53.711Z",
          "host" => "raochenlindeMacBook-Air.local"
}
如果 src 欄位是字串,會自動先轉換成一個單元素的陣列再合併。把上一示例中的來源欄位改成 "host":

filter {
    mutate {
        split => ["message", "|"]
    }
    mutate {
        merge => ["message", "host"]
    }
}


結果變成:

{
       "message" => [
        [0] "123",
        [1] "321",
        [2] "adfd",
        [3] "dfjld*=123",
        [4] "raochenlindeMacBook-Air.local"
    ],
      "@version" => "1",
    "@timestamp" => "2014-08-20T16:07:53.533Z",
          "host" => [
        [0] "raochenlindeMacBook-Air.local"
    ]
}
看,目的欄位 "message" 確實多了一個元素,但是來源欄位 "host" 本身也由字串型別變成陣列型別了!

rename

重新命名某個欄位,如果目的欄位已經存在,會被覆蓋掉:

filter {
    mutate {
        rename => ["syslog_host", "host"]
    }
}


update


更新某個欄位的內容。如果欄位不存在,不會新建。

replace


作用和 update 類似,但是當欄位不存在的時候,它會起到 add_field 引數一樣的效果,自動新增新的欄位。

執行次序

需要注意的是,filter/mutate 內部是有執行次序的。其次序如下:

    rename(event) if @rename
    update(event) if @update
    replace(event) if @replace
    convert(event) if @convert
    gsub(event) if @gsub
    uppercase(event) if @uppercase
    lowercase(event) if @lowercase
    strip(event) if @strip
    remove(event) if @remove
    split(event) if @split
    join(event) if @join
    merge(event) if @merge

    filter_matched(event)
而 filter_matched 這個 filters/base.rb 裡繼承的方法也是有次序的。

  @add_field.each do |field, value|
  end
  @remove_field.each do |field|
  end
  @add_tag.each do |tag|
  end
  @remove_tag.each do |tag|
  end

 


輸出output

mysql寫入

output{    
    if "exclude" in [tags] or "include" in [tags] {
        jdbc {
           driver_jar_path => "/home/elk/jar/jdbc/mysql-connector-java-5.1.46-bin.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://192.168.2.123:3306/cm?user=123&password=123"
          statement => [ "INSERT INTO buff  (URL,ACCESS_TIME) VALUES(?, ?)", "URL","ACCESS_TIME"]
        }
    }
}


注意 jar包位置,資料庫位置,使用者名稱密碼,以及變數的數量與values的?,後面的那些為變數的名稱,
意味第一個?的值為url變數的值

標準輸出(Stdout)

output {
    stdout {
        codec => rubydebug
        workers => 2
    }
}


解釋
輸出外掛統一具有一個引數是 workers。Logstash 為輸出做了多執行緒的準備。
其次是 codec 設定。codec 的作用在之前已經講過。可能除了 codecs/multiline ,其他 codec 外掛本身並沒有太多的設定項。所以一般省略掉後面的配置區段。換句話說。上面配置示例的完全寫法應該是:

output {
    stdout {
        codec => rubydebug {
        }
        workers => 2
    }
}

單就 outputs/stdout 外掛來說,其最重要和常見的用途就是除錯。所以在不太有效的時候,加上命令列引數 -vv 執行,檢視更多詳細除錯資訊。


儲存成檔案(File)

 

配置示例

output {
 file {
   path => "/home/elk/logstash-6.3.1/job.log"
   codec => line { format => "custom format: %{message}"}
 }
}