1. 程式人生 > >Rabbitmq中的RPC通訊機制

Rabbitmq中的RPC通訊機制

具體工作機制:

Our RPC will work like this:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id
     property. If it matches the value from the request it returns the response to the application.

客戶端程式碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服務端程式碼:

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

上面的這個程式是官網提供的,乍一看是不是有點懵,我們將程式進行分解:

第一步:普通的釋出者與訂閱者模型

釋出者:

#!/usr/bin/python
# -*- coding:utf-8 -*-

import pika
import sys,os
import time
import uuid

import pika
import os,sys
import time
import uuid


class FibonaciClint(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
        self.channel = self.conn.channel()
        self.channel.queue_declare(queue='rpc_queue',exclusive=False,durable=True)


    def call(self,message):
        print('....開始傳送訊息....==>%s'%message)

        self.uuid = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   body=message,
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,
                                       correlation_id = self.uuid,
                                   ))
        print('傳送訊息Successful...')

    def close(self):
        self.conn.close()


if __name__ == '__main__':
    fibonaci = FibonaciClint()
    message = ''.join(sys.argv[1:]) or 'Hello,RabbitMQ.'
    fibonaci.call(message)

訂閱者:

#!/usr/bin/python
# -*- coding:utf-8 -*-

import pika
import sys,os
import time
import uuid


class FibonaciServer(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
        self.channel = self.conn.channel()

    def fib(self,n):  # 定義一個主邏輯:斐波那契數列.===>程式的處理邏輯在這裡寫.
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return self.fib(n - 1) + self.fib(n - 2)

    def on_request(self,channel,method,properties,body):
        print('----------------------------------------')
        print('正在消費的訊息:====>%s'%body)
        time.sleep(5)
        print('訊息的相關屬性為:')
        print(properties)
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        print('----------------------------------------')


    def call_back(self):
        self.channel.basic_qos(prefetch_count=2)
        self.channel.basic_consume(consumer_callback=self.on_request,
                                   queue='rpc_queue',
                                   no_ack=False)

    def start_consume(self):
        self.channel.start_consuming()


if __name__ == '__main__':
    fibonaci = FibonaciServer()
    fibonaci.call_back()
    fibonaci.start_consume()

執行資訊:

D:\Python34\python.exe "D:/Python Work Location/RabbitMQStudy/test/consumer.py"
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=4e0ec5e9-44c3-45c9-8a5c-686e34bf2785', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=2d82b7e9-7e3a-4495-b40a-dcd334b983ee', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=55ebcbdd-2f90-4c15-80b6-33bd90f05105', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=fd6ff3c2-230c-44ac-9d7d-da8474de5bd6', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=c4ad05cf-a1b4-4aec-a209-5a653b5154af', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=854a8b6c-7c36-457c-bda6-29c53f5646f8', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=433625d0-eed8-4546-ba5b-d41256fcb53c', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=8bd94f42-2a5c-439c-9082-5883bfdb8481', 'delivery_mode=2'])>
----------------------------------------

第二步:增加部分屬性資訊(回撥佇列的名字作為訊息的屬性資訊傳送)

釋出者:

#!/usr/bin/python
# -*- coding:utf-8 -*-

import pika
import sys,os
import time
import uuid

import pika
import os,sys
import time
import uuid


class FibonaciClint(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
        self.channel = self.conn.channel()
        self.channel.queue_declare(queue='rpc_queue',exclusive=False,durable=True)

        #TODO 增加回調佇列(回撥佇列的名字作為屬性進行傳送.)
        result = self.channel.queue_declare(durable=True,exclusive=False)
        self.call_queue = result.method.queue


    def call(self,message):
        print('....開始傳送訊息....==>%s'%message)

        self.uuid = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   body=message,
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,
                                       correlation_id = self.uuid,
                                       reply_to=self.call_queue,   #增加一個屬性.
                                   ))
        print('傳送訊息Successful...')

    def close(self):
        self.conn.close()


if __name__ == '__main__':
    fibonaci = FibonaciClint()
    message = ''.join(sys.argv[1:]) or '3'
    fibonaci.call(message)

訂閱者:

#!/usr/bin/python
# -*- coding:utf-8 -*-

import pika
import sys,os
import time
import uuid


class FibonaciServer(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
        self.channel = self.conn.channel()

    def fib(self,n):  # 定義一個主邏輯:斐波那契數列.===>程式的處理邏輯在這裡寫.
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return self.fib(n - 1) + self.fib(n - 2)

    def on_request(self,channel,method,properties,body):
        print('----------------------------------------')
        print('正在消費的訊息:====>%s'%body)
        time.sleep(5)
        print('訊息的相關屬性為:')
        print(properties)
        value = self.fib(int(body))
        print('原值: ',body,'斐波那契的執行結果: ',value)
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        print('----------------------------------------')


    def call_back(self):
        self.channel.basic_qos(prefetch_count=2)
        self.channel.basic_consume(consumer_callback=self.on_request,
                                   queue='rpc_queue',
                                   no_ack=False)

    def start_consume(self):
        self.channel.start_consuming()


if __name__ == '__main__':
    fibonaci = FibonaciServer()
    fibonaci.call_back()
    fibonaci.start_consume()

執行相關資訊:


===========>

D:\Python34\python.exe "D:/Python Work Location/RabbitMQStudy/test/consumer.py"
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=b67e4865-0c5b-4d6a-bca0-32a9f0f26357', 'delivery_mode=2', 'reply_to=amq.gen-FaEBXSQ5llJzoK3lisTMJQ'])>
原值:  b'3' 斐波那契的執行結果:  2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=db3cd6c4-ab01-43e5-8ede-86e9aa1b3bf9', 'delivery_mode=2', 'reply_to=amq.gen-vffPvlsxRGFwMHcjLAU2kg'])>
原值:  b'3' 斐波那契的執行結果:  2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=94b1103e-aa62-43b5-8f63-f6c645e1265d', 'delivery_mode=2', 'reply_to=amq.gen-B4m6KcbtYPrlqOveQtnZaQ'])>
原值:  b'3' 斐波那契的執行結果:  2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=efde1f20-765a-4c1b-bf25-d4b8a00e8c97', 'delivery_mode=2', 'reply_to=amq.gen-E5t24uT0lEON92GsZ0y7HA'])>
原值:  b'3' 斐波那契的執行結果:  2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'1'
訊息的相關屬性為:
<BasicProperties(['correlation_id=3ed9af6a-6154-4877-ab6a-3b4a97c00c59', 'delivery_mode=2', 'reply_to=amq.gen-LJ5QpG1YwG5kq2XL5RnTTQ'])>
原值:  b'1' 斐波那契的執行結果:  1
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'2'
訊息的相關屬性為:
<BasicProperties(['correlation_id=380efb79-8d95-4e32-902f-64c5619f3cc0', 'delivery_mode=2', 'reply_to=amq.gen-rbp6hADtBC7CU0dJ3_iawA'])>
原值:  b'2' 斐波那契的執行結果:  1
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=ab363df4-ac91-4643-8cd7-50e977297131', 'delivery_mode=2', 'reply_to=amq.gen-pVsKxifFexmpneFDquTrSw'])>
原值:  b'3' 斐波那契的執行結果:  2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'4'
訊息的相關屬性為:
<BasicProperties(['correlation_id=e6e75d07-7d55-4dcc-aa99-00d7ebea5967', 'delivery_mode=2', 'reply_to=amq.gen-O8kTGUggp2YxIsEs7UQJCQ'])>
原值:  b'4' 斐波那契的執行結果:  3
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'5'
訊息的相關屬性為:
<BasicProperties(['correlation_id=d69dca66-71e6-4a2f-a4df-0a64be9a7d5b', 'delivery_mode=2', 'reply_to=amq.gen-qMEQFULAm7S6_WaIgF4tJw'])>
原值:  b'5' 斐波那契的執行結果:  5
----------------------------------------

第三步:完整程式

釋出者:

#!/usr/bin/python
# -*- coding:utf-8 -*-

import pika
import sys,os
import time
import uuid

import pika
import os,sys
import time
import uuid


class FibonaciClint(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
        self.channel = self.conn.channel()
        self.channel.queue_declare(queue='rpc_queue',exclusive=False,durable=True)

        #TODO 增加回調佇列(回撥佇列的名字作為屬性進行傳送.)
        result = self.channel.queue_declare(durable=True,exclusive=False)
        self.call_queue = result.method.queue

        #TODO 增加消費訊息部分程式碼
        self.channel.basic_consume(consumer_callback=self.on_response,
                                   queue=self.call_queue,
                                   no_ack=False)

    def on_response(self,channel,method,properties,body): #注意:這裡是相應結果
        print('RPC 正在獲取RPC響應結果:...')
        if self.uuid == properties.correlation_id:
            self.response = str(body)
            print('相關屬性資訊:')
            print(properties)
            print('獲取到的執行結果是:%s'%self.response)
        self.channel.basic_ack(delivery_tag=method.delivery_tag)


    def call(self,message):
        self.response = None
        self.uuid = str(uuid.uuid4())
        print('....開始傳送訊息....==>%s' % message)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   body=message,
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,
                                       correlation_id = self.uuid,
                                       reply_to=self.call_queue,   #增加一個屬性.
                                   ))
        print('傳送訊息Successful...')

        while self.response is None:
            time.sleep(3)
            print('正在等待RPC呼叫結果....')
            self.conn.process_data_events()  #相當於非阻塞消費

        return self.response


    def close(self):
        self.conn.close()


if __name__ == '__main__':
    fibonaci = FibonaciClint()
    message = ''.join(sys.argv[1:]) or '3'
    fibonaci.call(message)

接受者:

#!/usr/bin/python
# -*- coding:utf-8 -*-

import pika
import sys,os
import time
import uuid


class FibonaciServer(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
        self.channel = self.conn.channel()

    def fib(self,n):  # 定義一個主邏輯:斐波那契數列.===>程式的處理邏輯在這裡寫.
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return self.fib(n - 1) + self.fib(n - 2)

    def on_request(self,channel,method,properties,body):
        print('----------------------------------------')
        print('正在消費的訊息:====>%s'%body)
        time.sleep(5)
        print('訊息的相關屬性為:')
        print(properties)
        value = self.fib(int(body))
        print('原值: ',body,'斐波那契的執行結果: ',value)

        print('將計算的執行結果返回給RPC客戶端....')
        self.channel.basic_publish(exchange='',
                                   routing_key=properties.reply_to,
                                   body=str(value),
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,
                                       correlation_id=properties.correlation_id,
                                   ))

        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        print('----------------------------------------')


    def call_back(self):
        self.channel.basic_qos(prefetch_count=2)
        self.channel.basic_consume(consumer_callback=self.on_request,
                                   queue='rpc_queue',
                                   no_ack=False)

    def start_consume(self):
        self.channel.start_consuming()


if __name__ == '__main__':
    fibonaci = FibonaciServer()
    fibonaci.call_back()
    fibonaci.start_consume()

執行資訊:


===>

D:\Python34\python.exe "D:/Python Work Location/RabbitMQStudy/test/producer.py"
....開始傳送訊息....==>3
傳送訊息Successful...
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
RPC 正在獲取RPC響應結果:...
相關屬性資訊:
<BasicProperties(['correlation_id=a86099bf-eff3-4835-989f-a20c2e240188', 'delivery_mode=2'])>
獲取到的執行結果是:b'2'

Process finished with exit code 0

===>

D:\Python34\python.exe "D:/Python Work Location/RabbitMQStudy/test/consumer.py"
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=a86099bf-eff3-4835-989f-a20c2e240188', 'delivery_mode=2', 'reply_to=amq.gen-T84YRrsaktV_HWQdjqOWgw'])>
原值:  b'3' 斐波那契的執行結果:  2
將計算的執行結果返回給RPC客戶端....
----------------------------------------

OK,分解之後是不是容易理解多了.