1. 程式人生 > >kafka安裝配置及與logstash整合

kafka安裝配置及與logstash整合

1、kafka安裝 下載   wget http://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz 配置zookeeper     vim bin/zookeeper-server-start.sh   根據機器狀況更改jvm 記憶體設定 配置kafka             vim bin/kafka-server-start.sh            根據機器狀況更改jvm 記憶體設定 啟動zookeeper       nohup bin/zookeeper-server-start.sh config/zookeeper.properties & 啟動kafka               nohup bin/kafka-server-start.sh config/server.properties & 建立topic               bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic  pay 檢視topic               bin/kafka-topics.sh --list --zookeeper localhost:2181 啟動producer         bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pay 啟動consumer       bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic pay --from-beginning 多broker叢集配置: cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties config/server-1.properties:     broker.id=1     listeners=PLAINTEXT://:9093     log.dir=/tmp/kafka-logs-1 config/server-2.properties:     broker.id=2     listeners=PLAINTEXT://:9094     log.dir=/tmp/kafka-logs-2 啟動kafka       nohup bin/kafka-server-start.sh config/server-1.properties &                        nohup bin/kafka-server-start.sh config/server-2.properties & 建立--replication-factor 3的topic  bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic pay-replicated 檢視topic描述: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic pay-replicated
leader:負責所有的讀和寫的節點id(broker.id) replicas:複製log的節點列表,不管他們是否是leader或是否alive isr:in-sync的replicas子集合,是目前存活且被leader捕獲的節點集。 停止當前leader(broker.id=1)後:
啟動producer         bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pay-replicated 啟動consumer       bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic pay-replicated --from-beginning 注意,multi-broker模式下,當只剩下一個broker時會有問題 2、logstash安裝 安裝  yum localinstall logstash-2.2.2-1.noarch.rpm 測試 /opt/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }' 配置檔案目錄 /etc/logstash/conf.d/xxx.conf 3、kafka+logstash整合 logstash1.5以後已經集成了對kafka的支援擴充套件,可以在conf配置中直接使用 vim /etc/logstash/conf.d/pay.conf input {         kafka{                 zk_connect => "your zookeeper address:2181"                 group_id => "logstash"                 topic_id => "pay-replicated"                 reset_beginning => false                 consumer_threads => 5                 decorate_events => true         } } output { #    stdout{ codec=> rubydebug }     redis {         host => ["your redis address:6380"]         batch => true         key => "logstash-nginx-pay-replicated"         data_type => "list"     } } 重啟logstash  service logstash restart