1. 程式人生 > >RabbitMQ消息隊列(二):”Hello, World“

RabbitMQ消息隊列(二):”Hello, World“

復雜 article ins don title apple lar github publish

本文將使用Python(pika 0.9.8)實現從Producer到Consumer傳遞數據”Hello, World“。

首先復習一下上篇所學:RabbitMQ實現了AMQP定義的消息隊列。它實現的功能”非常簡單“:從Producer接收數據然後傳遞到Consumer。它能保證多並發,數據安全傳遞,可擴展。

和任何的Hello world一樣,它們都不復雜。我們將會設計兩個程序,一個發送Hello world,另一個接收這個數據並且打印到屏幕。
整體的設計如下圖:

技術分享

1. 環境配置

RabbitMQ 實現了AMQP。因此,我們需要安裝AMPQ的library。幸運的是對於多種編程語言都有實現。我們可以使用以下lib的任何一個:

  • py-amqplib
  • txAMQP
  • pika

在這裏我們將使用pika. 可以通過 pip 包管理工具來安裝:

$ sudo pip install pika==0.9.8

這個安裝依賴於pip和Git-core。

  • On Ubuntu:

    $ sudo apt-get install python-pip git-core
    
  • On Debian:

    $ sudo apt-get install python-setuptools git-core
    $ sudo easy_install pip
    
  • On Windows:To install easy_install, run the MS Windows Installer for setuptools

    > easy_install pip
    > pip install pika==0.9.8

    2. Sending

    技術分享

    第一個program send.py:發送Hello world 到queue。正如我們在上篇文章提到的,你程序的第一句話就是建立連接,第二句話就是創建channel:

    #!/usr/bin/env python  
    import pika  
      
    connection = pika.BlockingConnection(pika.ConnectionParameters(  
                   localhost))  
    channel = connection.channel() 

    創建連接傳入的參數就是RabbitMQ Server的ip或者name。

    關於誰創建queue,上篇文章也討論過:Producer和Consumer都應該去創建。

    接下來我們創建名字為hello的queue:

    channel.queue_declare(queue=hello)  

    創建了channel,我們可以通過相應的命令來list queue:

    $ sudo rabbitmqctl list_queues  
    Listing queues ...  
    hello    0  
    ...done.  

    現在我們已經準備好了發送了。
    從架構圖可以看出,Producer只能發送到exchange,它是不能直接發送到queue的。現在我們使用默認的exchange(名字是空字符)。這個默認的exchange允許我們發送給指定的queue。routing_key就是指定的queue名字。

    channel.basic_publish(exchange=‘‘,  
                          routing_key=hello,  
                          body=Hello World!)  
    print " [x] Sent ‘Hello World!‘"  

    退出前別忘了關閉connection。

    connection.close()  

    3. Receiving

    技術分享

    第二個program receive.py 將從queue中獲取Message並且打印到屏幕。

    第一步還是創建connection。第二步創建channel。第三步創建queue,name = hello:

    channel.queue_declare(queue=hello)  

    接下來要subscribe了。在這之前,需要聲明一個回調函數來處理接收到的數據。

    def callback(ch, method, properties, body):  
        print " [x] Received %r" % (body,) 

    subscribe:

    channel.basic_consume(callback,  
                          queue=hello,  
                          no_ack=True)  

    最後,準備好無限循環監聽吧:

    print  [*] Waiting for messages. To exit press CTRL+C  
    channel.start_consuming() 

    4. 最終版本

    send.py:

    #!/usr/bin/env python  
    import pika  
      
    connection = pika.BlockingConnection(pika.ConnectionParameters(  
            host=localhost))  
    channel = connection.channel()  
      
    channel.queue_declare(queue=hello)  
      
    channel.basic_publish(exchange=‘‘,  
                          routing_key=hello,  
                          body=Hello World!)  
    print " [x] Sent ‘Hello World!‘"  
    connection.close()  

    receive.py:

    #!/usr/bin/env python  
    import pika  
      
    connection = pika.BlockingConnection(pika.ConnectionParameters(  
            host=localhost))  
    channel = connection.channel()  
      
    channel.queue_declare(queue=hello)  
      
    print  [*] Waiting for messages. To exit press CTRL+C  
      
    def callback(ch, method, properties, body):  
        print " [x] Received %r" % (body,)  
      
    channel.basic_consume(callback,  
                          queue=hello,  
                          no_ack=True)  
      
    channel.start_consuming() 

    5. 最終運行

    先運行 send.py program:

    $ python send.py  
    [x] Sent Hello World!

    send.py 每次運行完都會停止。註意:現在數據已經存到queue裏了。接收它:

    $ python receive.py  
    [*] Waiting for messages. To exit press CTRL+C  
    [x] Received Hello World!  

    轉載anzhsoft: http://blog.csdn.NET/anzhsoft/article/details/19570187

RabbitMQ消息隊列(二):”Hello, World“