1. 程式人生 > >日誌監控平臺搭建 關於flume Kafka Elk

日誌監控平臺搭建 關於flume Kafka Elk

最近需要搭建一套日誌監控平臺,參考了新浪與美團的一些東西.現在實錄一下搭建與優化調整的過程

目前把這幾件放在一起的文件還不夠多,其中相當一部分因為elk的升級配置也已經不能用了,更多的是單機版的配置,完全沒有參考性.

優化的部分將等待專案與新平臺正式上線在另一篇文章寫出

拓撲圖

軟硬體配置

  • 本機 ubuntu 14.04
  • 線上 centos 6.5
host 本地搭建 線上環境
c1 1core 1g 4core 8g
c2 1core 1g 4core 8g
c3 1core 1g 4core 8g
c4 2core 4g 8core 32g
c5 2core 4g 4core 32g
c1 c2 c3 c4 c5
jdk+scala+zk+kafka 同左 同左 jdk+es+logstash+kibana jdk+es

搭建

basic

新機器修改root密碼sudo passwd

建立使用者

useradd cluster
passwd cluster
chmod +w /etc/sudoers
vim  /etc/sudoers
cluster ALL=(root)NOPASSWD:ALL 
chmod -w /etc/sudoers
mkdir /home/cluster
mkdir /home/stack
chmod 777 /home/cluster
chmod 777 /home/stack

以上部分/home/stack用於儲存所需所有tar.gz包

/home/cluster作為所有軟體的安裝目錄

而後建立cluster目錄下data目錄,用於存放各元件配置,日誌,資料

scp推薦工具ZOC7

同步各個機器的hosts

127.0.0.1 localhost 
10.1.12.25 c1
10.1.12.23 c2
10.1.12.24 c3
10.1.12.27 c4
10.1.12.28 c5

分發各機器的rsa公鑰

ssh-keygen
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 700 ~/.ssh/authorized_keys 
chmod 600 -R
~/.ssh/

各機器profile設定

tar -zxvf 
sudo vim /etc/profile
export JAVA_HOME=/home/cluster/jdk
export ZK_HOME=/home/cluster/zookeeper
export KAFKA_HOME=/home/cluster/kafka
export SCALA_HOME=/home/cluster/scala
export PATH=$PATH:$JAVA_HOME/bin:$ZK_HOME/bin:$KAFKA_HOME/bin:$SCALA_HOME/bin   
替換系統預設java
sudo update-alternatives --install /usr/bin/java java /home/cluster/jdk/jre/bin/java 301 
sudo update-alternatives --config java 
java -version

前三臺機器的cluster目錄樹

drwxr-xr-x  4 root    root    4096  4月 20 14:34 data/
drwxr-xr-x  8 uucp        143 4096  3月 21 13:13 jdk/
drwxr-xr-x  7 root    root    4096  4月 20 14:30 kafka/
drwxrwxr-x  6 cluster cluster 4096  3月  4 23:30 scala/
drwxr-xr-x 10 zy      zy      4096  4月 20 14:13 zookeeper/  

後兩臺機器的cluster目錄樹

drwxr-xr-x  4 zy      root    4096  4月 21 17:23 data/
drwxrwxrwx  7 zy      root    4096  4月 21 15:07 elasticsearch/
drwxr-xr-x  8 zy          143 4096  3月 21 13:13 jdk/
drwxr-xr-x 10 zy      staff   4096  3月 29 06:46 kibana/
drwxr-xr-x  5 zy      root    4096  4月 21 18:06 logstash/  

開發環境下關閉防火牆

chkconfig  iptables off && service iptables status或者
ufw disable或者
systemctl stop firewalld.service && systemctl disable firewalld.service && setenforce 0

非ubuntu機器關閉SELinux

修改 /etc/selinux/config,將 SELINUX=enforcing 改為 SELINUX=disabled
selinux預設ubuntu不安裝,iptables預設也是全開放的.可以用getenforce和iptables -L命令檢視下兩個元件的狀態

同步各機器時區

cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime或者
ntpdate cn.pool.ntp.org

zookeeper配置

修改conf目錄下模板配置為zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/cluster/data/zookeeper
clientPort=2181
server.1= c1:2888:3888
server.2= c2:2888:3888
server.3= c3:2888:3888  

而後

在配置的dataDir目錄下建立一個myid檔案,裡面寫入一個0-255之間的一個隨意數字,每個zk上這個檔案的數字要是不一樣的,這些數字應該是從1開始,依次寫每個伺服器。

檔案中序號要與dn節點下的zk配置序號一直,如:server.1=c1:2888:3888,那麼dn1節點下的myid配置檔案應該寫上1

  • 各節點啟動:bin/zkServer.sh start
  • 檢視節點狀態與leader與否bin/zkServer.sh status
  • 檢視java程序jps

kafka配置

配置目錄下

zookeeper.properties

dataDir=/home/cluster/data/zookeeper
clientPort=2181
maxClientCnxns=0  

server.properties

############### Server Basics ###############
broker.id=0
############# Socket Server Settings #############
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
##############Log Basics ##############
log.dirs=/home/cluster/data/kafka/log
num.partitions=1
num.recovery.threads.per.data.dir=1
############ Log Retention Policy ###############
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############## Zookeeper ###############
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000  

producer.properties

############### Producer Basics ##############
metadata.broker.list=c1:9092,c2:9092,c3:9092
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder    

consumer.properties

zookeeper.connect=c1:2181,c2:2181,c3:2181
zookeeper.connection.timeout.ms=6000
group.id=cluster-consumer-group 
  • 啟動nohup bin/kafka-server-start.sh config/server.properties &
  • 依然jpsjps

測試

本部分的分片partitions有待調整,分散式的logstash需要分片的訊息,否則會出現訊息順序錯誤

leader上執行

bin/kafka-topics.sh --zookeeper c1:2181,c2:2181,c3:2181 --topic tt_topic --replication-factor 3 --partitions 3 --create
bin/kafka-topics.sh --zookeeper c1:2181,c2:2181,c3:2181 --topic tt_topic --describe

leader訊息傳送

bin/kafka-console-producer.sh --broker-list c1:9092,c2:9092,c3:9092 --topic tt_topic

從者訊息消費

bin/kafka-console-consumer.sh --zookeeper c1:2181,c2:2181,c3:2181 --from-beginning --topic tt_topic

elasticsearch配置

修改es資料夾所屬使用者組 chown -R cluster /home/cluster/elasticsearch 修改data資料夾所屬使用者組 chown -R cluster /home/cluster/data

es配置

cluster.name: elasticsearch
node.name: c4
path.data: /home/cluster/data/elasticsearch/data
path.logs: /home/cluster/data/elasticsearch/log
bootstrap.mlockall: true
//這行一定要填ip
network.host: 10.1.12.27
http.port: 9200
discovery.zen.ping.unicast.hosts: ["c4", "c5"]
discovery.zen.minimum_master_nodes: 2 

安裝外掛

 bin/plugin install mobz/elasticsearch-head
 bin/plugin install lmenezes/elasticsearch-kopf
 bin/plugin install lukas-vlcek/bigdesk
  • 執行bin/elasticsearch -d
  • 狀態檢視http://10.1.12.27:9200/_plugin/head/

kibana配置

基本設定需要修改的部分

server.port: 5601
server.host: "c4"
elasticsearch.url: "http://c4:9200"  
  • 啟動nohup bin/kibana &
  • 訪問http://10.1.12.27:5601

logstash配置

修改ruby源,修改Gemfile檔案https://ruby.taobao.org

安裝外掛

bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch

新建kafka-logstash-es.conf

置於cluster/data/logstash/conf目錄下

input {
    kafka {
        zk_connect => "c1:2181,c2:2181,c3:2181"
        group_id => "cluster-consumer-group"
        topic_id => "tt_topic"
        reset_beginning => false 
        consumer_threads => 5  
        decorate_events => true 
        codec => "plain"
        }
    }
output {
    elasticsearch {
        hosts => ["c4:9200","c5:9200"]
        index => "logstash-log-%{+YYYY.MM.dd}"
        workers => 5
        codec => "json"
		  }
	 }

測試配置檔案

bin/logstash -f /home/cluster/data/logstash/conf/kafka-logstash-es.conf --configtest

執行

nohup bin/logstash -f /home/cluster/data/logstash/conf/kafka-logstash-es.conf  &

這個平臺搭建的後期我遇見了新的需求,對flume的定製需求越來越多,如果你不想面對這種情況,那麼可以這樣:

bin/logstash-plugin install logstash-input-log4j 
bin/logstash-plugin install logstash-output-kafka

把flume的部分替換成使用logstash來進行:

input{
    log4j {
        mode => "server"
        host => "[c1/c2/c3]"
        port => 4560
    }
}

output{
    kafka {
        bootstrap_servers => "c1:9092,c2:9092,c3:9092"
        topic_id => "tt_topic"
        workers => 5
        codec => "plain"
    }
}

同樣在log4j中配置新的SocketAppender指向掛在logstash叢集前的負載均衡.

<appender name="LOGSTASH-APPENDER" class="org.apache.log4j.net.SocketAppender">
    <param name="remoteHost" value="lb1" />
    <param name="port" value="4560" />
    <param name="Threshold" value="INFO" />
    <param name="ReconnectionDelay" value="1000" />
    <param name="LocationInfo" value="true" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%-d{yyyy-MM-dd HH:mm:ss}-[%p]-[%l]-%m%n" />
    </layout>
  </appender>

想要使用多個 logstash 端協同消費同一個 topic 的話,那麼需要把兩個或是多個 logstash 消費端配置成相同的 group_id 和 topic_id

但是前提是要把相應的 topic 分多個 partitions (區),多個消費者消費是無法保證訊息的消費順序性的

檢視後臺任務 jobs

殺掉後臺任務 kill %number

flume配置

  • 確定jdk
  • 上傳stack
  • 同步hosts

flume-env.sh

export JAVA_HOME=/opt/software/java 
export JAVA_OPTS="-Xms1024m -Xmx2048m" 

flume-kafka.properties

ag1.sources=src1 src2
ag1.sinks=sink1
ag1.channels=chn1

ag1.sources.src1.type = exec
ag1.sources.src1.shell = /bin/bash -c
ag1.sources.src1.command = tail -F tt.log
ag1.sources.src1.channels = chn1

ag1.sources.src2.type = exec
ag1.sources.src2.shell = /bin/bash -c
ag1.sources.src2.command = tail -F tt.log
ag1.sources.src2.channels = chn1

ag1.channels.chn1.type = memory
agent.channels.chn1.keep-alive = 60  
ag1.channels.chn1.capacity = 1000
ag1.channels.chn1.transactionCapacity = 100


ag1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
ag1.sinks.sink1.topic = tt_topic
ag1.sinks.sink1.brokerList = c1:9092:c2:9092:c2:9092
ag1.sinks.sink1.channel = chn1

啟動

bin/flume-ng agent -n ag1  -c conf -f conf/flume-kafka.properties

初步成果

初步成果如圖,我們成功把22臺機器的agent安好,獲得了應用叢集上的指定日誌內容,後續的優化包括

  • 效能
  • 日誌filter
  • 儲存/壓縮/備份
  • visualize與dashboard配置
  • 更多視覺化外掛安裝