Golang封裝RabbitMQ
摘要:
程式封裝
package rabbitmq
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
// 定義全域性變數,指標型別
var mqConn *amqp....
程式封裝
package rabbitmq import ( "fmt" "github.com/streadway/amqp" "time" ) // 定義全域性變數,指標型別 var mqConn *amqp.Connection var mqChan *amqp.Channel // 定義生產者介面 type Producer interface { MsgContent() string } // 定義接收者介面 type Receiver interface { Consumer([]byte)error } // 定義RabbitMQ物件 type RabbitMQ struct { connection *amqp.Connection channel *amqp.Channel queueNamestring// 佇列名稱 routingKeystring// key名稱 exchangeName string// 交換機名稱 exchangeType string// 交換機型別 producerList []Producer receiverList []Receiver } // 定義佇列交換機物件 type QueueExchange struct { QuNamestring// 佇列名稱 RtKeystring// key值 ExNamestring// 交換機名稱 ExTypestring// 交換機型別 } // 連結rabbitMQ func (r *RabbitMQ)mqConnect() { var err error RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%d/", "guest", "guest", "******", 5673) mqConn, err = amqp.Dial(RabbitUrl) r.connection = mqConn// 賦值給RabbitMQ物件 if err != nil { fmt.Printf("MQ開啟連結失敗:%s \n", err) } mqChan, err = mqConn.Channel() r.channel = mqChan// 賦值給RabbitMQ物件 if err != nil { fmt.Printf("MQ開啟管道失敗:%s \n", err) } } // 關閉RabbitMQ連線 func (r *RabbitMQ)mqClose() { // 先關閉管道,再關閉連結 err := r.channel.Close() if err != nil { fmt.Printf("MQ管道關閉失敗:%s \n", err) } err = r.connection.Close() if err != nil { fmt.Printf("MQ連結關閉失敗:%s \n", err) } } // 建立一個新的操作物件 func New(q *QueueExchange) *RabbitMQ { return &RabbitMQ{ queueName:q.QuName, routingKey:q.RtKey, exchangeName: q.ExName, exchangeType: q.ExType, } } // 啟動RabbitMQ客戶端,並初始化 func (r *RabbitMQ) Start() { // 開啟監聽生產者傳送任務 for _, producer := range r.producerList { go r.listenProducer(producer) } // 開啟監聽接收者接收任務 for _, receiver := range r.receiverList { go r.listenReceiver(receiver) } time.Sleep(1*time.Second) } // 註冊傳送指定佇列指定路由的生產者 func (r *RabbitMQ) RegisterProducer(producer Producer) { r.producerList = append(r.producerList, producer) } // 傳送任務 func (r *RabbitMQ) listenProducer(producer Producer) { // 驗證連結是否正常,否則重新連結 if r.channel == nil { r.mqConnect() } // 用於檢查佇列是否存在,已經存在不需要重複宣告 _, err := r.channel.QueueDeclarePassive(r.queueName, true,false,false,true,nil) if err != nil{ // 佇列不存在,宣告佇列 // name:佇列名稱;durable:是否持久化,佇列存檔,true服務重啟後資訊不會丟失,影響效能;autoDelete:是否自動刪除;noWait:是否非阻塞, // true為是,不等待RMQ返回資訊;args:引數,傳nil即可;exclusive:是否設定排他 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil) if err != nil { fmt.Printf("MQ註冊佇列失敗:%s \n", err) return } } // 佇列繫結 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true,nil) if err != nil { fmt.Printf("MQ繫結佇列失敗:%s \n", err) return } // 用於檢查交換機是否存在,已經存在不需要重複宣告 err = r.channel.ExchangeDeclarePassive(r.exchangeName, r.exchangeType, true, false, false, true, nil) if err != nil{ // 註冊交換機 // name:交換機名稱,kind:交換機型別,durable:是否持久化,佇列存檔,true服務重啟後資訊不會丟失,影響效能;autoDelete:是否自動刪除; // noWait:是否非阻塞, true為是,不等待RMQ返回資訊;args:引數,傳nil即可; internal:是否為內部 err =r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, true, nil) if err != nil { fmt.Printf("MQ註冊交換機失敗:%s \n", err) return } } // 傳送任務訊息 err =r.channel.Publish(r.exchangeName, r.routingKey, false, false, amqp.Publishing{ ContentType: "text/plain", Body:[]byte(producer.MsgContent()), }) if err != nil { fmt.Printf("MQ任務傳送失敗:%s \n", err) return } } // 註冊接收指定佇列指定路由的資料接收者 func (r *RabbitMQ) RegisterReceiver(receiver Receiver) { r.receiverList = append(r.receiverList, receiver) } // 監聽接收者接收任務 func (r *RabbitMQ) listenReceiver(receiver Receiver) { // 處理結束關閉連結 defer r.mqClose() // 驗證連結是否正常 if r.channel == nil { r.mqConnect() } // 用於檢查佇列是否存在,已經存在不需要重複宣告 _, err := r.channel.QueueDeclarePassive(r.queueName, true,false,false,true,nil) if err != nil{ // 佇列不存在,宣告佇列 // name:佇列名稱;durable:是否持久化,佇列存檔,true服務重啟後資訊不會丟失,影響效能;autoDelete:是否自動刪除;noWait:是否非阻塞, // true為是,不等待RMQ返回資訊;args:引數,傳nil即可;exclusive:是否設定排他 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil) if err != nil { fmt.Printf("MQ註冊佇列失敗:%s \n", err) return } } // 繫結任務 err =r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true, nil) if err != nil { fmt.Printf("繫結佇列失敗:%s \n", err) return } // 獲取消費通道,確保rabbitMQ一個一個傳送訊息 err =r.channel.Qos(1, 0, true) msgList, err :=r.channel.Consume(r.queueName, "", false, false, false, false, nil) if err != nil { fmt.Printf("獲取消費通道異常:%s \n", err) return } for msg := range msgList { // 處理資料 err := receiver.Consumer(msg.Body) if err!=nil { err = msg.Ack(true) if err != nil { fmt.Printf("確認訊息未完成異常:%s \n", err) return } }else { // 確認訊息,必須為false err = msg.Ack(false) if err != nil { fmt.Printf("確認訊息完成異常:%s \n", err) return } return } } }
使用方法
package main import ( "fmt" "test/rabbitmq" ) type TestPro struct { msgContentstring } // 實現傳送者 func (t *TestPro) MsgContent() string { return t.msgContent } // 實現接收者 func (t *TestPro) Consumer(dataByte []byte) error { fmt.Println(string(dataByte)) return nil } func main() { msg := fmt.Sprintf("這是測試任務") t := &TestPro{ msg, } queueExchange := &rabbitmq.QueueExchange{ "test.rabbit", "rabbit.key", "test.rabbit.mq", "direct", } mq := rabbitmq.New(queueExchange) mq.RegisterProducer(t) mq.RegisterReceiver(t) mq.Start() }