1. 程式人生 > >Python 訊息佇列rabbitmq使用之 實現一個RPC系統

Python 訊息佇列rabbitmq使用之 實現一個RPC系統

1、服務端程式碼

# rpc_server.py
import pika

# 建立連線
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

# 指定一個接收佇列
channel.queue_declare(queue='rpc_queue')

# 遠端要呼叫的方法
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return
1 else: return fib(n-1) + fib(n-2) # 響應 def on_request(ch, method, props, body): # 得到傳遞過來的字元 n = int(body) print(" [.] fib(%s)" % n) # 呼叫方法 response = fib(n) # 再將呼叫的結果返回給客戶端 ch.basic_publish(exchange='', routing_key=props.reply_to, # 獲取要傳送的佇列
properties=pika.BasicProperties(correlation_id = props.correlation_id), # 將接收到的id傳送過去 body=str(response) # 將方法返回的結果返回給客戶端 ) # 進行訊息確認,不確認會導致不能夠釋放沒響應的訊息,RabbitMQ就會佔用越來越多的記憶體。 ch.basic_ack(delivery_tag = method.delivery_tag) # 表示開啟一個程序工作
channel.basic_qos(prefetch_count=1) # 接收客戶端發來的訊息,並使用了接收佇列 channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") # 開始消費 channel.start_consuming()

2、客戶端程式碼

# rpc_client.py
import pika
import uuid

# 定義一個客戶端類
class FibonacciRpcClient(object):
    def __init__(self):
        # 初始化連結
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        # 初始化管道
        self.channel = self.connection.channel()
        # 因為釋出訂閱用的是隨機名佇列,所以當與消費者斷開連線的時候,這個佇列應當被立即刪除。exclusive識別符號即可達到此目的。
        result = self.channel.queue_declare(exclusive=True)
        # 自動獲取一個佇列
        self.callback_queue = result.method.queue

        # 開始消費訊息,,其實這裡是一個回撥,,當傳送訊息給服務端後,服務端會發送確認訊息給客戶端,所以是這個存在的意義
        self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        # 初始化返回資訊
        self.response = None
        # 初始化一個id並儲存下來
        self.corr_id = str(uuid.uuid4())
        # 傳送訊息,將這個訊息傳送給服務端
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         # 傳送回撥的佇列過去
                                         reply_to = self.callback_queue,
                                         # 傳送id過去 
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n) # 傳送要執行的數字
                                   )
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

3、如何執行 先執行服務端:

python rpc_server.py

再執行客戶端:

python rpc_client.py