1. 程式人生 > >Golang之發送消息至kafka

Golang之發送消息至kafka

onf git 輸出 打開鏈接 part 結果 target 命名 second

windows下安裝zookeeper

1、安裝JAVA-JDK,從oracle下載最新的SDK安裝(我用的是1.8的)
2、安裝zookeeper3.3.6,下載地址:http://apache.fayea.com/zookeeper/
3、重命名conf/zoo_sample.cfg 為conf/zoo.cfg
4、編輯 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\
4、運行bin/zkServer.cmd

啟動結果如下:

技術分享圖片

安裝kafka

1、打開鏈接:http://kafka.apache.org/downloads.html下載kafka2.1.2

2、打開config目錄下的server.properties, 修改log.dirs為D:\kafka_logs,

3、修改advertised.host.name=服務器ip

4、啟動kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties

kafka鏈接zookeeper

kafka也提供了一個命令行消費者,接受消息並打印到標準輸出。

bin/kafka-console-consumer.bat --zookeeper 127.0.0.1:2181 --topic nginx_log

golang寫入kafka


package main

import (
"fmt"

"github.com/Shopify/sarama"
"time"
)

//消息寫入kafka
func main() {
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//生產者
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer close,err:", err)
return
}

defer client.Close()
var n int=0

for n<20{
n++
//創建消息
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test,hello chaoge!!")
//發送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n,", pid, offset)
time.Sleep(10 * time.Millisecond)

}

}
 

goland運行結果:

技術分享圖片

kafka收到的數據:

技術分享圖片

Golang之發送消息至kafka