1. 程式人生 > >Logstash日誌收集葵花寶典大整合

Logstash日誌收集葵花寶典大整合

之前有做過一篇關於ELK日誌收集分析平臺的博文,而這片博文這是一個後續的版本,在這裡我將通過五個案例向大家詳細的講述Logstash集中、轉換和儲存資料的過程和相關配置使用,Logstash它的強大之處在於他可以通過非常多的方式去進行轉換日誌格式然後進行儲存,而支援儲存的方式也非常的多,但這裡主要結合Elasticsearc進行。


  輸入

Logstash的輸入主要是Input外掛進行工作,但是Input可以進行標準輸入(Stdin)、讀取檔案(File)、讀取網路資料(TCP)、生成測試資料(Generator)、讀取Syslog資料、讀取Redis資料、讀取collectd資料,當然這裡的部分功能需要部分依賴關係,這裡就不做詳細說明,如果真的遇到可以在下方評論區進行留言或者Goolge一下就好了,主要都是一下yum安裝的程式,下面給大家演示一下標準輸入和檔案輸入

標準輸入

input {
    stdin {
        add_field => {"key" => "value"}
        codec => "plain"
        tags => ["add"]
        type => "std"
    }
}

我們使用bin/logstash -f stdin.conf命令就會輸出一下的資訊

{
    "message" => "hello world",
    "@version" => "1",
    "@timestamp" => "2014-08-08T06:48:47.789Z",
    "type" => "std",
    "tags" => [
        [0] "add"
    ],
    "key" => "value",
    "host" => "raochenlindeMacBook-Air.local"
}

type 和 tags 是 logstash 事件中兩個特殊的欄位。通常來說我們會在輸入區段中通過 type 來標記事件型別 —— 我們肯定是提前能知道這個事件屬於什麼型別的。而 tags 則是在資料處理過程中,由具體的外掛來新增或者刪除的。

檔案輸入

分析日誌應該是一個運維工程師最常見的工作了。所以我們先學習一下怎麼用 logstash 來處理日誌檔案。

Logstash 使用一個名叫 FileWatch 的 Ruby Gem 庫來監聽檔案變化。這個庫支援 glob 展開檔案路徑,而且會記錄一個叫 .sincedb 的資料庫檔案來跟蹤被監聽的日誌檔案的當前讀取位置。所以,不要擔心 logstash 會漏過你的資料。

sincedb 檔案中記錄了每個被監聽的檔案的 inode, major number, minor number 和 pos。

這裡就以生產環境下的Elasticsearc Java日誌作為例項講解,首先我們展示配置檔案進行分析學習

input {
    file {
        path => "/var/log/elasticsearch/KJ-Cloud.log"
        type => "es-log"
        start_position => "beginning"
	        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
	        }
    }
}

output {
    if [type] == "es-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-es-log-%{+YYYY.MM}"
        }
    }
}
    • path
      指定讀取的日誌檔案路徑,FileWatch 只支援檔案的絕對路徑,而且會不自動遞迴目錄,所以有需要的話,請用陣列方式都寫明具體哪些檔案。由於FileWatch不支援fluentd 那樣的path => "/path/to/%{+yyyy/MM/dd/hh}.log"寫法。達到相同目的,你只能寫成path => "/path/to/*/*/*/*.log"

    • type
      作為日誌歸納欄位使用,後面也可以作為變數去進行一個判斷

    • start_position
      logstash 從什麼位置開始讀取檔案資料,預設是結束位置,也就是說 logstash 程序會以類似 tail -F 的形式執行。如果你是要匯入原有資料,把這個設定改成 "beginning",logstash 程序就從頭開始讀取,有點類似 cat,但是讀到最後一行不會終止,而是繼續變成 tail -F

    • codec
      codec是一個編解碼外掛,他可以將多行資料進行合併,也可以採用Josn解碼,而這裡的codec向大家展示瞭如何將多行合併成一行,pattern則是指定了它的分隔符

    • discover_interval
      logstash 每隔多久去檢查一次被監聽的 path 下是否有新檔案。預設值是 15 秒。

    • exclude
      不想被監聽的檔案可以排除出去,這裡跟 path 一樣支援 glob 展開。

    • sincedb_path
      如果你不想用預設的 $HOME/.sincedb(Windows 平臺上在 C:\Windows\System32\config\systemprofile\.sincedb),可以通過這個配置定義 sincedb 檔案到其他位置。

    • sincedb_write_interval
      logstash 每隔多久寫一次 sincedb 檔案,預設是 15 秒。

    • stat_interval
      logstash 每隔多久檢查一次被監聽檔案狀態(是否有更新),預設是 1 秒。

輸出的格式如下:

{
    "_index": "logstash-es-log-2018.09",
    "_type": "doc",
    "_id": "gz9oDWYBWCMetPHc09lE",
    "_version": 1,
    "_score": 1,
    "_source": {
        "host": "test.kemin-cloud.com",
        "@version": "1",
        "type": "es-log",
        "message": "[2018-09-25T05:07:44,495][WARN ][o.e.m.j.JvmGcMonitorService] [center.kemin-cloud.com] [gc][young][22469][1987] duration [2.8s], collections [1]/[3s], total [2.8s]/[1.8m], memory [346.7mb]->[228.5mb]/[1007.3mb], all_pools {[young] [123.9mb]->[387.1kb]/[133.1mb]}{[survivor] [15mb]->[16.6mb]/[16.6mb]}{[old] [207.7mb]->[211.5mb]/[857.6mb]}",
        "path": "/var/log/elasticsearch/KJ-Cloud.log",
        "@timestamp": "2018-09-24T21:07:45.494Z"
    }
}

相信大家到這裡還是有點暈,那我們再哪一個比較常用的生產例項給大家說明,那就是Nginx的訪問日誌,但是這裡需要做些小變動,由於我們平常儲存Nginx的日誌格式並不會很好的讓logstash進行收集,所以我們需要修改nginx的日誌格式儲存為josn格式

log_format json '{"@timestamp":"$time_iso8601",'
        '"@version":"1",'
        '"host":"$server_addr",'
        '"agent":"$remote_addr",'
        '"client":"$http_x_forwarded_for",'
         '"size":$body_bytes_sent,'
        '"responsetime":$request_time,'
        '"domain":"$host",'
        '"url":"$uri",'
        '"status":"$status"}';

access_log  /var/log/nginx/access_json.log  json;

重啟nginx服務,那麼nginx的日誌格式將會變成json格式,那麼現在再來編寫logstash配置檔案

input {
    file {
        path => "/var/log/nginx/access_json.log"
        type => "nx-ac-log"
        start_position => "beginning"
        codec => json
    }

}
filter {
    if [type] == "nx-ac-log" {
        geoip {
            source => "client"
            target => "geoip"
            database => "/etc/logstash/conf.d/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
        }
        mutate {
            convert => [ "[geoip][coordinates]", "float" ]
        }
    }

}
output {

    if [type] == "nx-ac-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            manage_template => true
            index => "logstash-nx-ac-log-%{+YYYY.MM}"
        }
    }

}

這裡把codec的指向值變成了json,則表示按照json的日誌格式去讀取,而中間的添加了一個filter外掛,這裡是在原本的基礎上做了一些小小的改變,那就是根據客戶的IP地址進行經緯度定位,這樣子就可以在Kibana上面進行區域性分析,接下來就會和大家好好的解析filter外掛

輸出格式如下:

{
    "_index": "logstash-nx-ac-log-2018.09",
    "_type": "doc",
    "_id": "3yCMA2YBP74d8uJuELhF",
    "_version": 1,
    "_score": 1,
    "_source": {
        "agent": "172.31.243.145",
        "geoip": {
            "continent_code": "AS",
            "ip": "162.35.242.31",
            "location": {
                "lat": 22.25,
                "lon": 114.1667
            },
            "country_code2": "HK",
            "country_code3": "HK",
            "longitude": 114.1667,
            "coordinates": [
                114.1667
                ,
                22.25
            ],
            "country_name": "Hong Kong",
            "timezone": "Asia/Hong_Kong",
            "latitude": 22.25
        },
        "status": "200",
        "client": "47.52.16.198",
        "size": 12782,
        "responsetime": 0.135,
        "@timestamp": "2018-09-22T23:10:02.000Z",
        "url": "/zabbix/zabbix.php",
        "type": "nx-ac-log",
        "@version": "1",
        "domain": "test.kemin-cloud.com",
        "host": "172.31.243.146",
        "path": "/var/log/nginx/access_json.log"
    }
}

  過濾器

豐富的過濾器外掛的存在是 logstash 威力如此強大的重要因素,但是稱他為過濾器,其實提供的不單單是過濾的功能(小編這裡覺得這個命名實在太隨意了應該起一個更加爆炸性的名字),他提供正則捕獲(Grok)、時間處理(Date)、資料修改(Mutate)、地址查詢歸類(GeoIP)、編解碼(json)、拆分事件(split)、匹配歸類(UserAgent)、切分(Key-Value)、數值統計(Metrics)還有隨心所欲的Ruby處理(實在是強大到沒有朋友,應了那句歌詞,無敵是多麼,多麼寂寞),下面具體會以幾個例項進行解析,還有的大家可以訪問https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html進行詳細的瞭解,當然也可以訪問官網瞭解(小編是為菜鳥所以面對官網專業性的英文只能默默流淚。。。。)

首先把我們上面Nginx訪問日誌使用的地址查詢歸類講解講解

filter {
    if [type] == "nx-ac-log" {
        geoip {
            source => "client"
            target => "geoip"
            database => "/etc/logstash/conf.d/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
        }
        mutate {
            convert => [ "[geoip][coordinates]", "float" ]
        }
    }
}

GeoIP 是最常見的免費 IP 地址歸類查詢庫,同時也有收費版可以採購。GeoIP 庫可以根據 IP 地址提供對應的地域資訊,包括國別,省市,經緯度等,對於視覺化地圖和區域統計非常有用。

GeoIP 庫資料較多,如果你不需要這麼多內容,可以通過add_field選項指定自己所需要的。下例為全部可選內容:

filter {
    geoip {
        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
    }
}

需要注意的是:geoip.location 是 logstash 通過 latitude 和 longitude額外生成的資料。所以,如果你是想要經緯度又不想重複資料的話,需要進行對應修改,同時geoip 外掛的 "source" 欄位可以是任一處理後的欄位,比如 "client_ip",但是欄位內容卻需要小心!geoip 庫內只存有公共網路上的 IP 資訊,查詢不到結果的,會直接返回 null,而 logstash 的 geoip 外掛對 null 結果的處理是:不生成對應的 geoip.欄位。當然先使用從第三方購買回來的IP地址庫也可以使用database進行指定位置(但是這個小編測試好像不行,有待驗證)

我們上面進行完IP的歸納,所以我們後面緊跟著就要用到Mutate進行資料修改,Mutate的修改可以對型別進行轉換、字串處理、欄位處理,而我們這裡需要使用它的型別轉換,但這裡需要注意一點就是mutate除了轉換簡單的字元值,還支援對陣列型別的欄位進行轉換,即將 ["1","2"] 轉換成 [1,2]。但不支援對雜湊型別的欄位做類似處理,有這方面需求的可以採用稍後講述的 filters/ruby 外掛完成。

mutate {
    convert => [ "[geoip][coordinates]", "float" ]
}

把之前的分析完之後,現在我們來進行一個複雜點的來分析,那就是Nginx的錯誤日誌。

input {

    file {
        path => "/var/log/nginx/error.log"
        type => "nx-er-log"
        start_position => "beginning"
    }

}
filter {

    if [type] == "nx-er-log" {
        grok {
            match => { "message" => "(?<datetime>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:err_message}(?:, client: (?<client>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:domain}?)(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer}\")?"}
        }

    }

}
output {

    if [type] == "nx-er-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-nx-er-log-%{+YYYY.MM}"
        }
    }

}

由於Nginx的錯誤日誌不像Nginx的訪問日誌一樣可以修改為json格式,所以這裡需要使用到grok正則表示式進行過濾匹配,官方提供的預定義 grok 表示式見:https://github.com/logstash/logstash/tree/v1.4.2/patterns

輸出格式如下:

{
	"_index": "logstash-nx-er-log-2018.09",
	"_type": "doc",
	"_id": "TSDoA2YBP74d8uJudupM",
	"_version": 1,
	"_score": 1,
	"_source": {
		"@version": "1",
		"host": "hkagent.kemin-cloud.com",
		"type": "nx-er-log",
		"pid": "2525",
		"severity": "error",
		"request_host": ""st.kemin-cloud.com"",
		"path": "/var/log/nginx/error.log",
		"referrer": "http://www.speedtest.cn/",
		"message": "2018/09/23 08:50:57 [error] 2525#0: *72394 client intended to send too large body: 1049352 bytes, client: 113.88.103.126, server: st.kemin-cloud.com, request: "POST /index.php?r=0.08330938681162791 HTTP/1.1", host: "st.kemin-cloud.com", referrer: "http://www.speedtest.cn/"",
		"datetime": "2018/09/23 08:50:57",
		"domain": "st.kemin-cloud.com",
		"@timestamp": "2018-09-23T00:50:58.056Z",
		"err_message": "*72394 client intended to send too large body: 1049352 bytes",
		"request": ""POST /index.php?r=0.08330938681162791 HTTP/1.1"",
		"client": "113.88.103.126"
	}
}

接下來這裡再給大家提供利用Grok正則匹配的一個實際生產環境,收集MySQL慢查詢日誌

input {
    file {
        type => "mysqlslow-log"
        path => "/var/log/mysql/mysql-slow.log"
        start_position => "beginning"
        codec => multiline {
            pattern => "^# [email protected]:"
            negate => true
            what => "previous"
        }
    }
}
filter {

    if [type] == "mysqlslow-log" {
        grok {
            match => { "message" => "SELECT SLEEP" }
            add_tag => [ "sleep_drop" ]
            tag_on_failure => []
        }

        if "sleep_drop" in [tags] {
            drop {}
        }

        grok {
            match => { "message" => "(?m)^# [email protected]: %{USER:User}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:Client_IP})?\]\s.*# Query_time: %{NUMBER:Query_Time:float}\s+Lock_time: %{NUMBER:Lock_Time:float}\s+Rows_sent: %{NUMBER:Rows_Sent:int}\s+Rows_examined: %{NUMBER:Rows_Examined:int}\s*(?:use %{DATA:Database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<Query>(?<Action>\w+)\s+.*)\n# Time:.*$" }
        }

        date {
            match => [ "timestamp","UNIX" ]
            remove_field => [ "timestamp" ]
        }

    }

}
output {

    if [type] == "mysqlslow-log" {
       elasticsearch {
           hosts => ["172.31.243.146:9200"]
           index => "logstash-mysqlslow-log-%{+YYYY.MM}"
       }
   }

}

MySQL的慢查詢日誌收集需要mysql開啟慢查詢記錄,正則匹配過濾的時候比較複雜需要大家細心的去對照具體日誌記錄進行分析,這裡還使用到了時間處理filters/date 外掛可以用來轉換你的日誌記錄中的時間字串,這裡需要大家注意一點就是在稍後的outputs/elasticsearch中常用的%{+YYYY.MM.dd}這種寫法必須讀取@timestamp資料,所以一定不要直接刪掉這個欄位保留自己的欄位,而是應該用filters/date轉換後刪除自己的欄位!這在匯入舊資料的時候固然非常有用,而在實時資料處理的時候同樣有效,因為一般情況下資料流程中我們都會有緩衝區,導致最終的實際處理時間跟事件產生時間略有偏差。

filters/date 外掛支援五種時間格式:

  • ISO8601
    類似 "2011-04-19T03:44:01.103Z" 這樣的格式。具體Z後面可以有 "08:00"也可以沒有,".103"這個也可以沒有。常用場景裡來說,Nginx 的 log_format 配置裡就可以使用 $time_iso8601 變數來記錄請求時間成這種格式。

  • UNIX
    UNIX 時間戳格式,記錄的是從 1970 年起始至今的總秒數。Squid 的預設日誌格式中就使用了這種格式。

  • UNIX_MS
    這個時間戳則是從 1970 年起始至今的總毫秒數。據我所知,JavaScript 裡經常使用這個時間格式。

  • TAI64N
    TAI64N 格式比較少見,是這個樣子的:@4000000052f88ea32489532c。我目前只知道常見應用中,qmail會用這個格式。

  • Joda-Time庫
    Logstash 內部使用了 Java 的 Joda 時間庫來作時間處理。所以我們可以使用 Joda 庫所支援的時間格式來作具體定義。具體對照表這裡就不粘出來了,需要的下方留言我在粘吧~

輸出格式如下:

{
	"_index": "logstash-mysqlslow-log-2018.10",
	"_type": "doc",
	"_id": "1RGBk2YBAXDt9GzpgK-P",
	"_version": 1,
	"_score": 1,
	"_source": {
		"@version": "1",
		"type": "mysqlslow-log",
		"Lock_Time": 0.000037,
		"@timestamp": "2018-10-18T16:00:23.000Z",
		"Rows_Sent": 0,
		"Query_Time": 1.058855,
		"User": "zabbix",
		"message": "# [email protected]: zabbix[zabbix] @ [172.31.243.146] # Query_time: 1.058855 Lock_time: 0.000037 Rows_sent: 0 Rows_examined: 726 SET 	timestamp=1539878423; delete from history_uint where itemid=119 and clock<1537286352; # Time: 181019 0:00:25",
		"path": "/var/log/mysql/mysql-slow.log",
		"Rows_Examined": 726,
		"host": "0.0.0.0",
		"Action": "delete",
		"Query": "delete from history_uint where itemid=119 and clock<1537286352;",
		"Client_IP": "172.31.243.146",
		"tags": [
			"multiline"
		]
	}
}

  輸出

Logstash 早期有三個不同的 elasticsearch 外掛。到 1.4.0 版本的時候,開發者徹底重寫了 LogStash::Outputs::Elasticsearch 外掛。從此,我們只需要用這一個外掛,就能任意切換使用 Elasticsearch 叢集支援的各種不同協議了。下面我們來看看它的樣式

output {
    elasticsearch {
        host => "192.168.0.2"
        protocol => "http"
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        index_type => "%{type}"
        workers => 5
        template_overwrite => true
    }
}

這裡的模板只是顯示了一小部分功能,輸出的優化其實還有很多的,Logstash1.4.2在transport和http協議的情況下是固定連線指定host傳送資料。從1.5.0開始,host可以設定陣列,它會從節點列表中選取不同的節點發送資料,達到Round-Robin負載均衡的效果,還有就是對效能要求不高的,可以在啟動logstash程序時,使用命令export BULK="esruby"配置環境變數ENV["BULK"],強制採用elasticsearch官方Ruby庫而對效能要求極高的,可以手動更新ftw庫版本,目前最新版是0.0.42版,據稱記憶體問題在0.0.40版即解決。


  最後總結(敲黑板,劃重點)

小編髮現寫的太長而且已經寫了一個晚上,才寫了logstash的九牛一毛,實際上logstash還有很多很多東西都略過了,比如說input當中的syslog還沒有提及,過濾外掛裡的好幾個外掛都沒有說明展示案例,輸出的部分還有許多方式比如傳送郵件,輸出到redis當中這些等等,如果有興趣的話大家可以在官網上自主查閱,ELK的整個架構說簡陋就那幾部分,但是每一個部分都可以說上一千零一夜,所以可想而知是多的麼的強大,在深入瞭解,還可以寫出屬於自己的定製外掛,同時還有尚未進入官方庫的外掛,包括kafka(卡夫卡)這些比較耳熟的東西。

如果當中有什麼不對的地方或者看不懂的地方歡迎在下方留言指出,我會及時修正和回覆~

寫了一整晚~走過路過點個關注唄~