使用 Kafka 和 MongoDB 進行 Go 非同步處理
在這個示例中,我將資料的儲存和 MongoDB 分離,並建立另一個微服務去處理它。我還添加了 Kafka 為訊息層服務,這樣微服務就可以非同步處理它自己關心的東西了。 下面是這個使用了兩個微服務的簡單的非同步處理示例的上層架構圖。
圖片描述(最多50字)
微服務 1 —— 是一個 REST 式微服務,它從一個 /POST http 呼叫中接收資料。接收到請求之後,它從 http 請求中檢索資料,並將它儲存到 Kafka。儲存之後,它通過 /POST 傳送相同的資料去響應呼叫者。 微服務 2 —— 是一個訂閱了 Kafka 中的一個主題的微服務,微服務 1 的資料儲存在該主題。一旦訊息被微服務消費之後,它接著儲存資料到 MongoDB 中。 我們開始吧! 首先,啟動 Kafka,在你執行 Kafka 伺服器之前,你需要執行 Zookeeper。下面是示例:
$ cd /<download path>/kafka_2.11-1.1.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties
接著執行 Kafka —— 我使用 9092 埠連線到 Kafka。如果你需要改變埠,只需要在 config/server.properties 中配置即可。如果你像我一樣是個新手,我建議你現在還是使用預設埠。
$ bin/kafka-server-start.sh config/server.properties
Kafka 跑起來之後,我們需要 MongoDB。它很簡單,只需要使用這個 docker-compose.yml 即可。
version: '3'
services:
mongodb:
image: mongo
ports:
- "27017:27017"
volumes: - "mongodata:/data/db"
networks: - network1
volumes:
mongodata:
networks:
network1:
使用 Docker Compose 去執行 MongoDB docker 容器。
docker-compose up
這裡是微服務 1 的相關程式碼。我只是修改了我前面的示例去儲存到 Kafka 而不是 MongoDB:
rest-to-kafka/rest-kafka-sample.go
func jobsPostHandler(w http.ResponseWriter, r *http.Request) {
//Retrieve body from http request
b, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
panic(err)
}
//Save data into Job struct
var _job Job
err = json.Unmarshal(b, &_job)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
saveJobToKafka(_job)
//Convert job struct into json
job)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
//Set content-type http header
w.Header().Set("content-type", "application/json")
//Send back data as response
w.Write(jsonString)
}
func saveJobToKafka(job Job) {
fmt.Println("save to kafka")
jsonString, err := json.Marshal(job)
jobString := string(jsonString)
fmt.Print(jobString)
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
panic(err)
}
// Produce messages to topic (asynchronously)
topic := "jobs-topic1"
, word := range []string{string(jobString)} {p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
}
這裡是微服務 2 的程式碼。在這個程式碼中最重要的東西是從 Kafka 中消費資料,儲存部分我已經在前面的部落格文章中討論過了。這裡程式碼的重點部分是從 Kafka 中消費資料:
kafka-to-mongo/kafka-mongo-sample.go
func main() {
//Create MongoDB session
session := initialiseMongo()
mongoStore.session = session
receiveFromKafka()
}
func receiveFromKafka() {
fmt.Println("Start receiving from Kafka")
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "group-id-1",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"jobs-topic1"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
job := string(msg.Value)
saveJobToMongo(job)
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
break
}
}
c.Close()
}
func saveJobToMongo(jobString string) {
fmt.Println("Save to MongoDB")
col := mongoStore.session.DB(database).C(collection)
//Save data into Job struct
var _job Job
b := []byte(jobString)
err := json.Unmarshal(b, &_job)
if err != nil {
panic(err)
}
//Insert job into MongoDB
errMongo := col.Insert(_job)
if errMongo != nil {
panic(errMongo)
}
fmt.Printf("Saved to MongoDB : %s", jobString)
}
我們來演示一下,執行微服務 1。確保 Kafka 已經運行了。
圖片描述(最多50字)
這裡是日誌,你可以在微服務 1 中看到。當你看到這些的時候,說明已經接收到了來自 Postman 傳送的資料,並且已經儲存到了 Kafka。
圖片描述(最多50字)
因為我們尚未執行微服務 2,資料被微服務 1 只儲存在了 Kafka。我們來消費它並通過執行的微服務 2 來將它儲存到 MongoDB。
$ go run kafka-mongo-sample.go
現在,你將在微服務 2 上看到消費的資料,並將它儲存到了 MongoDB。
圖片描述(最多50字)
檢查一下資料是否儲存到了 MongoDB。如果有資料,我們成功了!
圖片描述(最多50字)
歡迎工作一到五年的Java工程師朋友們加入Java架構開發: 855835163
群內提供免費的Java架構學習資料(裡面有高可用、高併發、高效能及分散式、Jvm效能調優、Spring原始碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!