1. 程式人生 > >Rabbitmq詳解(基於go語言)

Rabbitmq詳解(基於go語言)

參考文件

RMQ的安裝和埠

手動安裝太麻煩,請自行百度。這裡只給出一種基於docker安裝的簡單形式。

docker run -d --hostname my-rabbit --name rmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=使用者名稱 -e RABBITMQ_DEFAULT_PASS=密碼 rabbitmq:3-management

通過命令可以看出,一共映射了三個埠,簡單說下這三個埠是幹什麼的。
5672:連線生產者、消費者的埠。
15672:WEB管理頁面的埠。
25672:分散式叢集的埠。

基礎概念

amqp:一種訊息中介軟體協議,RMQ是amqp協議的一個具體實現。RMQ使用Erlang語言實現的,具有很好的併發能力,具體歷史請百度,這裡主要關心怎麼用。
RMQ工作原理圖
注:此圖不一定是最標準的,但比較形象,易於理解。
下面是圖中出現的單詞的詳細解釋:
Producer:生產者,負責生產訊息。
Connect:連線,生產者與RMQ Server之間建立的TCP連線。
Channel:通道,一條連線可包含多條通道,不同通道之間通訊互不干擾。考慮下多執行緒應用場景,每個執行緒對應一條通道,而不是對應一條連線,這樣可以提高效能。
body:訊息主體,要傳遞的資料。
exchange:交換器,負責把訊息轉發到對應的佇列。交換器本身沒有快取訊息的功能,訊息是在佇列中快取的,如果佇列不存在,則交換器會直接丟棄訊息。常用的有四種類型的交換器:direct、fanout、topic、headers。不同型別的交換器有不同的交換規則,交換器會根據交換規則把訊息轉發到對應的佇列。
exchangeName:交換器名稱,每個交換器對應一個名稱,傳送訊息時會附帶交換器名稱,根據交換器名稱選擇對應的交換器。
queue:佇列,用於快取訊息。
BandingKey:繫結鍵,一個佇列可以有一個到多個繫結鍵,通過繫結操作可以繫結交換器和佇列,交換器會根據繫結鍵的名稱找到對應的佇列。
RotingKey:路由鍵,傳送訊息時,需要附帶一條路由鍵,交換器會對路由鍵和繫結鍵進行匹配,如果匹配成功,則訊息會轉發到繫結鍵對應的佇列中。
Consumer:消費者,負責處理訊息。

交換器型別

注:這裡只關注fanout、direct、topic三種類型,header型別沒用過,不關注。

  • fanout – 扇出型
    扇出交換器
    用於支援釋出、訂閱模式(pub/sub)
    交換器把訊息轉發到所有與之繫結的佇列中。
    扇出型別交換器會遮蔽掉路由鍵、繫結鍵的作用。
  • direct – 直接匹配
    直接匹配交換器
    用於支援路由模式(Routing)
    直接匹配交換器會對比路由鍵和繫結鍵,如果路由鍵和繫結鍵完全相同,則把訊息轉發到繫結鍵所對應的佇列中。

  • topic – 模式匹配
    模式匹配交換器
    與直接匹配相對應,可以用一些模式來代替字串的完全匹配。
    規則:
    以 ‘.’ 來分割單詞。
    ‘#’ 表示一個或多個單詞。
    ‘*’ 表示一個單詞。
    如:
    RoutingKey為:
    aaa.bbb.ccc
    BindingKey可以為:
    *.bbb.ccc
    aaa.#

預設交換器

預設交換器
RMQ會自帶幾個交換器,簡單看下,這裡只介紹AMQP default。
當交換器名稱為空時,表示使用預設交換器。空的意思是空字串。
預設交換器是一個特殊的交換器,他無需進行繫結操作,可以以直接匹配的形式直接把訊息傳送到任何佇列中。
下圖兩種模式均採用了預設交換器:
點到點
點到多點
兩個消費者從一個佇列取資料時,會產生競爭條件。此時訊息只能給其中的一個消費者。如果兩個消費者均沒有在收到訊息後做應答操作,則訊息會平均傳送給兩個消費者。如果收到訊息後做了應答操作,則會採取能者多勞的模式。

建立交換器

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error

name:交換器的名稱,對應圖中exchangeName。
kind:也叫作type,表示交換器的型別。有四種常用型別:direct、fanout、topic、headers。
durable:是否持久化,true表示是。持久化表示會把交換器的配置存檔,當RMQ Server重啟後,會自動載入交換器。
autoDelete:是否自動刪除,true表示是。至少有一條繫結才可以觸發自動刪除,當所有繫結都與交換器解綁後,會自動刪除此交換器。
internal:是否為內部,true表示是。客戶端無法直接傳送msg到內部交換器,只有交換器可以傳送msg到內部交換器。
noWait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。

建立佇列

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

name:佇列名稱
durable:是否持久化,true為是。持久化會把佇列存檔,伺服器重啟後,不會丟失佇列以及佇列內的資訊。(注:1、不丟失是相對的,如果宕機時有訊息沒來得及存檔,還是會丟失的。2、存檔影響效能。)
autoDelete:是否自動刪除,true為是。至少有一個消費者連線到佇列時才可以觸發。當所有消費者都斷開時,佇列會自動刪除。
exclusive:是否設定排他,true為是。如果設定為排他,則佇列僅對首次宣告他的連線可見,並在連線斷開時自動刪除。(注意,這裡說的是連線不是通道,相同連線不同通道是可見的)。
nowait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。

佇列繫結

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error

name:佇列名稱
key:對應圖中BandingKey,表示要繫結的鍵。
exchange:交換器名稱
nowait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。

交換器繫結

func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error

源交換器根據路由鍵&繫結鍵把msg轉發到目的交換器。
destination:目的交換器,通常是內部交換器。
key:對應圖中BandingKey,表示要繫結的鍵。
source:源交換器。
nowait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。

傳送訊息

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error

exchange:要傳送到的交換機名稱,對應圖中exchangeName。
key:路由鍵,對應圖中RoutingKey。
mandatory:直接false,不建議使用,後面有專門章節講解。
immediate :直接false,不建議使用,後面有專門章節講解。
msg:要傳送的訊息,msg對應一個Publishing結構,Publishing結構裡面有很多引數,這裡只強調幾個引數,其他引數暫時列出,但不解釋。

# cat $(find ./amqp) | grep -rin type.*Publishing
type Publishing struct {
        Headers Table
        // Properties
        ContentType     string //訊息的型別,通常為“text/plain”
        ContentEncoding string //訊息的編碼,一般預設不用寫
        DeliveryMode    uint8  //訊息是否持久化,2表示持久化,0或1表示非持久化。
        Body []byte //訊息主體
        Priority        uint8 //訊息的優先順序 0 to 9
        CorrelationId   string    // correlation identifier
        ReplyTo         string    // address to to reply to (ex: RPC)
        Expiration      string    // message expiration spec
        MessageId       string    // message identifier
        Timestamp       time.Time // message timestamp
        Type            string    // message type name
        UserId          string    // creating user id - ex: "guest"
        AppId           string    // creating application id
}

接受訊息 – 推模式

RMQ Server主動把訊息推給消費者

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

queue:佇列名稱。
consumer:消費者標籤,用於區分不同的消費者。
autoAck:是否自動回覆ACK,true為是,回覆ACK表示高速伺服器我收到訊息了。建議為false,手動回覆,這樣可控性強。
exclusive:設定是否排他,排他表示當前佇列只能給一個消費者使用。
noLocal:如果為true,表示生產者和消費者不能是同一個connect。
nowait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。
注意下返回值:返回一個<- chan Delivery型別,遍歷返回值,有訊息則往下走, 沒有則阻塞。

接受訊息 – 拉模式

消費者主動從RMQ Server拉訊息

func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)

queue:佇列名稱
autoAck:是否開啟自動回覆。

手動回覆訊息

func (ch *Channel) Ack(tag uint64, multiple bool) error
func (me Delivery) Ack(multiple bool) error {
        if me.Acknowledger == nil {
                return errDeliveryNotInitialized
        }
        return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}
func (d Delivery) Reject(requeue bool) error

簡單看一眼,函式2呼叫了函式1,本質上兩個函式沒區別。
這裡推薦使用第二個,因為方便。
另外說一下multiple引數。true表示回覆當前通道所有未回覆的ack,用於批量確認。false表示回覆當前條目。
函式三:
拒絕本條訊息。如果requeue為true,則RMQ會把這條訊息重新加入佇列,如果requeue為false,則RMQ會丟棄本條訊息。
注:推薦手動回覆,儘量不要使用autoACK,因autoACK不可控。

關閉連線

func (ch *Channel) Close() error
func (c *Connection) Close() error

簡單看下,不解釋了,按照流程最好有關閉一下,其實不關閉也沒啥事。。

Publish – mandatory引數

false:當訊息無法通過交換器匹配到佇列時,會丟棄訊息。
true:當訊息無法通過交換器匹配到佇列時,會呼叫basic.return通知生產者。
注:不建議使用,因會使程式邏輯變得複雜,可以通過備用交換機來實現類似的功能。

Publish – immediate引數

true:當訊息到達Queue後,發現佇列上無消費者時,通過basic.Return返回給生產者。
false:訊息一直快取在佇列中,等待生產者。
注:不建議使用此引數,遇到這種情況,可用TTL和DLX方法代替(後面會介紹)。

備用交換機&TTL+DLX

訊息傳送流程
藉助備用交換機、TTL+DLX代替mandatory、immediate方案:
1、P傳送msg給Ex,Ex無法把msg路由到Q,則會把路由轉發給ErrEx。
2、msg暫存在Q上之後,如果C不能及時消費msg,則msg會轉發到DlxEx。
3、TTL為msg在Q上的暫存時間,單位為毫秒。

  • 通過設定引數,可以設定Ex的備用交換器ErrEx
    建立Exchange時,指定Ex的Args – “alternate-exchange”:”ErrEx”。
    其中ErrEx為備用交換器名稱

  • 通過設定引數,可以設定Q的DLX交換機DlxEX
    建立Queue時,指定Q的Args引數:
    “x-message-ttl”:0 //msg超時時間,單位毫秒
    “x-dead-letter-exchange”:”dlxExchange” //DlxEx名稱
    “x-dead-letter-routing-key”:”dlxQueue” //DlxEx路由鍵

持久化

持久化的作用是防止在重啟、關閉、宕機的情況下資料丟失,持久化是相對靠譜的,如果資料在記憶體中,沒來得及存檔就發生了重啟,那麼資料還是會丟失。
持久化可分為三類:
1、Exchange的持久化(建立時設定引數)
2、Queue的持久化(建立時設定引數)
3、Msg的持久化(傳送時設定Args)

Qos

func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error

注意:這個在推送模式下非常重要,通過設定Qos用來防止訊息堆積。
prefetchCount:消費者未確認訊息的個數。
prefetchSize :消費者未確認訊息的大小。
global :是否全域性生效,true表示是。全域性生效指的是針對當前connect裡的所有channel都生效。

封裝思路

簡單說下,封裝是因為這東西引數太多了,不夠好用。所以又包了一層,讓程式碼看起來更簡潔一些,思路上參考了spring-rmq庫,採用json或者xml配置檔案來描述rmq中的各個元件。這樣看起來會舒服些。

封裝的參考程式碼:
https://gitee.com/vrg0/go-rabbitmq.git
注:喵自己封裝的,沒上過生產環境,功能上也不算特別全,好不好用也有待考驗,姑且可當做一個參考吧。。。