Kafka installation, configuration, and integration with Logstash
1. Kafka installation
Download: wget http://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz
Configure ZooKeeper: vim bin/zookeeper-server-start.sh and adjust the JVM memory settings to suit your machine.
Configure Kafka: vim bin/kafka-server-start.sh and adjust the JVM memory settings in the same way (see the sketch below).
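For reference, both start scripts set the heap through the KAFKA_HEAP_OPTS variable; a minimal sketch of the lines to look for (the values shown are the 0.8.x defaults, not recommendations, so tune them for your machine):
# in bin/zookeeper-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
# in bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"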
Start ZooKeeper: nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka: nohup bin/kafka-server-start.sh config/server.properties &
Create a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pay
List topics: bin/kafka-topics.sh --list --zookeeper localhost:2181
Start a producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pay
Start a consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic pay --from-beginning
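To sanity-check the single-broker setup, a quick test (a sketch, assuming the broker and ZooKeeper both run on localhost) is to pipe a message into the console producer and read it back with the consumer:
echo "hello pay" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pay
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic pay --from-beginning
# the consumer should print "hello pay"; stop it with Ctrl+C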
Multi-broker cluster configuration:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
Start the two additional brokers: nohup bin/kafka-server-start.sh config/server-1.properties &
nohup bin/kafka-server-start.sh config/server-2.properties &
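To confirm that all three brokers (broker.id 0, 1, and 2) have registered, one option is the ZooKeeper shell bundled with Kafka; a sketch, assuming ZooKeeper is on localhost:2181:
bin/zookeeper-shell.sh localhost:2181
# at the prompt, list the registered broker ids:
ls /brokers/ids
# expect something like [0, 1, 2] once all three brokers are up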
Create a topic with --replication-factor 3:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic pay-replicated
View the topic description:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic pay-replicated
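The output should look roughly like the following (broker ids, the leader, and the ordering depend on your cluster; shown here only to illustrate the fields explained below):
Topic:pay-replicated  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: pay-replicated  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0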
leader: the id (broker.id) of the node responsible for all reads and writes for the partition.
replicas: the list of nodes that replicate the partition's log, regardless of whether they are the leader or currently alive.
isr: the subset of replicas that are "in-sync", i.e. the nodes that are currently alive and caught up with the leader.
After stopping the current leader (broker.id=1):
Start a producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pay-replicated
Start a consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic pay-replicated --from-beginning
Note: in multi-broker mode, problems arise once only a single broker is left alive.
2. Logstash installation
Install: yum localinstall logstash-2.2.2-1.noarch.rpm
Test: /opt/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
Configuration file directory: /etc/logstash/conf.d/xxx.conf
3. Kafka + Logstash integration
Logstash 1.5 and later ships with built-in Kafka support, so the kafka input can be used directly in the conf files.
vim /etc/logstash/conf.d/pay.conf
input {
  kafka {
    zk_connect => "your zookeeper address:2181"
    group_id => "logstash"
    topic_id => "pay-replicated"
    reset_beginning => false
    consumer_threads => 5
    decorate_events => true
  }
}
output {
  # stdout { codec => rubydebug }
  redis {
    host => ["your redis address:6380"]
    batch => true
    key => "logstash-nginx-pay-replicated"
    data_type => "list"
  }
}
Restart Logstash: service logstash restart
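Before restarting, the configuration can be validated, and after the restart you can check that events are reaching Redis; a sketch, assuming the default Logstash install path and the Redis address/port used in the config above:
/opt/logstash/bin/logstash --configtest -f /etc/logstash/conf.d/pay.conf
redis-cli -h <your redis address> -p 6380 llen logstash-nginx-pay-replicated
# the list length should grow as messages arrive on the pay-replicated topic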