1. 程式人生 > >ELK --- Grok正則過濾Linux系統登錄日誌

ELK --- Grok正則過濾Linux系統登錄日誌

mes 過濾 .mm mat 如果 正則 from 來源 就是

過濾Linux系統登錄日誌/var/log/secure

登陸成功

Jan  6 17:11:47 localhost sshd[3324]: Received disconnect from 172.16.0.13: 11: disconnected by user
Jan  6 17:11:47 localhost sshd[3324]: pam_unix(sshd:session): session closed for user root
Jan  6 17:11:48 localhost sshd[3358]: Address 172.16.0.13 maps to localhost, but this does not map back to the address - POSSIBLE BREAK-IN ATTEMPT!
Jan  6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
Jan  6 17:11:51 localhost sshd[3358]: pam_unix(sshd:session): session opened for user root by (uid=0)

登陸失敗

Jan  6 17:13:10 localhost sshd[3380]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=172.16.0.39  user=root
Jan  6 17:13:12 localhost sshd[3380]: Failed password for root from 172.16.0.39 port 58481 ssh2

以上信息中我們只用判斷登錄成功或失敗

Jan  6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
或者
Jan  6 17:13:12 localhost sshd[3380]: Failed password for root from 172.16.0.39 port 58481 ssh2
--------------------- 

logstash配置

input {
    file {
        path => "/var/log/secure"
    }
}

filter {
    grok {
        match => {
            "message" => ".* sshd\[\d+\]: (?<status>\S+) .* (?<ClientIP>(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})?) .*"
        }
        overwrite => ["message"]
    }
}

output {
    if [ClientIP] =~ /\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/ and ([status] == "Accepted" or [status] == "Failed") {
        elasticsearch {
            hosts => "172.16.11.199"
            index => "logstash-%{+YYYY.MM.dd}"
        }
    }
}

配置解釋:

  • input插件使用file讀取日誌文件
  • filter插件使用grok來匹配相應的日誌行

    1. message中定義了兩個Fields,分別匹配登錄來源IP,和登錄狀態
    2. overwrite表示重寫message行
  • output插件指定將過濾出來的信息輸出到哪個地方,這裏輸出到elasticsearch
    1. 一個條件判斷,判斷filter中定義的兩個fields是否匹配,如果匹配則輸出到elasticsearch中,如果不匹配則不操作

正則解釋

Jan  6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
  • .*匹配Jan 6 17:11:51 localhost
  • sshd[\d+]: 匹配sshd[3358]: 段,\d+匹配多個數字
  • (?<status>\S+):
    1.(?<xxx>正則表達式):定義一個xxx字段匹配後面正則表達式,類似{xxx:匹配的結果},在上面output中的條件判斷即可使用該字段來使用匹配到的結果
    2.\S+表示多個字符串,也就是匹配Accepted或Failed

  • (?<ClientIP>(?:\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})?)
    1.先定義一個ClientIP字段
    2.(?:...)? 表示匹配一個ip但不保存供以後引用,如果(...)則以後可以使用$1來調用匹配到的值,最後一個?表示非貪婪匹配,盡可能少的匹配

最終輸出結果:

{
       "message" => "Mar 22 10:16:51 k8s-n2 sshd[27997]: Failed password for root from 10.201.1.10 port 39302 ssh2",
      "@version" => "1",
    "@timestamp" => "2019-03-22T02:16:51.813Z",
          "path" => "/var/log/secure",
          "host" => "k8s-n2",
        "status" => "Failed",
      "ClientIP" => "10.201.1.10"
}

ELK --- Grok正則過濾Linux系統登錄日誌