1. 程式人生 > >python 64式: 第1式 編寫rpc的call和cast

python 64式: 第1式 編寫rpc的call和cast

myclient.py內容如下

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : myclient.py
# @Software: PyCharm

'''

參考文章:
http://blog.csdn.net/happyanger6/article/details/54777429
https://docs.openstack.org/oslo.messaging/ocata/rpcclient.html

關鍵:
1 RPC Client基礎
oslo_messaging.RPCClient(transport, target, timeout=None,, version_cap=None, serializer=None, retry=None)
作用:呼叫遠端RPC服務的類
引數: transport: rpc底層通訊,支援rabbit,qpid
     target: 訊息最終傳送的地方
            形式:target(exchange,topic,server):
            引數:exchange:topic屬於的範圍,預設為配置檔案的control_exchange
                topic: 可被遠端呼叫的多個方法
                server:訊息目的地伺服器
2 RPC Call方法
oslo_messaging.RPCClient.call(ctxt, method, **kwargs)
含義:call方法是遠端過程呼叫帶有返回值的方法
特點:阻塞直到服務端返回結果
引數: ctxt:上下文字典,可為空
     method:被呼叫的方法名
     kwargs:呼叫該方法傳入的引數
返回值:返回被呼叫遠端方法的結果

3 RPC Cast方法
oslo_messaging.RPCClient.cast(ctxt, method, **kwargs)
含義:cast方法是遠端過程呼叫,不帶有返回值的方法
引數: ctxt:上下文字典,可為空
     method:被呼叫的方法名
     kwargs:呼叫該方法傳入的引數
返回值:無

4 RPC prepare方法
oslo_messaging.RPCClinet.prepare(exchange, topic, namespace, version, server)
作用:就是覆蓋RPCClient初始化的一些屬性


5 RPC總結:
實際就是一個遠端呼叫,客戶端通過rpc呼叫服務端的方法。
通過指定方法名和對應引數來進行呼叫;


RPCClient端呼叫過程:
1) 通過配置檔案獲取transport,指定topic初始化target,
2) 用transport和target初始化RPCClient
3) 最後呼叫RPCClientr的call方法或者cast方法

RPCServer端呼叫過程:
1) 根據配置初始化transport,根據topic,server等初始化target
2) 指定endpoints陣列,每個元素是一個類例項
3) 用transport,target,endpoints,executor來構造rpc server
4) 執行rpc server的start()方法
注意: 客戶端和服務端通過target進行連線,都需要指明topic引數。
'''

from oslo_config import cfg
import oslo_messaging as messaging


def rpcCall():
    # 訊息中介軟體,rabbitMQ或者Qpid等
    transport = messaging.get_transport(cfg.CONF)

    target = messaging.Target(topic='test')
    client = messaging.RPCClient(transport, target)
    ret = client.call(ctxt={},
                      method='processCall',
                      name='hello rpc call')
    print "######## End call, result: %s" %  ret

def rpcCast():
    transport = messaging.get_transport(cfg.CONF)
    target = messaging.Target(topic='test')
    client = messaging.RPCClient(transport, target)
    cctx = client.prepare(namespace='control', version='2.0')
    cctx.cast({}, 'processCast', name='hello rpc cast')
    print "######## End cast"

def process():
    rpcCall()
    rpcCast()

if __name__ == "__main__":
    process()

myserver.py內容如下

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : myserver.py
# @Software: PyCharm

from oslo_config import cfg
import oslo_messaging as messaging


'''
參考:
https://docs.openstack.org/ironic/latest/_modules/oslo_messaging/rpc/server.html

關鍵:
1 總結初始化RPC Server步驟:
1) 根據配置初始化transport,根據topic,server等初始化Target
2) 指定endpoints陣列,每個元素是一個類例項
3) 用transport,target,endpoints,executor來構造rpc server
4) 執行rpc server的start()方法

2 get_rpc_server方法分析
    oslo_messaging.get_rpc_server(transport, target, endpoints, executor='blocking')
    作用:構建一個RPC伺服器
    引數:
        transport: rpc通訊,支援rabbit,qpid;可通過oslo.messaging.get_transport來獲得transport物件例項的控制代碼
                   從cfg物件中讀取transport_url,rpc_backend,control_exchange資訊來構造Transport物件,
                   其中rpc_backend預設值: rabbit; control_exchange預設值:openstack
        target: 封裝訊息目的地所有資訊,包含exchange, topic, server, namespace, version等資訊
                
        endpoints: 埠陣列,每個元素是類物件
        executor:訊息執行器,可選:eventlet和threading。決定訊息如何收到和分發
                eventlet:
                threading:協程處理訊息接收,不阻塞
                blocking:阻塞,直到呼叫stop()退出
        serializer:可選的實體序列化器

3 RPCServer和RPCClient中關於target部分注意點
注意:每個endpoint可能會有一個target屬性,有namespace和version欄位
預設是 null namespace和1.0版本,用來相容不同的topic建立的Target構造的RPCClient
呼叫,樣例如下。
class RpcCastEndpoint(object):
    target = messaging.Target(namespace='control',version='2.0')
因為有的RPC Client是用下面方式建立:
    client = messaging.RPCClient(transport, target)
    cctx = client.prepare(namespace='control', version='2.0')
所以兩邊要匹配
'''

class RpcCastEndpoint(object):
    '''
    注意:每個endpoint可能會有一個target屬性,有namespace和version欄位
    預設是 null namespace和1.0版本,用來相容不同的topic建立的Target構造的RPCClient
    呼叫
    '''
    target = messaging.Target(namespace='control',version='2.0')

    def __init__(self):
        pass

    def processCast(self, ctx, name):
        # print "####### RpcCastEndpoint.processCast##########"
        print name


class RpcCallEndpoint(object):
    def __init__(self):
        pass

    def processCall(self, ctx, name):
        # print "####### RpcCallEndpoint.processCall"
        print name
        return name


def startRpcServer():
    transport = messaging.get_transport(cfg.CONF)
    target = messaging.Target(topic='test',
                              server='server1')
    endpoints = [
        RpcCastEndpoint(),
        RpcCallEndpoint(),
    ]
    server = messaging.get_rpc_server(transport, target, endpoints,
                                      executor='blocking')
    server.start()
    server.wait()


def process():
    startRpcServer()

if __name__ == "__main__":
    process()

先執行:

python myserver.py

然後執行

python myclient.py

即可顯示最終結果