1. 程式人生 > >Centos7上 Nginx + ELK Stack +Kafka + Filebeat 實戰二

Centos7上 Nginx + ELK Stack +Kafka + Filebeat 實戰二

kafka zookeeper elk

本文與前文是有關聯的,之前的兩篇文章客官可以擡腿出門右轉,elk 導讀, 實戰一
kafka的配置安裝:
#kafka 和zookeeper 都依賴java ,機器上必須安裝java,具體安裝方法和效驗方法,請各位客官擡腿向上看!
#下載安裝包:同樣放到/opt/elk/ 的目錄裏邊:
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.12-1.0.1.tgz
#解壓安裝包:
tar -zxvf kafka_2.12-1.0.1.tgz
#kafka 采用集群方式部署,測試環境部署,所以我用兩臺機器模擬出3個節點的cluster 。kafka 依賴zookeeper,zookeeper 簡稱ZK,已經包含在kafka的tar包裏,不需要而外下載安裝,為啥要用三個節點呢?這個是zk的一個選舉leader的特性,官方推薦是奇數個server,奇數個最少3個,這樣壞一個還有兩個,可以正常選舉,以下是zookeeper給出的官方說法:
For replicated mode, a minimum of three servers are required, and it is strongly recommended that you have an odd number of servers. If you only have two servers, then you are in a situation where if one of them fails, there are not enough machines to form a majority quorum. Two servers is inherently less stable than a single server, because there are two single points of failure.
#由於是集群配置,配置之間幾乎差不多,所以配置好一個,直接scp到其余節點就ok了,到了其余節點把關鍵參數修改一下就行了。
具體配置方法如下:
#給zookeeper 創建data 目錄和logs 目錄
mkdir /opt/elk/kafka_2.12-1.0.1/zookeeper/{data,logs} -p
#創建myid 文件
echo 1 > /opt/elk/kafka/zookeeper/data/myid
cd /opt/elk/kafka_2.12-1.0.1
#kafka &ZK 的所有配置文件都放到了config 目錄下
cd /config
#由於kafka 依賴zk 所以啟動的時候需要先配置zk,啟動的時候先把zk啟動了,然後再啟動kafka
vi zookeeper.properties
具體配置內容如下:
#zk 存放數據的目錄,同時因為是集群,zk 需要有一個叫做myid的文件也是放到這個目錄下
dataDir=/opt/elk/kafka/zookeeper/data

指定事務日誌的存儲路徑,可以和dataDir在不同設備,這意味著可以使用一個日誌的專用磁盤,避免日誌IO和快照競爭。

#This option will direct the machine to write the transaction log to the dataLogDir rather than the dataDir. This allows a dedicated log device to be used, and helps avoid competition between logging and snaphots.
dataLogDir=/opt/elk/kafka/zookeeper/logs
#客戶端連接端口
clientPort=2181
#最大客戶端連接數
maxClientCnxns=20
#這個時間是作為Zookeeper服務器之間或客戶端與服務器之間維持心跳的時間間隔
tickTime=2000
#此配置表示,允許follower(相對於Leaderer言的“客戶端”)連接並同步到Leader的初始化連接時間,以tickTime為單位。當初始化連接時間超過該值,則表示連接失敗。
initLimit=10

此配置項表示Leader與Follower之間發送消息時,請求和應答時間長度。如果follower在設置時間內不能與leader通信,那麽此follower將會被丟棄。

syncLimit=5
#集群模式關鍵配置參數
server.x=[hostname]:nnnnn[:nnnnn]
There are two port numbers nnnnn. The first followers use to connect to the leader, and the second is for leader election. The leader election port is only necessary if electionAlg is 1, 2, or 3 (default). If electionAlg is 0, then the second port is not necessary. If you want to test multiple servers on a single machine, then different ports can be used for each server.
#server.myid=ip:followers_connect to the leader:leader_election # server 是固定的,myid 是需要手動分配,第一個端口是follower是鏈接到leader的端口,第二個是用來選舉leader 用的port

server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.2:2889:3889
#我的第二個server和第三個server 是用的同一臺機器跑了兩個實例,所以端口需要使用不同的端口來配置,切記如果有火墻的話,一定要放行你配置的口

#kafka 的配置同樣在config 目錄中的server.properties 是kafka的配置文件
vi server.properties
#The broker id for this server. If unset, a unique broker id will be generated.To avoid conflicts between zookeeper generated broker id‘s and user configured broker id‘s, generated broker ids start from reserved.broker.max.id + 1.
#每個server需要單獨配置broker id,如果不配置系統會自動配置。
broker.id=0
#消費者的訪問端口,logstash或者elasticsearch
listeners=PLAINTEXT://192.168.115.65:9092
#The number of threads that the server uses for receiving requests from the network and sending responses to the network ,接收和發送網絡信息的線程數
num.network.threads=3
#The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
#The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used.
socket.send.buffer.bytes=102400
#The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used.
socket.receive.buffer.bytes=102400
#The maximum number of bytes in a socket request
socket.request.max.bytes=104857600
#這個是設置log的目錄
log.dirs=/usr/local/kafka/logs

The default number of log partitions per topic. More partitions allow greater, parallelism for consumption, but this will also result in more files across the brokers.

num.partitions=1
#The number of threads per data directory to be used for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=1
#The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
offsets.topic.replication.factor=1
#The replication factor for the transaction topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
transaction.state.log.replication.factor=1
#Overridden min.insync.replicas config for the transaction topic.
transaction.state.log.min.isr=1
#The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property。 配置多少小時之後會刪除之前的數據。
log.retention.hours=168
#The maximum size of a single log file。 單個日誌文件的大小
log.segment.bytes=1073741824
#The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion。多少毫秒檢查一次是否有需要刪除的log 文件
log.retention.check.interval.ms=300000
#這塊是重點,配置kafka鏈接的ZK server
zookeeper.connect=192.168.165.65:2181,192.168.101.242:2181,192.168.101.242:2182
#zookeeper 鏈接超時設置
zookeeper.connection.timeout.ms=6000
#The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins.
group.initial.rebalance.delay.ms=0
#官方參考文檔地址:http://kafka.apache.org/10/documentation.html#brokerconfigs

192.168.1.1 節點上的所有配置都搞定了

#打包配置好的文件包
tar -zcvf /opt/elk/kafka_2.12-1.0.1-ready.tar.gz kafka_2.12-1.0.1
#scp 源文件到另外一臺機器上
scp /opt/elk/kafka_2.12-1.0.1-ready.tar.gz [email protected]:/opt/elk/

#登錄機器1.2 上配置,這個機器上配置兩套kafka,兩套ZK
FYI:如果你是三臺機器配置的話,就不需要這樣麻煩了,只需要把配置好的安裝包直接分發到不同的機器上,然後修改zookeeper的myid,kafka的broker.id 就可以了。
#解壓拷貝過來的包
tar -zxvf kafka_2.12-1.0.1-ready.tar.gz
#由於這臺機器需要配置兩套kafka和ZK 需要建立對用的數據目錄和log目錄給你不同的實例用
#創建kafka 數據目錄
mkdir /opt/elk/kafka_2.12-1.0.1/data/{k2,k3}
#創建 ZK的數據和log 目錄,官方推薦這兩個目錄最好不在一個磁盤下,可能會影響磁盤寫入讀取性能,所以如果數據量大的話,最好分開
mkdir /opt/elk/kafka_2.12-1.0.1/zookeeper/{z2/{data,logs},z3/{data,logs}} -p
#創建myid 文件並寫入ID number
echo 2 > /opt/elk/kafka_2.12-1.0.1/zookeeper/z2/data/myid
echo 3 > /opt/elk/kafka_2.12-1.0.1/zookeeper/z3/data/myid
#把ZK 和kafka 都是制定配置文件運行的,所以我們需要分別把zookeeper.properties & server.properties 復制為兩個不同的文件名字
cd /opt/elk/kafka_2.12-1.0.1/
cp zookeeper.properties zookeeper-2.properties
mv zookeeper.properties zookeeper-3.properties
cp server.properties server-2.properties
mv server.properties server-3.properties
#所有配置文件搞定了之後,zk-2 &zk-3 需要修改地方如下:
dataDir=剛剛創建好的目錄
dataLogDir=剛剛創建好的目錄
#zk-3 需要多修改一個地方
clientPort=2182

#Kafka 需要修改如下幾個地方:
broker.id=指定的id
#kafka 3 修改port
listeners=PLAINTEXT://192.168.1.2:9093

#到這裏所有kafka集群的所有配置都搞定了,開始啟動集群了,順序是先啟動zk,然後再啟動kafka
#1.1機器上執行如下命令
nohup /opt/elk/kafka_2.12-1.0.1/bin/zookeeper-server-start.sh config/zookeeper.properties >>/dev/null 2>&1 &
#1.2 機器上執行如下命令
nohup bin/zookeeper-server-start.sh config/zookeeper-2.properties >>/dev/null 2>&1 &
nohup bin/zookeeper-server-start.sh config/zookeeper-3.properties >>/dev/null 2>&1 &

#可以通過lsof 命令查看服務是否正常啟動
lsof -i:2181

#介紹幾個簡單的確認zk 服務是否正常的命令
#需要本機安裝nc 命令
yum -y install nc
#使用echo ruok|nc 127.0.0.1 2181 測試是否啟動了該Server,若回復imok表示已經啟動。
技術分享圖片
#查看zk的配置,配置正常返回證明zk service 正常
echo conf | nc 192.168.1.1 2181
技術分享圖片

#stat 可以查看集群狀態
echo stat | nc 127.0.0.1 2182

技術分享圖片
還有如下常用命令:
ZooKeeper 支持某些特定的四字命令字母與其的交互。它們大多是查詢命令,用來獲取 ZooKeeper 服務的當前狀態及相關信息。用戶在客戶端可以通過 telnet 或 nc 向 ZooKeeper 提交相應的命令

  1. 可以通過命令:echo stat|nc 127.0.0.1 2181 來查看哪個節點被選擇作為follower或者leader
  2. 使用echo ruok|nc 127.0.0.1 2181 測試是否啟動了該Server,若回復imok表示已經啟動。
  3. echo dump| nc 127.0.0.1 2181 ,列出未經處理的會話和臨時節點。
  4. echo kill | nc 127.0.0.1 2181 ,關掉server
  5. echo conf | nc 127.0.0.1 2181 ,輸出相關服務配置的詳細信息。
  6. echo cons | nc 127.0.0.1 2181 ,列出所有連接到服務器的客戶端的完全的連接 / 會話的詳細信息。
  7. echo envi |nc 127.0.0.1 2181 ,輸出關於服務環境的詳細信息(區別於 conf 命令)。
  8. echo reqs | nc 127.0.0.1 2181 ,列出未經處理的請求。
  9. echo wchs | nc 127.0.0.1 2181 ,列出服務器 watch 的詳細信息。
  10. echo wchc | nc 127.0.0.1 2181 ,通過 session 列出服務器 watch 的詳細信息,它的輸出是一個與 watch 相關的會話的列表。
  11. echo wchp | nc 127.0.0.1 2181 ,通過路徑列出服務器 watch 的詳細信息。它輸出一個與 session 相

#zk 搞定了之後開始搞kafka 了
1.1 上執行如下命令
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >>/dev/null 2>&1 &
1.2上執行如下兩條命:
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server-2.properties >>/dev/null 2>&1 &
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server-3.properties >>/dev/null 2>&1 &

#通過lsof 命令查看端口是否正常啟動

#測試kafka 工作是否正常,新建一個topic
/opt/elk/kafka_2.12-1.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#提示以下內容證明沒有問題
Created topic "test".
技術分享圖片
#通過list 來查看
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.1:2181 --list
test
技術分享圖片
#到這裏kafka 集群就搞定了

Centos7上 Nginx + ELK Stack +Kafka + Filebeat 實戰二