1. 程式人生 > >RabbitMQ中RPC的實現及其通信機制

RabbitMQ中RPC的實現及其通信機制

pub elf tcl consumer 兩個 rabbit client margin result

RabbitMQ中RPC的實現:客戶端發送請求消息,服務端回復響應消息,為了接受響應response,客戶端需要發送一個回調隊列的地址來接受響應,每條消息在發送的時候會帶上一個唯一的correlation_id,相應的服務端處理計算後會將結果返回到對應的correlation_id。

RPC調用流程:

技術分享圖片

當生產者啟動時,它會創建一個匿名的獨占回調隊列,對於一個RPC請求,生產者發送一條具有兩個屬性的消息:reply_to(回調隊列),correlation_id(每個請求的唯一值),請求被發送到rpc_queue隊列,消費者等待該隊列上的請求。當一個請求出現時,它會執行該任務,將帶有結果的消息發送回生產者。

生產者等待回調隊列上的數據,當消息出現時,它檢查相關ID屬性,如果它與請求中的值匹配,則返回對應用程序的響應。

RabbitMQ斐波拉契計算的RPC,消費者實現:

"""
基於RabbitMQ實現RPC通信機制 --> 服務端
"""

import pika
import uuid
from functools import lru_cache


class RabbitServer(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(
            pika.ConnectionParameters(host
=localhost, port=5672) ) self.channel = self.conn.channel() # 聲明一個隊列,並進行持久化,exclusive設置為false self.channel.queue_declare( exclusive=False, durable=True, queue=task_queue ) # 聲明一個exhange交換機,類型為topic self.channel.exchange_declare( exchange
=logs_rpc, exchange_type=topic, durable=True ) # 將隊列與交換機進行綁定 routing_keys = [#] # 接受所有的消息 for routing_key in routing_keys: self.channel.queue_bind( exchange=logs_rpc, queue=task_queue, routing_key=routing_key ) @lru_cache() def fib(self, n): """ 斐波那契數列.===>程序的處理邏輯 使用lru_cache 優化遞歸 :param n: :return: """ if n == 0: return 0 elif n == 1: return 1 else: return self.fib(n - 1) + self.fib(n - 2) def call_back(self, channel, method, properties, body): print(------------------------------------------) print(接收到的消息為(斐波那契數列的入參項為):{}.format(str(body))) print(消息的相關屬性為:) print(properties) value = self.fib(int(body)) print(斐波那契數列的運行結果為:{}.format(str(value))) # 交換機將消息發送到隊列 self.channel.basic_publish( exchange=‘‘, routing_key=properties.reply_to, body=str(value), properties=pika.BasicProperties( delivery_mode=2, correlation_id=properties.correlation_id, )) # 消費者對消息進行確認 self.channel.basic_ack(delivery_tag=method.delivery_tag) def receive_msg(self): print(開始接受消息...) self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume( consumer_callback=self.call_back, queue=task_queue, no_ack=False, # 消費者對消息進行確認 consumer_tag=str(uuid.uuid4()) ) def consume(self): self.receive_msg() self.channel.start_consuming() if __name__ == __main__: rabbit_consumer = RabbitServer() rabbit_consumer.consume()

生產者實現:

"""
基於RabbitMQ實現RPC通信機制 --> 客戶端
"""

import pika
import uuid
import time


class RabbitClient(object):
    def __init__(self):
        # 與RabbitMq服務器建立連接
        self.conn = pika.BlockingConnection(
            pika.ConnectionParameters(host=localhost, port=5672)
        )
        self.channel = self.conn.channel()

        # 聲明一個exchange交換機,交換機的類型為topic
        self.channel.exchange_declare(
            exchange=logs_rpc, exchange_type=topic, durable=True
        )

        # 聲明一個回調隊列,用於接受RPC回調結果的運行結果
        result = self.channel.queue_declare(durable=True, exclusive=False)
        self.call_queue = result.method.queue

        # 從回調隊列當中獲取運行結果.
        self.channel.basic_consume(
            consumer_callback=self.on_response,
            queue=self.call_queue,
            no_ack=False
        )

    def on_response(self, channel, method, properties, body):
        """
        對收到的消息進行確認
        找到correlation_id與服務端的消息標識匹配的消息結果
        :param channel:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        if self.corr_id == properties.correlation_id:
            self.response = body
            print(斐波那契數列的RPC返回結果是:{}.format(body))
            print(相關屬性信息:)
            print(properties)
        self.channel.basic_ack(delivery_tag=method.delivery_tag)

    def send_msg(self, routing_key, message):
        """
        exchange交換機將根據消息的路由鍵將消息路由到對應的queue當中
        :param routing_key: 消息的路由鍵
        :param message: 生成者發送的消息
        :return:
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange=logs_rpc,
            routing_key=routing_key,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,
                correlation_id=self.corr_id,
                reply_to=self.call_queue,
            ))

        while self.response is None:
            print(等待遠程服務端的返回結果...)
            self.conn.process_data_events()  # 非阻塞式的不斷獲取消息.

        return self.response

    def close(self):
        self.conn.close()


if __name__ == "__main__":
    rabbit_producer = RabbitClient()
    routing_key = hello every one
    start_time = int(time.time())
    for item in range(2000):
        num = str(item)
        print(生產者發送的消息為:{}.format(num))
        rabbit_producer.send_msg(routing_key, num)
    end_time = int(time.time())
    print("耗時{}s".format(str(end_time - start_time)))

計算2000以內的斐波拉契數列,執行結果如下:

技術分享圖片

技術分享圖片

RabbitMQ中RPC的實現及其通信機制