1. 程式人生 > >在golang中對rabbitmq進行測試,測試每個連線可建立的通道數量

在golang中對rabbitmq進行測試,測試每個連線可建立的通道數量

rabbitmq環境:3.7.7,三臺個人開發機組成的叢集,使用haproxy代理轉發

進行生產者測試,測試在go中可以連線rabbitmq的協程數。測試了四種情況

第一種:建立amqp連線不關閉,加鎖建立channel,channel不關閉,一共可以建立2048個channel,執行2048次傳送資料到佇列,之後會報”channel id space exhausted“,經多次測試,每個連線只能建立2048個channel

第二種情況:建立amqp連線不關閉,不加鎖建立channel,channel不關閉,測不出可以建立多少channel,但是隻傳送了幾十次資料(不固定,推測是channel建立2048次後就會報錯),就會報"channel id space exhausted"

結論:建立通道不加鎖的情況下,大概只能成功建立幾十個通道,然後就會開始報"channel id space exhausted",傳送到rabbitmq的資料量少,錯誤頻率大,加鎖以後資料反而傳送的多,同時支援2048個通道建立

第三種情況:每次重新連線rabbitmq,加鎖連線rabbitmq和建立channel,只能連線兩千個連線,傳送成功兩千次,之後報"username or password not allowed",經多次測試,大概rabbitmq只支援兩千個連線。

第四種情況:每次重新連線rabbitmq,不加鎖,僅僅連線了幾十次,資料傳送到rabbitmq的數量屈指可數,之後報"socket: too many open files“。

結論:整體執行速度偏慢,連線rabbitmq不加鎖的情況下,錯誤較大(這個是由於發生了錯誤我沒有return,而僅僅只是列印了錯誤),基本傳送不到rabbitmq的佇列,加鎖以後,可同時建立2000個連線,且都能順利傳送到mq的佇列。後來我測了一下測試環境的rabbitmq,大概只能支援七八百個連線,這個連線數量在rabbitmq中應該是可以設定的。

說明:加鎖是為了驗證rabbitmq一共支援建立多少個連線以及每個連線可以建立多少個通道。之前沒有加鎖的時候,發生了不少錯誤,導致自己對結果產生了錯誤估計,實際上是因為建立通道、建立連線發生錯誤沒有return結束該函式的執行,導致出現了不加鎖僅僅只有幾十條訊息投進rabbitmq的佇列。

測試程式碼如下:

package main
 
import (
   "github.com/streadway/amqp"
 "fmt"
 "time"
 "sync"
)
var count int =0
var mutex sync.Mutex
func main() {
   fmt.Println(time.Now())
   conn,err:=amqp.Dial("amqp://chen:[email protected]:8080/")
   if err!=nil {
      fmt.Println(err)
   }
   channel,err:=conn.Channel()
   if err!=nil {
      fmt.Println(err)
   }
   channel.ExchangeDeclare("test",amqp.ExchangeTopic, true, false, false, false, nil)
   channel.QueueDeclare("testQueue", true, false, false, false, nil)
   channel.QueueBind("testQueue","testKey","test",false,nil)
   channel.Close()
   for ;;{
      //go testNotCloseConn()
      go testCloseConn()
      time.Sleep(time.Nanosecond)
   }
   fmt.Println(time.Now())
}
func testNotCloseConn(conn *amqp.Connection){
   mutex.Lock()
   channel,err:=conn.Channel()
   count++
 fmt.Println(count)
   if err!=nil {
      fmt.Println(err)
      panic(err)
   }
   mutex.Unlock()
   amqpTest:=amqp.Publishing{
      ContentType:"application/json",
      Body:[]byte("{name:'chenzonqgi',host:'localhist'}"),
   }
 
   err=channel.Publish("test","testKey",false,false,amqpTest)
   if err!=nil{
      fmt.Println(err)
   }
   defer channel.Close()
   fmt.Println("傳送成功!")
   time.Sleep(time.Second*600)
}
 
func testCloseConn(){
   mutex.Lock()
   conn,err:=amqp.Dial("amqp://chen:[email protected]:8080/")
   count++
 fmt.Println(count)
   if err!=nil {
      fmt.Println("*****",count,"****")
      return
   }
   mutex.Unlock()//加鎖位置
 channel,err:=conn.Channel()
   defer conn.Close()
   defer channel.Close()
   if err!=nil {
      fmt.Println(err)
   }
   amqpTest:=amqp.Publishing{
      ContentType:"application/json",
      Body:[]byte("{name:'chenzonqgi',host:'localhist'}"),
   }
   err=channel.Publish("test","testKey",false,false,amqpTest)
   if err!=nil{
      fmt.Println(err)
   }
   fmt.Println("傳送成功!")
   time.Sleep(time.Second*600)//睡眠協程,不讓conn和channel關閉
}