1. 程式人生 > >python系列之 RabbitMQ

python系列之 RabbitMQ

遠端過程呼叫(Remote procedure call (RPC))

在第二課我們學習了怎樣使用 工作佇列(work queues) 來在多個workers之間分發需要消時的 任務

但是如果我們需要在遠端的伺服器上呼叫一個函式並獲取返回結果 我們需要怎麼做呢?well這是一個不一樣的故事。 這中模式通常被稱為遠端過程呼叫或RPC

在這一刻我們將要使用RabbitMQ來建立一個RPC系統:一個客戶端和一個可擴充套件的RPC服務。由於我們沒有任何耗時的任務值得分配,我們將要建立一個仿RPC服務並返回斐波納契數值

客戶端介面(Client interface)

為了闡明RPC服務怎麼使用我們建立一個簡單的客戶端類,將用一個名為Call的方法傳送一個RPC請求並阻塞直到獲取結果:
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)


回撥佇列(callback queue)

一般通過RabbitMQ執行RPC是很容易的。 一個客戶端傳送一個請求訊息然後服務端返回一個訊息作為應答。 為了接收返回訊息客戶端需要傳送一個“callback" 佇列請求地址,讓我們試試:
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

訊息屬性 AMQP協議在一個訊息中預先定義了一個包含14個屬性的集合。大部分屬性很少用到,以下幾種除外: > delivery_mode: 標記一個訊息為持久的(值為2)或者 瞬時的(其它值), 你需要記住這個屬性(在第二課時用到過) > content_type : 用來描述 MIME 型別的編碼 ,比如我們經常使用的 JSON 編碼,設定這個屬性就非常好實現: application/json > reply_to: 經常用來命名一個 callback 佇列 > correlation_id : 用來關聯RPC的請求與應答

關聯ID (Correlation ID)

前面提到的方法我們建議為每個RPC請求建立一個callback佇列。  那是相當低效的,但是幸好有一個更好的方法 -- 我們未每個客戶端建立一個單獨的callback佇列。 但這帶來了一個新的問題, 當在那個佇列中接收了一個返回,我們並不清楚是這個結果時屬於那個請求的,這樣當correlation_id屬性使用後,我們為每個請求設定一個唯一值。然後當我們從callback佇列中接收到一個訊息後,我們檢視一下這個屬性,基於這個我們就能將請求和返回進行匹配。如果我們看到一個未知的correlation_id值,我們可以安全的丟棄這個訊息 -- 不屬於我們的請求 你可能會問,為什麼我們要在callback佇列中忽略未知的訊息,而不是通過這個錯誤執行失敗? 這是由於服務端的競爭條件的可能性(??),雖然可能性不大,但在為請求傳送ack訊息之前,當傳送給我們結果後RPC服務還是有死掉的可能。如果發生這樣的情況,讓重啟RPC服務之後將會重新處理請求。 這就是為什麼客戶端必須妥善的處理重複響應。

概要(Summary)


我們的RPC將會這樣執行: >  當客戶端啟動後,它建立一個匿名的唯一的回撥佇列 > 對一個RPC請求, 客戶端傳送一個訊息包含兩個屬性: reply_to (用來設定回撥佇列)和 correlation_id(用來為每個請求設定一個唯一標識) > 請求傳送到 rpc_queue佇列 > RPC worker( 服務端) 在那個佇列中等待請求,當一個請求出現後,服務端就執行一個job並將結果訊息傳送給客戶端,使用reply_to欄位中的佇列 > 客戶端在callback 佇列中等待資料, 當一個訊息出現後,檢查這個correlation_id屬性,如果和請求中的值匹配將返回給應用

整合

rpc_server.py程式碼
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()


服務端程式碼詳單簡單: > (4) 和往常一樣我們建立一個連線並定義一個佇列 > (11) 我們定義了  斐波納契 函式,假定輸入的都是合法正數 > (19) 我們定義了一個回撥的 basic_consume, RPC服務的核心。 當收到請求後執行這個函式並返回結果 > (32) 我們可能會執行多個服務端,為了在多個服務端上均勻的分佈負荷,我們需要這是 prefetch_count。 rpc_client.py 程式碼:
#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)


客戶端程式碼稍微複雜些: > (7) 我們建立一個連線,通道並定義一個專門的’callback‘佇列用來接收回復 > (16) 我們訂閱了“callback”佇列,因此我們能夠接收 RPC 的返回結果 > (18) ’on_response'  在每個返回中執行的回撥是一個簡單的job, 對每個返回訊息將檢查是否correlation_id使我們需要查詢的那個ID,如果是,將儲存結果到 self.response 並終端consuming迴圈 > (23) 下一步,我們定義我們的main方法 - 執行實際的RPC請求 > (24) 在這方法中,首先我們生產一個唯一的 correlatin_id 號並儲存 -- 'on_response"回撥函式將用著號碼來匹配發送和接收的訊息值 > (25) 下一步,釋出請求資訊,使用兩個屬性: reply_to 和 correlation_id > (32) 這一步我們可以坐等結果的返回 >(33) 最後我們返回結果給使用者 我們的RPC服務現在已經就緒,可以開啟服務:
$ python rpc_server.py
 [x] Awaiting RPC requests
請求一個斐波那契數,執行客戶端
$ python rpc_client.py
 [x] Requesting fib(30)



相關推薦

python系列 RabbitMQ

遠端過程呼叫(Remote procedure call (RPC)) 在第二課我們學習了怎樣使用 工作佇列(work queues) 來在多個workers之間分發需要消時的 任務 但是如果我們需要在遠端的伺服器上呼叫一個函式並獲取返回結果 我們需要怎麼做呢?well這是

小白程序要進階系列RabbitMQ-前言

小白 部分 持久化 程序 ruby spa pan 導致 寫入 什麽是RabbitMQ RabbitMQ是一個開源的AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)實現,是可復用的企業級消息系統。服務器端使用Erlang語言編

Centos7 minimal 系列rabbitmq安裝(八)

ref 需要 查看 del kcon start 設置 mis nsh 一、安裝Erlang   由於RabbitMQ依賴Erlang, 所以需要先安裝Erlang。   這種方法網站訪問不了 wget https://packages.erlang-solutions.

Python系列入門篇——HDFS

system rec urn cep gpo user raise ret append Python系列之入門篇——HDFS 簡介 HDFS (Hadoop Distributed File System) Hadoop分布式文件系統,具有高容錯性,適合部署在廉價的機器上

Python系列 - 反射

function 一道 好的 HA BE python n) method lis 一、靜態方法(staticmethod)和類方法(classmethod) 類方法:有個默認參數cls,並且可以直接用類名去調用,可以與類屬性交互(也就是可以使用類屬性) 靜態方法:讓類

Python系列反射、面向對象

exp post 用戶輸入 裝飾器 場景 comment ebe alt 新的 一、反射 說反射之前先介紹一下__import__方法,這個和import導入模塊的另一種方式 1. import commons 2. __import__(‘commons‘)

python系列

I/O多路複用是在單執行緒模式下實現多執行緒的效果,實現一個多I/O併發的效果。看一個簡單socket例子: import socket SOCKET_FAMILY = socket.AF_INET SOCKET_TYPE = socket.SOCK_STREAM s

Python系列程序池與執行緒池

在剛開始學多程序或多執行緒時,我們迫不及待地基於多程序或多執行緒實現併發的套接字通訊,然而這種實現方式的致命缺陷是:服務的開啟的程序數或執行緒數都會隨著併發的客戶端數目地增多而增多,這會對服務端主

Python系列 - 入門基礎知識筆記

馮諾依曼架構     控制器     運算器     儲存器     輸入裝置 (I)     輸出裝置 (O)   &n

Python系列 - 怎麼用urllib.request模組下載hao123html主頁

Python系列之 - 怎麼用urllib.request模組下載hao123html主頁 下載hao123 html主頁程式碼片段如下: import urllib.request def getHtml(url): html = urllib.request.

Python系列 - Mac安裝python3.x版本

Mac安裝python3.x版本 本文給出了python3.x版本在Mac上安裝的詳細步驟。 Mac自帶Python,有些軟體的執行可能需要用到2.X的版本,而3的版本與2的版本區別甚大,作為python 開發還是需要安裝最新的3.x版本 1. 檢視本機預設安裝環境 通過una

Python系列迴圈定時器

近期在學習並使用Python開發一些小工具,在這裡記錄方便回憶,也與各位開始走上這條路的朋友共勉,如有不正確希望指正,謝謝! 開始使用定時器時,度娘了下有沒好的例子,本人比較懶,希望能直接使用。確實找到了一些,但是大多隻是很直白的程式碼,自己打算整理一下。 我選用了thr

Python系列8socket

art keyword http tps dal input port pytho ima 目錄   socket     簡單的聊天機器人   簡單的ftp上傳   粘包問題的解決 一. socket模塊   socket,俗稱套接字,其實就是一個ip地址和端口的組合。類

Python項目實戰:福布斯系列數據采集

sce nmp mgr 上市 sts nor 頁面數據 都差不多 afa 1 數據采集概述 開始一個數據分析項目,首先需要做的就是get到原始數據,獲得原始數據的方法有多種途徑。比如: 獲取數據集(dataset)文件 使用爬蟲采集數據 直接獲得excel、

Python零基礎學習系列二--Python介紹及環境搭建

url 軟件包 三方庫 簡單的 lin 文件的 span 高級程序設計 擴展 1-1、Python簡介:  Python是一種解釋型、面向對象、動態數據類型的高級程序設計語言。Python由Guido van Rossum於1989年底發明,第一個公開發行版發行於1991年

Python collections系列雙向隊列

property getitem eps color signature integer get deque method 雙向隊列(deque) 一個線程安全的雙向隊列 1、創建一個雙向隊列 import collections d = collections.d

Python collections系列可命名元組

!= nta method first not bsp dex data tin 可命名元組(namedtuple) 根據nametuple可以創建一個包含tuple所有功能以及其他功能的類 1、創建一個坐標類 import collections # 創建類, d

Python全棧系列---------面向對象4接口與抽象,多繼承與多態)

統一 dog blog 水果 創建 設計 概念 fly 支付 接口類與抽像類 在python中,並沒有接口類這種東西,即便不通過專門的模塊定義接口,我們也應該有一些基本的概念 編程思想 歸一化設計:  1.接口類 不實現具體的方法,並且可以多繼承  2.抽象類 可以做一

Appium python自動化測試系列Capability介紹(五)

語言 路徑 pla apk 過程 5.1 基礎 針對 driver ?5.1 Capability介紹 5.1.1 什麽是Capability 在講capability之前大家是否還記得在講log時給大家看過的啟動時的日誌?在我們的整個啟動日誌中會出現一些配置信息,其實那些

Appium python自動化測試系列元素的定位(六)

keditor logs ren 匹配 4.4 contex 應用 需要 運用 ?6.1 常用定位方法講解 對象定位是自動化測試中很關鍵的一步,也可以說是最關鍵的一步,畢竟你對象都沒定位那麽你想操作也不行。所以本章節的知識我希望大家多動手去操作,不要僅僅只是書本上的知識,畢