1. 程式人生 > >python-RabbtiMQ消息隊列

python-RabbtiMQ消息隊列

支持 soc 回復 易用 pytho 消息 分布式系 回調 urn

1.RabbitMQ簡介

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

2.RabbitMQ能為你做些什麽?

消息系統允許軟件、應用相互連接和擴展.這些應用可以相互鏈接起來組成一個更大的應用,或者將用戶設備和數據進行連接.消息系統通過將消息的發送和接收分離來實現應用程序的異步和解偶.

或許你正在考慮進行數據投遞,非阻塞操作或推送通知。或許你想要實現發布/訂閱,異步處理,或者工作隊列。所有這些都可以通過消息系統實現。

RabbitMQ是一個消息代理 - 一個消息系統的媒介。它可以為你的應用提供一個通用的消息發送和接收平臺,並且保證消息在傳輸過程中的安全。

3.RabbitMQ 安裝使用

4.Python應用RabbitMQ

python操作RabbitMQ的模塊有三種:pika,Celery,Haigha。
本文使用的是pika。
"""
RabbitMQ-生產者。
"""

import pika

"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)

"""聲明一個管道"""
channel = connection.channel()

"""定義一個queue,定義queue名稱,標識"""
channel.queue_declare(queue=hello)



"""定義queue中的消息內容"""
channel.basic_publish(exchange
=‘‘, routing_key=hello, body=Hello World!) print(" [x] Sent ‘Hello World!‘")
"""
RabbitMQ-消費者。
"""

import pika

"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)

"""聲明一個管道"""
channel = connection.channel()

"""定義一個queue,定義queue名稱,標識,與生產者隊列中對應"""
channel.queue_declare(queue=hello)

def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)

"""消費,接收消息..."""
channel.basic_consume(
    consumer_callback=callback, #   如果收到消息,則回調這個函數處理消息
    queue=hello, # queue_declare(queue=‘hello‘) 對應
    no_ack=True
)

"""
    消費者會一直監聽這queue,如果隊列中沒有消息,則會卡在這裏,等待消息隊列中生成消息。
"""

print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()

5.RabbitMQ消息持久化

技術分享圖片
import pika


queue_name = xiaoxi_

"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""聲明一個管道"""
channel = connection.channel()
"""定義一個queue,定義queue名稱,標識
    queue,durable 持久化
"""
channel.queue_declare(queue=queue_name)


while True:
    input_value = input(">>:").strip()
    if input_value:
        """定義queue中的消息內容"""
        print(producer messages:{0}.format(input_value))
        channel.basic_publish(
            exchange=‘‘,
            routing_key=queue_name,
            body=input_value,
            properties=pika.BasicProperties(   # 消息持久化.....
                delivery_mode=2,
            )
        )
    continue
producer.py 技術分享圖片
import pika,time
queue_name = xiaoxi_

"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""聲明一個管道"""
channel = connection.channel()
"""定義一個queue,定義queue名稱,標識"""
channel.queue_declare(queue=queue_name)

def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    #time.sleep(5) # 模擬消費者丟失生產者發送的消息,生產者消息隊列中的這一條消息則不會刪除。
    print(rev messages-->,body)

    """手動向生產者確認收到消息"""
    #ch.basic_ack(delivery_tag=method.delivery_tag)

"""消費,接收消息..."""
channel.basic_consume(
    consumer_callback=callback, #   如果收到消息,則回調這個函數處理消息
    queue=queue_name,
    #no_ack=True  #接收到消息,主動向生產者確認已經接收到消息。
)

print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

6.RabbitMQ消息公平分發

技術分享圖片
import pika


queue_name = xiaoxi_1

"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""聲明一個管道"""
channel = connection.channel()
"""定義一個queue,定義queue名稱,標識
    queue,durable 持久化
"""
channel.queue_declare(queue=queue_name)


while True:
    input_value = input(">>:").strip()
    if input_value:
        """定義queue中的消息內容"""
        print(producer messages:{0}.format(input_value))
        channel.basic_publish(
            exchange=‘‘,
            routing_key=queue_name,
            body=input_value,
        )
    continue
producer.py 技術分享圖片
import pika,time


queue_name = xiaoxi_1

"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""聲明一個管道"""
channel = connection.channel()
"""定義一個queue,定義queue名稱,標識
    queue,durable 持久化
"""
channel.queue_declare(queue=queue_name)


def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)
    """模擬處理消息快慢速度"""
    time.sleep(1)
    ch.basic_ack(delivery_tag=method.delivery_tag)


"""根據消費者處理消息的快慢公平分發消息"""
channel.basic_qos(prefetch_count=1)


"""消費,接收消息..."""
channel.basic_consume(
    consumer_callback=callback,  # 如果收到消息,則回調這個函數處理消息
    queue=queue_name,
   # no_ack=True  #接收到消息,主動向生產者確認已經接收到消息。
)


print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

7.RabbitMQ-廣播模式。

    消息的發送模式類型
        1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
        2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
        3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
            表達式符號說明:#代表一個或多個字符,*代表任何字符
            例:#.a會匹配a.a,aa.a,aaa.a等
                  *.a會匹配a.a,b.a,c.a等
            註:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 
        4.headers: 通過headers 來決定把消息發給哪些queue (少用)

7.1 topic 廣播模式。

技術分享圖片
import pika


"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""聲明一個管道"""
channel = connection.channel()


"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""
exchange_name = topic_messages1
routing_key = my_topic


"""定義exchage模式 direct廣播模式"""
channel.exchange_declare(exchange=exchange_name,exchange_type=topic)
"""
    消息的發送模式類型
        1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
        2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
        3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
            表達式符號說明:#代表一個或多個字符,*代表任何字符
            例:#.a會匹配a.a,aa.a,aaa.a等
                  *.a會匹配a.a,b.a,c.a等
            註:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 
        4.headers: 通過headers 來決定把消息發給哪些queue (少用)
"""


while True:
    input_value = input(">>:").strip()
    if input_value:
        """定義queue中的消息內容"""
        print(producer messages:{0}.format(input_value))
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=input_value,
        )
    continue
producer.py 技術分享圖片
import pika,time



"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""聲明一個管道"""
channel = connection.channel()

"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""
exchange_name = topic_messages1
routing_key = my_topic

channel.exchange_declare(exchange=exchange_name,exchange_type=topic)


"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除"""
res = channel.queue_declare(exclusive=True)
queue_name = res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)


print(direct_key:{0}.format(routing_key))
print(queue_name:{0}.format(queue_name))

def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


"""消費,接收消息..."""
channel.basic_consume(
    consumer_callback=callback,  # 如果收到消息,則回調這個函數處理消息
    queue=queue_name,
)


print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

7.2 direct 廣播模式

技術分享圖片
import pika


connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
channel = connection.channel()


"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""
exchange_name = direct_messages
routing_key = my_direct

"""
    定義exchage模式 direct廣播模式
    消息的發送模式類型
        1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
        2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
        3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
            表達式符號說明:#代表一個或多個字符,*代表任何字符
            例:#.a會匹配a.a,aa.a,aaa.a等
                  *.a會匹配a.a,b.a,c.a等
            註:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 
        4.headers: 通過headers 來決定把消息發給哪些queue (少用)
"""
channel.exchange_declare(exchange=exchange_name,exchange_type=direct)

channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=hello word!,
)


# while True:
#     input_value = input(">>:").strip()
#     if input_value:
#         """定義queue中的消息內容"""
#         print(‘producer messages:{0}‘.format(input_value))
#         channel.basic_publish(
#             exchange=exchange_name,
#             routing_key=routing_key,
#             body=input_value,
#         )
#     continue
producer.py 技術分享圖片
import pika,time



connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
channel = connection.channel()

exchange_name = direct_messages
routing_key = my_direct

channel.exchange_declare(exchange=exchange_name,exchange_type=direct)


"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除"""
res = channel.queue_declare(exclusive=True)
queue_name = res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)


print(direct_key:{0}.format(routing_key))
print(queue_name:{0}.format(queue_name))

def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


"""消費,接收消息..."""
channel.basic_consume(
    consumer_callback=callback,  # 如果收到消息,則回調這個函數處理消息
    queue=queue_name,
)


print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

7.3 fanout 廣播模式

技術分享圖片
import pika


"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""聲明一個管道"""
channel = connection.channel()


exchange_name = messages
"""定義exchage模式 fanout廣播模式"""
channel.exchange_declare(exchange=exchange_name,exchange_type=fanout)
"""
    消息的發送模式類型
        1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
        2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
        3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
            表達式符號說明:#代表一個或多個字符,*代表任何字符
            例:#.a會匹配a.a,aa.a,aaa.a等
                  *.a會匹配a.a,b.a,c.a等
            註:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 
        4.headers: 通過headers 來決定把消息發給哪些queue (少用)
"""


while True:
    input_value = input(">>:").strip()
    if input_value:
        """定義queue中的消息內容"""
        print(producer messages:{0}.format(input_value))
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=‘‘,
            body=input_value,
        )
    continue
producer.py 技術分享圖片
import pika,time



"""聲明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""聲明一個管道"""
channel = connection.channel()

"""
    
"""
exchange_name = messages
channel.exchange_declare(exchange=exchange_name,exchange_type=fanout)

"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除"""
res = channel.queue_declare(exclusive=True)
queue_name = res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name)
"""每一個消費者隨機一個唯一的queue_name"""
print(queue_name:{0},format(queue_name))


def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)
    ch.basic_ack(delivery_tag=method.delivery_tag)



"""消費,接收消息..."""
channel.basic_consume(
    consumer_callback=callback,  # 如果收到消息,則回調這個函數處理消息
    queue=queue_name,
   # no_ack=True  #接收到消息,主動向生產者確認已經接收到消息。
)


print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

8 RabbitMQ 實現 RPC

技術分享圖片
"""
RabbitMQ-生產者。
利用rabbitMQ 實現一個能收能發的RPC小程序。
重點需要註意的是:queue的綁定。接收的一端必選預先綁定queue生成隊列,發送端才能根據queue發送。
"""

import pika,uuid,time


class rabbitmqClient(object):

    def __init__(self,rpc_queue):
        self.rpc_queue = rpc_queue
        self.app_id = str(uuid.uuid4())

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
        self.channel = self.connection.channel()

        """生成一個自動queue,傳過去server,server再往這個自動queue回復數據"""
        autoqueue = self.channel.queue_declare(exclusive=True)
        self.callback_queue = autoqueue.method.queue

        """先定義一個接收回復的動作"""
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)


    def on_response(self,ch,method,properties,body):
        if self.app_id == properties.app_id:
            self.response = body



    def send(self,msg):

        self.response = None
        self.channel.basic_publish(
            exchange=‘‘,
            routing_key=self.rpc_queue,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                app_id=self.app_id,
            ),
            body=str(msg)
        )

        #   發送完消息,進入接收模式。
        while self.response is None:
            # print(‘callback_queue:{0} app_id:{1} wait...‘.format(self.callback_queue,self.app_id))
            self.connection.process_data_events()
            # time.sleep(0.5)
        return self.response



rpc_request_queue = rpc_request_queue

rb = rabbitmqClient(rpc_request_queue)

while True:
    msg = input(input >> :).strip()
    if msg:
        print(rpc_queue:{0} app_id:{1}.format(rb.rpc_queue,rb.app_id))
        print(send msg:{}.format(msg))
        reponses = rb.send(msg)
        print(reponses msg:{}.format(reponses.decode(utf-8)))
    continue
client.py 技術分享圖片
"""
RabbitMQ-消費者。
"""

import pika

class rabbitmqServer(object):

    def __init__(self,rpc_queue):
        self.rpc_queue = rpc_queue

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
        self.channel = self.connection.channel()

        self.channel.queue_declare(queue=self.rpc_queue)


    def on_reponses(self,ch,method,properties,body):
        if body:
            # reponser ...
            ch.basic_publish(exchange=‘‘,
                             routing_key=properties.reply_to,
                             properties=pika.BasicProperties(
                                 reply_to=properties.reply_to,
                                 app_id=properties.app_id,
                             ),
                             body=reponses ok! msg is:{}.format(body.decode(utf-8)))


    def start_consuming(self):
        self.channel.basic_consume(consumer_callback=self.on_reponses,queue=self.rpc_queue,no_ack=True)
        print(waiting for meassages, to exit press CTRL+C)
        self.channel.start_consuming()




rpc_request_queue = rpc_request_queue

rd_server = rabbitmqServer(rpc_request_queue)

rd_server.start_consuming()
server.py

python-RabbtiMQ消息隊列