1. 程式人生 > >單機配置kafka和zookeeper

單機配置kafka和zookeeper

events style loser fluent RM topic ssa url zookeepe

1:環境準備

jdk 推薦oracle,不建議open sdk

在/etc/profile加入下列環境變量

技術分享圖片

在PATH中將jdk和jre的bin加入path裏面

$JAVA_HOME/bin:$JRE_HOME/bin

2:安裝zookeeper

下載zookeeper tar

url: https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz

將壓縮包移動到/usr/local下面

tar -zxvf ***

更改配置文件

(1)將conf/zoo_sample.cfg更改為zoo.cfg

(2)更改配置如下

技術分享圖片

註意:

  *其中默認port為2181。

  *datadir需手動創建 mkdir -p datadir

*註釋掉的參數在單機中無用

(3)加入環境變量 /etc/profile

技術分享圖片

測試:

可以cd 到bin目錄下通過執行 zkServer.sh shell腳本啟動、停止或者查看狀態

eg:

./zkServer.sh start/stop/status

3:安裝kafka

先下載tar包、解壓、mv到/usr/local下面

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
sudo mv *** /usr/local
cd 
/usr/local sudo tar -zxvf ****
sudo rm **.tar.gz

修改config目錄下配置文件

vim server.properties

修改如下參數

#broker.id需改成正整數,單機為1就好
broker.id=1
#指定端口號
port=9092
#localhost這一項還有其他要修改,詳細見下面說明
host.name=localhost
#指定kafka的日誌目錄
log.dirs=/usr/local/kafka_2.11-0.11.0.0/kafka-logs
#連接zookeeper配置項,這裏指定的是單機,所以只需要配置localhost,若是實際生產環境,需要在這裏添加其他ip地址和端口號
zookeeper.connect
=localhost:2181

vim zookeeper.properties

修改如下參數

#數據目錄
dataDir=/usr/local/kafka_2.11-0.11.0.0/zookeeper/data
#客戶端端口
clientPort=2181
host.name=localhost

producer.properties and consumer.properties

zookeeper.connect=localhost:2181

4:啟動kafka

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

技術分享圖片

終端都是info log

註意:如果是非root用戶,可能會使用到sudo去啟動zookeeper和kafka,但是那樣會失敗必須將kafka整個目錄下的文件的owner和group都改為自己的用戶。

可以通過jps命令查看兩者是否啟動成功

技術分享圖片

2576 QuorumPeerMain表示zookeeper

如果發現沒啟動成功,可以在zookeeper/config/zookeeper.out裏debug。

成功啟動後,可以簡單通過demo進行測試

github有各種語言的kafka支持

url:https://github.com/edenhill/librdkafka/tree/v0.9.5

技術分享圖片

可以簡單通過go的demo進行測試

consumer

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
            break
        }
    }

    c.Close()
}

producer

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }

  
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

 
    topic := "myTopic"
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }


    p.Flush(15 * 1000)
}

單機配置kafka和zookeeper