1. 程式人生 > >RabbitMQ訊息佇列(四):分發到多Consumer(Publish/Subscribe)

RabbitMQ訊息佇列(四):分發到多Consumer(Publish/Subscribe)



 <=== RabbitMQ訊息佇列(三):任務分發機制

      上篇文章中,我們把每個Message都是deliver到某個Consumer。在這篇文章中,我們將會將同一個Message deliver到多個Consumer中。這個模式也被成為 "publish / subscribe"。
    這篇文章中,我們將建立一個日誌系統,它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到並列印(Consumer)。 我們將構建兩個Consumer,第一個將log寫到物理磁碟上;第二個將log輸出的螢幕。

1. Exchanges

    RabbitMQ 的Messaging Model就是Producer並不會直接傳送Message到queue。實際上,Producer並不知道它傳送的Message是否已經到達queue。

   Producer傳送的Message實際上是發到了Exchange中。它的功能也很簡單:從Producer接收Message,然後投遞到queue中。Exchange需要知道如何處理Message,是把它放到那個queue中,還是放到多個queue中?這個rule是通過Exchange 的型別定義的。


   我們知道有三種類型的Exchange:directtopic 和fanout。fanout就是廣播模式,會將所有的Message都放到它所知道的queue中。建立一個名字為logs,型別為fanout的Exchange:

  1. channel.exchange_declare(exchange=
    'logs',  
  2.                          type='fanout')  

Listing exchanges

通過rabbitmqctl可以列出當前所有的Exchange:

  1. $ sudo rabbitmqctl list_exchanges  
  2. Listing exchanges ...  
  3. logs      fanout  
  4. amq.direct      direct  
  5. amq.topic       topic  
  6. amq.fanout      fanout  
  7. amq.headers     headers  
  8. ...done.  

注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ預設建立的。

現在我們可以通過exchange,而不是routing_key來publish Message了:

  1. channel.basic_publish(exchange='logs',  
  2.                       routing_key='',  
  3.                       body=message)  

2. Temporary queues

    截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。

    但是對於我們將要構建的日誌系統,並不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現這個目標,我們需要兩件事情:
    1) 每當Consumer連線時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在宣告queue時不指定名字,那麼RabbitMQ會隨機為我們選擇這個名字。方法:
  1. result = channel.queue_declare()  
    通過result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
    2)當Consumer關閉連線時,這個queue要被deleted。可以加個exclusive的引數。方法:
  1. result = channel.queue_declare(exclusive=True)  

3. Bindings繫結

現在我們已經建立了fanout型別的exchange和沒有名字的queue(實際上是RabbitMQ幫我們取了名字)。那exchange怎麼樣知道它的Message傳送到哪個queue呢?答案就是通過bindings:繫結。

方法:

  1. channel.queue_bind(exchange='logs',  
  2.                    queue=result.method.queue)  
現在logs的exchange就將它的Message附加到我們建立的queue了。 Listing bindings

使用命令rabbitmqctl list_bindings

4. 最終版本

    我們最終實現的資料流圖如下:

Producer,在這裡就是產生log的program,基本上和前幾個都差不多。最主要的區別就是publish通過了exchange而不是routing_key。

emit_log.py script:

  1. #!/usr/bin/env python
  2. import pika  
  3. import sys  
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7. channel.exchange_declare(exchange='logs',  
  8.                          type='fanout')  
  9. message = ' '.join(sys.argv[1:]) or"info: Hello World!"
  10. channel.basic_publish(exchange='logs',  
  11.                       routing_key='',  
  12.                       body=message)  
  13. print" [x] Sent %r" % (message,)  
  14. connection.close()  
還有一點要注意的是我們聲明瞭exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丟棄的。
Consumer:receive_logs.py:
  1. #!/usr/bin/env python
  2. import pika  
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  4.         host='localhost'))  
  5. channel = connection.channel()  
  6. channel.exchange_declare(exchange='logs',  
  7.                          type='fanout')  
  8. result = channel.queue_declare(exclusive=True)  
  9. queue_name = result.method.queue  
  10. channel.queue_bind(exchange='logs',  
  11.                    queue=queue_name)  
  12. print' [*] Waiting for logs. To exit press CTRL+C'
  13. def callback(ch, method, properties, body):  
  14.     print" [x] %r" % (body,)  
  15. channel.basic_consume(callback,  
  16.                       queue=queue_name,  
  17.                       no_ack=True)  
  18. channel.start_consuming()  
我們開始不是說需要兩個Consumer嗎?一個負責記錄到檔案;一個負責列印到螢幕?
其實用重定向就可以了,當然你想修改callback自己寫檔案也行。我們使用重定向的方法:
We're done. If you want to save logs to a file, just open a console and type:
  1. $ python receive_logs.py > logs_from_rabbit.log  
Consumer2:列印到螢幕:
  1. $ python receive_logs.py  
接下來,Producer:
  1. $ python emit_log.py  
使用命令rabbitmqctl list_bindings你可以看我們建立的queue。一個output:
  1. $ sudo rabbitmqctl list_bindings  
  2. Listing bindings ...  
  3. logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []  
  4. logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []  
  5. ...done.  
這個結果還是很好理解的。

尊重原創,轉載請註明出處 anzhsoft: http://blog.csdn.net/anzhsoft/article/details/19617305

參考資料:

1. http://www.rabbitmq.com/tutorials/tutorial-three-python.html

 <=== RabbitMQ訊息佇列(三):任務分發機制

      上篇文章中,我們把每個Message都是deliver到某個Consumer。在這篇文章中,我們將會將同一個Message deliver到多個Consumer中。這個模式也被成為 "publish / subscribe"。
    這篇文章中,我們將建立一個日誌系統,它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到並列印(Consumer)。 我們將構建兩個Consumer,第一個將log寫到物理磁碟上;第二個將log輸出的螢幕。

1. Exchanges

    RabbitMQ 的Messaging Model就是Producer並不會直接傳送Message到queue。實際上,Producer並不知道它傳送的Message是否已經到達queue。

   Producer傳送的Message實際上是發到了Exchange中。它的功能也很簡單:從Producer接收Message,然後投遞到queue中。Exchange需要知道如何處理Message,是把它放到那個queue中,還是放到多個queue中?這個rule是通過Exchange 的型別定義的。


   我們知道有三種類型的Exchange:directtopic 和fanout。fanout就是廣播模式,會將所有的Message都放到它所知道的queue中。建立一個名字為logs,型別為fanout的Exchange:

  1. channel.exchange_declare(exchange='logs',  
  2.                          type='fanout')  

Listing exchanges

通過rabbitmqctl可以列出當前所有的Exchange:

  1. $ sudo rabbitmqctl list_exchanges  
  2. Listing exchanges ...  
  3. logs      fanout  
  4. amq.direct      direct  
  5. amq.topic       topic  
  6. amq.fanout      fanout  
  7. amq.headers     headers  
  8. ...done.  

注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ預設建立的。

現在我們可以通過exchange,而不是routing_key來publish Message了:

  1. channel.basic_publish(exchange='logs',  
  2.                       routing_key='',  
  3.                       body=message)  

2. Temporary queues

    截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。

    但是對於我們將要構建的日誌系統,並不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現這個目標,我們需要兩件事情:
    1) 每當Consumer連線時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在宣告queue時不指定名字,那麼RabbitMQ會隨機為我們選擇這個名字。方法:
  1. result = channel.queue_declare()  
    通過result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
    2)當Consumer關閉連線時,這個queue要被deleted。可以加個exclusive的引數。方法:
  1. result = channel.queue_declare(exclusive=True)  

3. Bindings繫結

現在我們已經建立了fanout型別的exchange和沒有名字的queue(實際上是RabbitMQ幫我們取了名字)。那exchange怎麼樣知道它的Message傳送到哪個queue呢?答案就是通過bindings:繫結。

方法:

  1. channel.queue_bind(exchange='logs',  
  2.                    queue=result.method.queue)  
現在logs的exchange就將它的Message附加到我們建立的queue了。 Listing bindings

使用命令rabbitmqctl list_bindings

4. 最終版本

    我們最終實現的資料流圖如下:

相關推薦

RabbitMQ訊息佇列-通過Topic主題模式分發訊息

前兩章我們講了RabbitMQ的direct模式和fanout模式,本章介紹topic主題模式的應用。如果對direct模式下通過routingkey來匹配訊息的模式已經有一定了解那fanout也很好理解。簡單的可以理解成direct是通過routingkey精準匹配的,而topic是通過r

RabbitMQ訊息佇列-安裝amqp擴充套件並訂閱/釋出DemoPHP版

本文將介紹在PHP中如何使用RabbitMQ來實現訊息的訂閱和釋出。我使用的系統依然是Centos7,為了方便,應用伺服器我使用Docker進行部署,容器環境:centos7+nginx+php5.6。 執行環境,安裝AMQP擴充套件: 如何安裝Docker我就不說了,網上很多教程非

python3線程應用詳解圖解線程中LOCK

python3 9.png image 任務 來看 info 對比 body pos 先來看下圖形對比: 發現沒有這種密集型計算的任務中,多線程沒有穿行的速率快,原因就是多線程在線程切換間也是要耗時的而密集型計算任務執行時幾乎沒以偶IO阻塞,這樣你說誰快python

RabbitMQ訊息佇列+spring監聽mq伺服器個ip,接收消費mq訊息(二)

前文用了註解方式實現監聽多個ip,本文用消費端的類實現ServletContextListener監聽器來實現專案啟動時開啟監聽多個ip。大致的程式碼雷同。 環境和框架:和註解方式完全一樣。ssm+maven3.3.9+jdk1.7 1 由於是實

RabbitMQ訊息佇列分發ConsumerPublish/Subscribe

 <=== RabbitMQ訊息佇列(三):任務分發機制       上篇文章中,我們把每個Message都是deliver到某個Consumer。在這篇文章中,我們將會將同一個Message deliver到多個Consumer中。這個模式也被成為

RabbitMQ訊息佇列-服務詳細配置與日常監控管理

RabbitMQ服務管理 啟動服務:rabbitmq-server -detached【 /usr/local/rabbitmq/sbin/rabbitmq-server -detached 】 檢視狀態:rabbitmqctl status 關閉服務:rabbitmqctl stop

RabbitMQ訊息佇列-啟用SSL安全通訊

如果RabbitMQ服務在內網中,只有內網的應用連線,我們認為這些連線都是安全的,但是個別情況我們需要讓RabbitMQ對外提供服務。這種情況有兩種解決方案: 在RabbitMQ外層在封裝一層應用,應用對外提供服務,本質來說RabbitMQ還是隻對內網提供服務。相對更安全,但靈活

RabbitMQ訊息佇列-通過Headers模式分發訊息

Headers型別的exchange使用的比較少,以至於官方文件貌似都沒提到,它是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以定義成Hashtable。傳送者在傳送的時候定義一些鍵值對,接收者也可以再繫結時候傳入一些鍵值對,兩者匹配的

RabbitMQ訊息佇列-訊息任務分發訊息ACK確認機制PHP版

在前面一章介紹了在PHP中如何使用RabbitMQ,至此入門的的部分就完成了,我們內心中一定還有很多疑問:如果多個消費者消費同一個佇列怎麼辦?如果這幾個消費者分任務的權重不同怎麼辦?怎麼把同一個佇列不同級別的任務分發給不同的消費者?如果消費者異常離線怎麼辦?不要著急,後面將慢慢解開面紗。我們

RabbitMQ訊息佇列”Hello, World“

原文地址:http://blog.csdn.net/anzhsoft/article/details/19570187    本文將使用Python(pika 0.9.8)實現從Producer到Consumer傳遞資料”Hello, World“。      首先複習一下上篇所學:RabbitMQ實現

RabbitMQ訊息佇列Publisher的訊息確認機制

       在前面的文章中提到了queue和consumer之間的訊息確認機制:通過設定ack。那麼Publisher能不到知道他post的Message有沒有到達queue,甚至更近一步,是否被某個

十一RabbitMQ訊息佇列-如何實現高可用

在前面講到了RabbitMQ高可用叢集的搭建,但是我們知道只是叢集的高可用並不能保證應用在使用訊息佇列時完全沒有問題,例如如果應用連線的RabbitMQ叢集突然宕機了,雖然這個叢集時可以使用的,但是應用訂閱的連線就斷開了,如果有個機房外網出口頻寬被挖掘機弄斷了,那叢集依然是不可用的。所以我們

RabbitMQ訊息佇列-高可用叢集部署實戰

前幾章講到RabbitMQ單主機模式的搭建和使用,我們在實際生產環境中出於對效能還有可用性的考慮會採用叢集的模式來部署RabbitMQ。 RabbitMQ叢集基本概念 Rabbit模式大概分為以下三種:單主機模式、普通叢集模式、映象叢集模式。 單主機模式:

RabbitMQ訊息佇列-通過fanout模式將訊息推送到個Queue中

前面第六章我們使用的是direct直連模式來進行訊息投遞和分發。本章將介紹如何使用fanout模式將訊息推送到多個佇列。 有時我們會遇到這樣的情況,多個功能模組都希望得到完整的訊息資料。例如一個log的訊息,一個我們希望輸出在螢幕上實時監控,另外一個使用者持久化日誌。這時就可以使用fano

十三RabbitMQ訊息佇列-VirtualHost與許可權管理

VirtualHost 像mysql有資料庫的概念並且可以指定使用者對庫和表等操作的許可權。那RabbitMQ呢?RabbitMQ也有類似的許可權管理。在RabbitMQ中可以虛擬訊息伺服器VirtualHost,每個VirtualHost相當月一個相對獨立的RabbitMQ伺服器,每個

十二RabbitMQ訊息佇列-效能測試

硬體配置 宿主機用的聯想3850X6的伺服器四顆E7-4850v3的處理器,DDR4記憶體,兩塊1.25TB的pcie固態。在宿主機上使用的事esxi5.5的虛擬化平臺,在子系統中安裝RabbitMQ和測試指令碼,RabbitMQ配置如下: CPU:24核 記憶體:24GB 硬碟:

RabbitMQ訊息佇列: Detailed Introduction 詳細介紹

原文地址:http://blog.csdn.net/anzhsoft/article/details/19563091 1. 歷史     RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的

RabbitMQ訊息佇列一基本概念和常用命令

常用命令 /etc/init.d/rabbitmq-server start|stop|restart|reload rabbitmqctl  add_vhost  vhostname  ##建立vhost rabbitmqctl   delete_vhost  v

RabbitMQ訊息佇列入門篇環境配置+Java例項+基礎概念

一、訊息佇列使用場景或者其好處 訊息佇列一般是在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。 在專案啟動之初來預測將來專案會碰到什麼需求,是極其困難的。訊息

入門RabbitMQ訊息佇列結合SSH框架配置篇

使用RabbitMQ訊息佇列,因為訊息佇列的非同步思想,解耦,以及允許短暫的不一致性,就像我現在把東西放在桌子上,你可以去拿,別人也可以去拿,而我不用等人拿完我便放東西上去,這樣就保證了我(生產者)和接收者沒有什麼聯絡,而且接受者可以隨時去拿。我們要使用RabbitMQ,安裝