1. 程式人生 > >訊息佇列,RabbitMQ、Kafka、RocketMQ

訊息佇列,RabbitMQ、Kafka、RocketMQ

[TOC] # 1、訊息列隊概述 ## 1.1訊息佇列MQ ``` MQ全稱為Messages Queue ,訊息佇列是應用程式和應用程式之間的通訊方法 為什麼使用MQ? 在專案中,可將一些無需耗時的操作提取出來,進行非同步操作,而這種非同步處理方式大大的姐生了伺服器的請求響應時間,從而提高了系統的吞吐量。 ``` 開發中訊息列隊通常有以下應用場景 ``` 1.任務非同步處理 將不需要同步處理的斌且好使長的操作由訊息佇列通知訊息接收方進行非同步處理。提高了應用的響應時間 2.應用程式解耦合 MQ相當於一箇中介,生產放通過MQ與消費方互動,它將應用程式進行解耦合 ``` ![](https://gitee.com/guyouyin/image/raw/master/img/20200506190926.png) ## 1.2AMQP和JMS MQ是訊息通訊模型;實現MQ的大致有兩種主流方式:AMQP、JMS ### 1.2.1AMQP 六種訊息模式 AMQP是高階訊息佇列協議,是一個程序間傳遞非同步訊息的網路協議,更準確的說是一種binary wire-level protocol(連結協議)。這是和JMS的本質區別,AMQP不從API層進行限定,而是直接定義網路交換的資料格式 ### 1.2.2JMS 兩種訊息模式 JMS即JAVA訊息服務應用程式介面,是一個java平臺中關於面向訊息中介軟體的API,用於在兩個應用程式之間,或分佈系統中傳送訊息,進行非同步通訊 ### 1.2.3AMOP 與 JMS 區別 ``` · JMS 是定義了統的介面,來對訊息攝作進行統一,AMQP是通過規定協議來統一統一資料互動的格式 · JMS 限定了必須使用 Java 語宮: AMQP只是協議,不規定實現方式.因此是跨語宮的. · JMS 規定了兩種訊息模式(點對點模式,訂閱模式):而 AMQP的訊息模式更加豐富 ``` ## 1.3訊息佇列產品 市場上常見的息佇列有如下: 目前市面上成熟主流的MQ有Kafka、RocketMQ、RabbitMQ,我們這裡對每款款MQ做一個簡介紹。 ### 1.3.1 Kafka ``` 所有開源的MQ,吞吐量最強的,天然支援叢集,訊息堆積能力非常強悍 Apache下的一個子專案,使用scala語言實現的一個高效能分散式Publish/Subscribe訊息佇列系統 1.快速持久化:通過磁碟順序讀寫與零拷貝機制,可以在0(1)的系統開銷下進行訊息持久化 2.高吞吐:在一臺普通的伺服器上即可以達到10W/s的吞吐速率。 3.高堆積:支援topoc下消費者較長時間離線,訊息堆積量大 4.完全的分散式系統:Brocker、Producer、Consumer都原生自動支援分散式,依賴zookeeper自動實現複雜均衡 5.支援Hadoop資料並行載入:對於像Hadoop的一樣的日誌資料和離線分系統,但又要求實時處理的限制,這是一個可行的解決方案 ``` ### 1.3.2 RocketMQ ``` RocketMQ國產阿里的,經過雙十一的檢驗。也非常強悍,基於java語言寫的 RocketMQ的前身是Metaq,當Metaq3.0釋出時,產品名稱改為RocketMQ.RocketMQ是一款分散式、佇列模型的訊息中介軟體 1.能夠保證嚴格的訊息順序 2.提供豐富的訊息拉取模式 3.高效的訂閱水平擴充套件能力 4.實時的訊息訂閱機制 5.支援事務訊息 6.億級訊息堆積能力 ``` ### 1.3.3RabbitMQ ``` 使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP,SMTP,STOMP,正是如此使它變得非常重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將訊息直接傳送給佇列,訊息在傳送給客戶端時先在中心佇列排隊。對路由,負載均衡、資料持久化都有很好的支援。多用於進行企業級的ESB整合。 ``` ## 1.4RabbitMQ ``` RabbitMQ是erlang語言開發,基於AMQP(Advanced Message Queue高階訊息佇列協議)協議實現的訊息佇列,它是一種應用程式之間的湧信方法,訊息佇列在分散式系統開發中應用非常廣乏 RabbitMQ官方地址:http://www.rabbitmq.com/ RabbitMQ提供了6種模式: 1.簡單模式 2.work工作模式 3.publish/Subscribe釋出與訂閱模式 4.Routing路由模式 5.Topics主題模式 6.RPC遠呼叫模式(遠端呼叫,不太算MQ;不作介紹) ``` ![](https://gitee.com/guyouyin/image/raw/master/img/20200506214047.png) ### 簡單模式 ``` 一個生產者,一個消費者 ``` ![](https://gitee.com/guyouyin/image/raw/master/img/20200506220630.png) ### 工作模式 ``` 一個生產者、2個消費者。 一個訊息只能被一個消費者獲取。 ``` ![](https://gitee.com/guyouyin/image/raw/master/img/20200506221233.png) ### 訂閱模式 ``` 解讀: 1、1個生產者,多個消費者 2、每一個消費者都有自己的一個佇列 3、生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機。交換機分別都發送給佇列 4、每個佇列都要繫結到交換機 5、生產者傳送的訊息,經過交換機,到達佇列,實現,一個訊息被多個消費者獲取的目的 注意:一個消費者佇列可以有多個消費者例項,只有其中一個消費者例項會消費 交換機(圖中X):只負責轉發訊息,不具備儲備訊息的能力。一方面接收生產者P的訊息,另一方面處理訊息(傳送給佇列,或者特定佇列,或者丟棄),取決於交換機的類別 交換機的類別: Fanout:廣播,將訊息交給所有繫結到交換機的佇列 Direct:定向,把訊息交給符合指定的佇列中 Topic:萬用字元,把訊息交給符合routing pattern(路由模式)的佇列 ``` ![](https://gitee.com/guyouyin/image/raw/master/img/20200507100435.png) ### 路由模式 ``` 1.佇列與交換機的繫結,不能是任意綁定了,而是要指定一個routingkey(路由key) 2.訊息的傳送方在 向 Exchange(交換機圖中X)傳送訊息時,也必須指定訊息的RoutingKey 3.交換機不再把訊息交給每一個繫結的佇列,而是根據訊息的Routing Key進行判斷,只有佇列的Routing與訊息的Routing Key 完全一致,才會接收到訊息 ``` ![](https://gitee.com/guyouyin/image/raw/master/img/20200506223234.png) ``` 圖解: P:生產者 X:交換機 C1:消費者,指定所在佇列需要routing key為error的訊息 C2:消費者,指定所在佇列需要routing key為info,erroe,warning的訊息 ``` ### 萬用字元模式(主題模式) ``` 可以根據Routingkey把訊息路由到不同的佇列。只不過萬用字元型別交換機可以讓佇列在繫結routing key的時候使用萬用字元 routingkey一般都是由一個或多個單片語成,多個單詞之間以‘.’點分割,例如:item.add.hello 萬用字元規則: #:匹配一個或多個詞 *:匹配一個詞 舉例: item.#:能夠匹配item.index.add或者item.index itrm.*:只能匹配item.index或者item.xxx ``` ![](https://gitee.com/guyouyin/image/raw/master/img/20200506224056.png) # 2.安裝及配置RabbitMQ ## 使用者角色 ``` 1、超級管理員(administrator) 可登陸管理控制檯,可檢視所有的資訊,並且可以對使用者,策略(policy)進行操作。 2、監控者(monitoring) 可登陸管理控制檯,同時可以檢視rabbitmq節點的相關資訊(程序數,記憶體使用情況,磁碟使用情況等) 3、策略制定者(policymaker) 可登陸管理控制檯, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框標識的部分)。 4、普通管理者(management) 僅可登陸管理控制檯,無法看到節點資訊,也無法對策略進行管理。 5、其他 無法登陸管理控制檯,通常就是普通的生產者和消費者。 ``` # 3.python使用RabbitMQ ## 輪詢消費模式 此模式下,傳送佇列的一方把訊息存入mq的指定佇列後,若有消費者端聯入相應佇列,即會獲取到訊息,並且佇列中的訊息會被消費掉。 若有多個消費端同時連線著佇列,則會已輪詢的方式將佇列中的訊息消費掉。 接下來是程式碼例項: producer生產者 ``` # !/usr/bin/env python import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() # 宣告queue channel.queue_declare(queue='balance') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='balance', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close() ``` 傳送過佇列後,可在MQ伺服器中檢視佇列狀態 ``` [root@localhost ~]# rabbitmqctl list_queues Listing queues ... hello 1 ``` consumer消費者 ``` # _*_coding:utf-8_*_ __author__ = 'Alex Li' import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='balance') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='balance', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() ``` 接收佇列後,檢視一下佇列狀態 ``` [root@localhost ~]# rabbitmqctl list_queues Listing queues ... hello 0 ``` ## 佇列持久化 當rabbitMQ意外宕機時,可能會有持久化儲存佇列的需求(佇列中的訊息不消失)。 producer ``` # Cheng # !/usr/bin/env python import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() # 宣告queue channel.queue_declare(queue='durable',durable=True) # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='durable', body='Hello cheng!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent 'Hello cheng!'") connection.close() ``` 執行後檢視佇列,記下佇列名字與佇列中所含訊息的數量 ``` [root@localhost ~]# rabbitmqctl list_queues Listing queues ... durable 1 #重啟rabbitmq [root@localhost ~]# systemctl restart rabbitmq-server #重啟完畢後再次檢視 [root@localhost ~]# rabbitmqctl list_queues Listing queues ... durable #佇列以及訊息並未消失 ``` 執行消費者程式碼 cunsumer ``` # Cheng # _*_coding:utf-8_*_ __author__ = 'Alex Li' import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='durable',durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='durable', #no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() ``` 可正確接收到資訊。 再次檢視佇列的情況。 ``` [root@localhost ~]# rabbitmqctl list_queues Listing queues ... durable 0 ``` ## 廣播模式 當producer傳送訊息到佇列後,所有的consumer都會收到訊息,需要注意的是,此模式下producer與concerned之間的關係類似與廣播電臺與收音機,如果廣播後收音機沒有接受到,那麼訊息就會丟失。 建議先執行concerned concerned ``` # _*_coding:utf-8_*_ __author__ = 'Alex Li' import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() channel.exchange_declare(exchange='Clogs', type='fanout') result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='Clogs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() ``` producer ``` import pika import sys credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() channel.exchange_declare(exchange='Clogs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='Clogs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.clos