Golang RabbitMQ 自動重連/重發生產者
背景
Golang裡面採用ofollow,noindex">AMQP
來連線rabbitmq
, 使用之後發現這個庫比較底層,只提供協議的封裝。這個庫用到生產環境不合適,包裝了一層以提供更加穩定的功能,程式碼地址
目標
- 斷線重連: 這個需求不過分,生產環境各種網路問題,最起碼的重連要支援,支援重連次數
- 訊息重發: 這個要求不過分,Rabbitmq有訊息確認機制
關鍵實現
PS:Golang的併發真的設計的很好,習慣之後用起來比多執行緒/鎖的模式舒服一些。
- 定義三個通道來進行併發
type Producer struct { namestring logger*log.Logger connection*amqp.Connection channel*amqp.Channel donechan bool// 如果主動close,會接受資料 notifyClosechan *amqp.Error// 如果異常關閉,會接受資料 notifyConfirm chan amqp.Confirmation // 訊息傳送成功確認,會接受到資料 isConnectedbool }
- 註冊監聽
producer.channel.NotifyClose(producer.notifyClose) producer.channel.NotifyPublish(producer.notifyConfirm)
-
發了就不管
直接push訊息,回傳一個error
return producer.channel.Publish( "",// Exchange producer.name, // Routing key false,// Mandatory false,// Immediate amqp.Publishing{ DeliveryMode: 2, ContentType:"application/json", Body:data, Timestamp:time.Now(), }, )
-
三次重傳的發訊息
這裡主要通過
time.NewTicker
來實現超時重發
ticker := time.NewTicker(resendDelay) select { case confirm := <-producer.notifyConfirm: if confirm.Ack { producer.logger.Println("Push confirmed!") return nil } case <- ticker.C: }