1. 程式人生 > >大資料日誌分析系統-logstash

大資料日誌分析系統-logstash

logstash簡介

Logstash 是一個開源的資料收集引擎,它具有備實時資料傳輸能力。它可以統一過濾來自不同源的資料,並按照開發者的制定的規範輸出到目的地。

logstash-2.2.2的配置:

從logstash-forward        到kafka的配置

[email protected]:~/logstashBeforeChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf


input {

lumberjack {

port => "5044"

ssl_certificate => "/home/ubuntu/logstash-2.2.2/config/lumberjack.crt"

ssl_key =>"/home/ubuntu/logstash-2.2.2/config/lumberjack.key"

type => "fc_access"

}

}

output {

if "_grokparsefailure" not in [tags] {

# stdout { codec => rubydebug }

kafka {

topic_id => "kafka_es"

bootstrap_servers => "sp1:9092,sp2:9092,sp3:9092,sp4:9092,sp5:9092,sp6:9092,sp7:9092"

compression_type => "snappy"

acks => ["1"]

value_serializer => "org.apache.kafka.common.serialization.StringSerializer"

timeout_ms => 10000

retries => 5

retry_backoff_ms => 100

send_buffer_bytes => 102400

workers => 2

}

}

}


從kafka到es配置

其中包括了對日誌各個欄位的解析,以及對異常日誌過濾(同時注意其中過濾了 不屬於當前時間前後5天的時間的日誌,為了防止異常日誌建立索引過多導致es報紅)

[email protected]

:~/logstashAfterChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf

input {

kafka {

topic_id => "kafka_es"

group_id => "kafka_es"

zk_connect => "sp1:2181,sp2:2181,sp3:2181,sp4:2181,sp5:2181,sp6:2181,sp7:2181"

consumer_threads => 1

consumer_restart_on_error => true

consumer_restart_sleep_ms => 5000

decorate_events => true

consumer_timeout_ms => 1000

queue_size => 100

auto_offset_reset => "smallest"

rebalance_max_retries => 50

}

}

filter {

mutate {

add_field => [ "messageClone", "%{message}" ]

}

mutate {

split => { "messageClone" => '"' }

add_field => {"agent" => "%{[messageClone][3]}"}

}

useragent {

source => "agent"

}

mutate {

split => { "message" => " " }

add_field => {"timestamp" => "%{[message][0]}"}

add_field => {"reqtime" => "%{[message][1]}"}

add_field => {"clientIP" => "%{[message][2]}"}

add_field => {"squidCache" => "%{[message][3]}"}

add_field => {"repsize" => "%{[message][4]}"}

add_field => {"reqMethod" => "%{[message][5]}"}

add_field => {"requestURL" => "%{[message][6]}"}

add_field => {"username" => "%{[message][7]}"}

add_field => {"requestOriginSite" => "%{[message][8]}"}

add_field => {"mime" => "%{[message][9]}"}

add_field => {"referer" => "%{[message][10]}"}

add_field => {"agentCheck" => "%{[message][11]}"}

add_field => {"dnsGroup" => "%{[message][-1]}"}

remove_field => ["offset", "kafka", "@version", "file", "message", "messageClone"]

}

if [agentCheck] =~ "ChinaCache" {

grok { match => { "agentCheck" => "OOPS" } }

}

mutate {

convert => {

"timestamp" => "float"

"reqtime" => "integer"

"repsize" => "integer"

}

remove_field => ["agentCheck"]

}

ruby {

code => "event['timestamp_str'] = Time.at(event['timestamp']).strftime('%Y-%m-%dT%H:%M:%S.%LZ')"

}

date { match => [ "timestamp_str", "ISO8601" ]

}

mutate {

split => { "requestURL" => '/' }

add_field => {"uriHost" => "%{[requestURL][2]}"}

remove_field => ["timestamp_str"]

}

mutate {

join => { "requestURL" => '/' }

}

ruby {

code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs"

}

}

output {

if "ChinaCache" not in [agent] {

# stdout { codec => "rubydebug" }

elasticsearch {

index => "logstash-%{+YYYY.MM.dd.HH}"

workers => 1

flush_size => 5000

idle_flush_time => 1

hosts => ["es-ip-1:9200","es-ip-2:9200","es-ip-3:9200","es-ip-4:9200","es-ip-5:9200","es-ip-6:9200","es-ip-7:9200"]

}

}

}


啟動命令:

nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf 2>&1 > /home/ubuntu/logstash-2.2.2/logs/logstash-after-kafka-access.log &

nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf 2>&1 > /home/ubuntu/logstash-2.2.2/logs/logstash-before-kafka.log &

logstash-6.1.1配置

從filbeat到kafka的配置

[email protected]:~/apps/logstash-6.1.1$ cat filebeat5055-kafkasp26-3.conf

input {

beats {

port => "5055"

type => "log"

}

}

output {

# stdout { codec => rubydebug }

kafka {

codec => "json"

bootstrap_servers => "37:9092,38:9092,39:9092,40:9092,41:9092"

topic_id => "test"

compression_type => "snappy"

value_serializer => "org.apache.kafka.common.serialization.StringSerializer"

}

}


檢測
/home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf  --config.test_and_exit
啟動

nohup /home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.reload.automatic   2>&1 >  /home/ubuntu/apps/logstash-6.1.1/logs/filebeat5055-kafkasp26-3.log  &

相關推薦

資料日誌分析系統-logstash

logstash簡介Logstash 是一個開源的資料收集引擎,它具有備實時資料傳輸能力。它可以統一過濾來自不同源的資料,並按照開發者的制定的規範輸出到目的地。logstash-2.2.2的配置:從logstash-forward        到kafka的配置[email

資料日誌分析系統-hdfs日誌儲存

hdfs簡介:Hadoop分散式檔案系統(HDFS)被設計成適合執行在通用硬體(commodity hardware)上的分散式檔案系統。專案需求:使用hdfs進行客戶需要的指定域名時間打包日誌 以及原始日誌儲存進行離線計算遇到的問題:在這一步遇到的一個重要的問題:問題:從k

智慧工廠資料管控分析系統

智慧工廠的發展是因為製造業是國民經濟基礎,在《中國製造2025》規劃中提出:確立以加快新一代資訊科技與製造業深度融合為主線、促進從“中國製造”向“中國智造”轉型升級,實現製造業由大變強的歷史跨越。 MES系統是一套面向製造企業車間執行層最基礎的生產資訊化管理系統,可以為企業提供包括製造資料管理、計劃排程管理

資料日誌分析專案mapreduce程式

總體思路: 使用flume將伺服器上的日誌傳到hadoop上面,然後使用mapreduce程式完成資料清洗,統計pv,visit模型.最後使用azkaban定時執行程式. 使用者每次登入根據session來判斷. 本人親自測試可以使用 原始日誌欄位說明:

深度解析 Twitter Heron 資料實時分析系統

所有的老的生產環境的topology已經執行在Heron上, 每天大概處理幾十T的資料, billions of訊息 為什麼要重新設計Heron: 【題外話】這裡完全引用作者吐槽的問題, 不少問題,其實JStorm已經解決 (1)debug-ability 很差, 出現問題,很難發現 1.1 多個t

資料專案實戰之 --- 某App管理平臺的手機app日誌分析系統(三)

一、建立hive分割槽表 ---------------------------------------------------- 1.建立資料庫 $hive> create database applogsdb; 2.建立分割槽表 編寫指令碼。

ELK日誌分析系統(3)-logstash資料處理

  1. 概述   logspout收集資料以後,就會把資料傳送給logstash進行處理,本文主要講解logstash的input, filter, output處理 2. input   資料的輸入處理   支援tcp,udp等協議     晚上找資料建議在使用 LogStash::I

[資料專案]-0016-基於Spark2.x新聞網資料實時分析視覺化系統

2018最新最全大資料技術、專案視訊。整套視訊,非那種淘寶雜七雜八網上能免費找到拼湊的亂八七糟的幾年前的不成體系浪費咱們寶貴時間的垃圾,詳細內容如下,視訊高清不加密,需要的聯絡QQ:3164282908(加Q註明51CTO)。   課程介紹 本專案基於某新聞網使用者日誌分析系統進行講解

ELK日誌監控分析系統Logstash詳解之——output模組

摘要: Logstash的output模組,相比於input模組來說是一個輸出模組,output模組集成了大量的輸出外掛,可以輸出到指定檔案,也可輸出到指定的網路埠,當然也可以輸出資料到ES.在這裡我只介紹如何輸出到ES,至於如何輸出到埠和指定檔案,有很多的文件資料可查詢. Logs

ELK日誌監控分析系統Logstash詳解之——input模組

摘要: Logstash由三個元件構造成,分別是input、filter以及output。我們可以吧Logstash三個元件的工作流理解為:input收集資料,filter處理資料,output輸出資料。至於怎麼收集、去哪收集、怎麼處理、處理什麼、怎麼發生以及傳送到哪等等一些列的問題就是我

ELK日誌監控分析系統Logstash詳解之——filter模組

摘要: Logstash三個元件的第二個元件,也是真個Logstash工具中最複雜,最蛋疼的一個元件,當然,也是最有作用的一個元件。 1、grok外掛 grok外掛有非常強大的功能,他能匹配一切資料,但是他的效能和對資源的損耗同樣讓人詬病。 Logstash三個元件的第二個元件,也是

從零編寫日誌分析系統logstash

概述 logstash是負責採集和解析日誌的,將日誌解析成需要的格式儲存在elasticsearch或者其他地方。logstash提供了很多非常強大的外掛,這些外掛可以有效的把日誌資訊轉換成需要的格式。 一:安裝 二:配置 logstash提供了

Logstash+Elasticsearch+Kibana 聯合使用搭建日誌分析系統(Windows系統)

        最近在做日誌分析這塊兒,要使用 Logstash+Elasticsearch+Kibana 實現日誌的匯入、過濾及視覺化管理,官方文件寫的不夠詳細,網上的文章大多要麼是針對Linux系統的用法,要麼就是抄襲別人的配置大都沒法執行。費了很大勁才搞定了這仨東西,

第一節 elk日誌分析 資料日誌 win7 64位搭建elk

一 ELK 背景        日誌主要包括系統日誌、應用程式日誌和安全日誌。系統運維和開發人員可以通過日誌瞭解伺服器軟硬體資訊、檢查配置過程中的錯誤及錯誤發生的原因。經常分析日誌可以瞭解伺服器的負荷

搭建ELK(ElasticSearch+Logstash+Kibana)日誌分析系統(十四) logstash grok 正則解析日誌

摘要 這一節補充一下logstash使用grok正則解析日誌 Grok 是 Logstash 最重要的外掛。通過在filter中使用grok,可以把日誌中的關鍵字匹配出來。 grok正則主要有兩部分: 一是grok自帶的grok模式表示式,即是gr

Docker搭建ElasticSearch+Redis+Logstash+Filebeat日誌分析系統

root cli tab 基本 步驟 stdout ood beginning disco 一、系統的基本架構   在以前的博客中有介紹過在物理機上搭建ELK日誌分析系統,有興趣的朋友可以看一看-------------->>鏈接戳我<<。這篇博客

ELK日誌分析系統 介紹 安裝配置

elkELK日誌分析系統一、ELK介紹 ELK顧名思義:是由Elasticsearch,Logstash 和 Kibana三部分組成的。 其中Elasticsearch 是一個實時的分布式搜索和分析引擎,它可以用於全文搜索,結構化搜索以及分析。它是一個建立在全文搜索引擎 Apache Lucene

開源日誌分析系統ELK平臺搭建部署

logstash 日誌分析系統 elk 開源日誌分析系統ELK平臺搭建部署 一、前言日誌主要包括系統日誌、應用程序日誌和安全日誌。系統運維和開發人員可以通過日誌了解服務器軟硬件信息、檢查配置過程中的錯誤及錯誤發生的原因。經常分析日誌可以了解服務器的負荷,性能安全性,從而及時采取措施糾正錯誤。通常

ELK日誌分析系統搭建配置

elk我們主要用ELK日誌分析系統來分析Nginx訪問日誌,mysql慢查詢日誌,tomcat運行日誌以及系統日誌等。介紹:ELK:ElasticSearch+LogStash+Kibana=ElkStackElasticSearch:存儲、收索、分析(可以用solr替代)LogStash:收集器,輸入,處理

ELK 日誌分析系統

日誌 elk 分析系統 轉自:http://467754239.blog.51cto.com/4878013/1700828/ 一、簡介1、核心組成ELK由Elasticsearch、Logstash和Kibana三部分組件組成;Elasticsearch是個開源分布式搜索引擎,它的特點有:分布式