Python 訊息佇列rabbitmq使用之 實現一個RPC系統
阿新 • • 發佈:2018-12-10
1、服務端程式碼
# rpc_server.py
import pika
# 建立連線
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 指定一個接收佇列
channel.queue_declare(queue='rpc_queue')
# 遠端要呼叫的方法
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
# 響應
def on_request(ch, method, props, body):
# 得到傳遞過來的字元
n = int(body)
print(" [.] fib(%s)" % n)
# 呼叫方法
response = fib(n)
# 再將呼叫的結果返回給客戶端
ch.basic_publish(exchange='',
routing_key=props.reply_to, # 獲取要傳送的佇列
properties=pika.BasicProperties(correlation_id = props.correlation_id), # 將接收到的id傳送過去
body=str(response) # 將方法返回的結果返回給客戶端
)
# 進行訊息確認,不確認會導致不能夠釋放沒響應的訊息,RabbitMQ就會佔用越來越多的記憶體。
ch.basic_ack(delivery_tag = method.delivery_tag)
# 表示開啟一個程序工作
channel.basic_qos(prefetch_count=1)
# 接收客戶端發來的訊息,並使用了接收佇列
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
# 開始消費
channel.start_consuming()
2、客戶端程式碼
# rpc_client.py
import pika
import uuid
# 定義一個客戶端類
class FibonacciRpcClient(object):
def __init__(self):
# 初始化連結
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 初始化管道
self.channel = self.connection.channel()
# 因為釋出訂閱用的是隨機名佇列,所以當與消費者斷開連線的時候,這個佇列應當被立即刪除。exclusive識別符號即可達到此目的。
result = self.channel.queue_declare(exclusive=True)
# 自動獲取一個佇列
self.callback_queue = result.method.queue
# 開始消費訊息,,其實這裡是一個回撥,,當傳送訊息給服務端後,服務端會發送確認訊息給客戶端,所以是這個存在的意義
self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
# 初始化返回資訊
self.response = None
# 初始化一個id並儲存下來
self.corr_id = str(uuid.uuid4())
# 傳送訊息,將這個訊息傳送給服務端
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
# 傳送回撥的佇列過去
reply_to = self.callback_queue,
# 傳送id過去
correlation_id = self.corr_id,
),
body=str(n) # 傳送要執行的數字
)
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
3、如何執行 先執行服務端:
python rpc_server.py
再執行客戶端:
python rpc_client.py