自定義RPC的完整實現---深入理解rpc內部原理
阿新 • • 發佈:2018-10-28
channel struct seek raise services utf-8 proto encode res 倘若不使用RPC遠端調用的情況下,代碼如下:
local.py
# coding:utf-8 # 本地調用除法運算的形式 class InvalidOperation(Exception): def __init__(self, message = None): self.message = message or ‘involid operation‘ def divide(num1, num2 = 1): if num2 == 0: raise InvalidOperation res = num1 / num2 return res try: val = divide(200, 100) except InvalidOperation as e: print(e.message) else: print(val)
接下來將使用RPC二進制的形式,遠程過程調用上述代碼。
service.py 中自定義需要實現消息協議、傳輸控制,並且實現客戶端存根clientStub和服務器端存根serverStub,服務器定義以及channel的定義。
import struct from io import BytesIO import socket class InvalidOperation(BaseException): def __init__(self, message = None): self.message = message or ‘involid operation‘ class MethodProtocol(object): ‘‘‘‘ 解讀方法名 ‘‘‘ def __init__(self, connection): self.conn = connection def _read_all(self, size): """ 幫助我們讀取二進制數據 :param size: 想要讀取的二進制數據大小 :return: 二進制數據bytes """ # self.conn if isinstance(self.conn, BytesIO): buff = self.conn.read(size) return buff else: # 有時候長度大於每次讀取的長度 have = 0 buff = b‘‘ while have < size: chunk = self.conn.recv(size - have) buff += chunk l = len(chunk) have += l if l == 0: # 表示客戶端已經關閉了 raise EOFError return buff def get_method_name(self): # 讀取字符串長度 buff = self._read_all(4) length = struct.unpack(‘!I‘,buff)[0] # 讀取字符串 buff = self._read_all(length) name = buff.decode() return name class DivideProtocol(object): """ divide過程消息協議轉換工具 """ def args_encode(self, num1, num2=1): """ 將原始調用的請求參數轉換打包成二進制消息數據 :param num1: int :param num2: int :return: bytes 二進制消息數據 """ name = ‘divide‘ # 處理函數名 buff = struct.pack(‘!I‘, 6) # 無符號int buff += name.encode() # 處理參數1 buff2 = struct.pack(‘!B‘, 1) # 無符號byte buff2 += struct.pack(‘!i‘, num1) # 處理參數2 if num2 != 1: # 沒有傳參的時候 buff2 += struct.pack(‘!B‘, 2) buff2 += struct.pack(‘!i‘, num2) # 處理參數邊界和組合成完整數據 buff += struct.pack(‘!I‘,len(buff2)) buff += buff2 return buff def _read_all(self, size): """ 幫助我們讀取二進制數據 :param size: 想要讀取的二進制數據大小 :return: 二進制數據bytes """ # self.conn if isinstance(self.conn, BytesIO): buff = self.conn.read(size) return buff else: # 有時候長度大於每次讀取的長度 have = 0 buff = b‘‘ while have < size: chunk = self.conn.recv(size - have) buff += chunk l = len(chunk) have += l if l == 0: # 表示客戶端已經關閉了 raise EOFError return buff def args_decode(self, connection): """ 接受調用請求數據病進行解析 :param connection: 鏈接請求數據 socket BytesIO :return: 因為有多個參數,定義為字典 """ param_len_map = { 1:4, 2:4, } param_fmt_map = { 1:‘!i‘, 2:‘!i‘, } param_name_map = { 1: ‘num1‘, 2: ‘num2‘, } # 保存用來返回的參數字典 args = {} self.conn = connection # 處理方法的名字,已經提前被處理,稍後處理 # 處理消息邊界 # 1) 讀取二進制數據----read , ------ByteIO.read # 2) 將二進制數據轉換為python的數據類型 buff = self._read_all(4) length = struct.unpack(‘!I‘,buff)[0] # 記錄已經讀取的長度值 have = 0 # 處理第一個參數 # 解析參數序號 buff = self._read_all(1) have += 1 param_seq = struct.unpack(‘!B‘, buff)[0] # 解析參數值 param_len = param_len_map[param_seq] buff = self._read_all(param_len) have += param_len param_fmt = param_fmt_map[param_seq] param = struct.unpack(param_fmt,buff)[0] # 設置解析後的字典 param_name = param_name_map[param_seq] args[param_name] = param if have >= length: return args # 處理第二個參數 # 解析參數序號 buff = self._read_all(1) param_seq = struct.unpack(‘!B‘, buff)[0] # 解析參數值 param_len = param_len_map[param_seq] buff = self._read_all(param_len) param_fmt = param_fmt_map[param_seq] param = struct.unpack(param_fmt, buff)[0] # 設置解析後的字典 param_name = param_name_map[param_seq] args[param_name] = param return args def result_encode(self, result): """ 將原始結果數據轉換為消息協議二進制數據 :param result: :return: """ if isinstance(result,float): # 處理返回值類型 buff = struct.pack(‘!B‘, 1) buff += struct.pack(‘!f‘, result) return buff else: buff = struct.pack(‘!B‘, 2) # 處理返回值 length = len(result.message) # 處理字符串長度 buff += struct.pack(‘!I‘, length) buff += result.message.encode() return buff def result_decode(self, connection): """ 將返回值消息數據轉換為原始返回值 :param connection: socket BytesIo :return: float InvalidOperation對象 """ self.conn = connection # 處理返回值類型 buff = self._read_all(1) result_type = struct.unpack(‘!B‘, buff)[0] if result_type == 1: #正常情況 buff = self._read_all(4) val = struct.unpack(‘!f‘, buff)[0] return val else: buff = self._read_all(4) length = struct.unpack(‘!I‘, buff)[0] # 讀取字符串 buff = self._read_all(length) message = buff.decode(buff) return InvalidOperation(message) class Channel(object): """ 用於客戶端建立網絡鏈接 """ def __init__(self, host, port): self.host = host self.port = port def get_connection(self): """ 獲取鏈接對象 :return: 與服務器通訊的socket """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((self.host, self.port)) return sock class Server(object): """ RPC服務器 """ def __init__(self, host, port, handlers): sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 地址復用 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.host = host self.port = port # 綁定地址 sock.bind((self.host, self.port)) # 因為在啟動的方法中才開啟監聽,所以不在此處開啟 # sock.listen(128) self.sock = sock self.handlers = handlers def serve(self): """ 開啟服務器運行,提供RPC服務 :return: """ # 開啟服務器的監聽,等待客戶端的鏈接請求 self.sock.listen(128) print("服務器開啟監聽,ip地址為%s,port為%d..." % (self.host,self.port)) while True: # 不斷的接收客戶端的鏈接請求 client_sock, client_addr = self.sock.accept() print("與客戶端%s建立連接" % str(client_addr)) # 交個ServerStub,完成客戶端的具體的RPC的調用請求 stub = ServerStub(client_sock, self.handlers) try: while True: # 不斷的接收 stub.process() except EOFError: # 表示客戶端關閉了連接 print(‘客戶端關閉了連接‘) client_sock.close() class ClientStub(object): """ 用來幫助客戶端完成遠程過程調用 RPC調用 stub = ClientStub() stub.divide(200, 100) """ def __init__(self, channel): self.channel = channel self.conn = self.channel.get_connection() def divide(self, num1, num2 = 1): # 將調用的參數打包成消息協議的數據 proto = DivideProtocol() args = proto.args_encode(num1, num2) # 將消息數據通過網絡發送給服務器 self.conn.sendall(args) # 接受服務器返回的消息數據,並進行解析 result = proto.result_decode(self.conn) # 將結果之(正常float 或 異常InvalidOperation)返回給客戶端 if isinstance(result,float): return result else: raise result class ServerStub(object): """ 服務端存根 幫助服務端完成遠端過程調用 """ def __init__(self, connection, handlers): """ :param connection: 與客戶端的鏈接 :param handlers: 真正的本地函數路由 此處不以map的形式處理,實現類的形式 class Handler: @staticmethod def divide(): pass @staticmethod def add(): pass """ self.conn = connection self.method_proto = MethodProtocol(self.conn) self.process_map = { ‘divide‘: self._process_divide, ‘add‘: self._process_add } self.handlers = handlers def process(self): """ 當服務端接受了客戶的鏈接,建立好鏈接後,完成遠端調用的處理 :return: """ # 接收消息數據,並解析方法的名字 name = self.method_proto.get_method_name() # 根據解析獲得的方法名,調用相應的過程協議,接收並解析消息數據 self.process_map[name]() def _process_divide(self): """ 處理除法過程調用 :return: """ proto = DivideProtocol() args = proto.args_decode(self.conn) # args = {‘num1‘:xxx, ‘num2‘:xxx} # 除法過程的本地調用------------------->>>>>>>>> # 將本地調用過程的返回值(包括可能的異常)打包成消息協議的數據,通過網絡返回給客戶端 try: val = self.handlers.divide(**args) except InvalidOperation as e: ret_message = proto.result_encode(e) else: ret_message = proto.result_encode(val) self.conn.sendall(ret_message) def _process_add(self): """ 處理加法過程調用 此方法暫時不識閑 :return: """ pass if __name__ == ‘__main__‘: # 目的:消息協議測試,模擬網絡傳輸 # 構造消息數據 proto = DivideProtocol() # 測試一 # divide(200,100) # message = proto.args_encode(200,100) # 測試二 message = proto.args_encode(200) conn = BytesIO() conn.write(message) conn.seek(0) # 解析消息數據 method_proto = MethodProtocal(conn) name = method_proto.get_method_name() print(name) args = proto.args_decode(conn) print(args)
接下來,只需要創建服務器實例和使用客戶端發起請求
server.py
from services import InvalidOperation from services import Server class Handlers: @staticmethod def divide(num1, num2 = 1): if num2 == 0: raise InvalidOperation(‘ck_god_err‘) val = num1/num2 return val if __name__ == ‘__main__‘: # 開啟服務器 _server = Server(‘127.0.0.1‘, 8000, Handlers) _server.serve()
client.py
ffrom services import ClientStub
from services import Channel
from services import InvalidOperation
# 創建與服務器的連接
channel = Channel(‘127.0.0.1‘, 8000)
# 創建用於rpc調用的工具
stub = ClientStub(channel)
# 進行調用
for i in range(5):
try:
# val = stub.divide(i * 100,100)
# val = stub.divide(i * 100)
val = stub.divide( 100, 0)
except InvalidOperation as e:
print(e.message)
else:
print(val)
自定義RPC的完整實現---深入理解rpc內部原理