1. 程式人生 > >使用python通過protobuf實現rpc

使用python通過protobuf實現rpc

網上有很多教程,基本都是c++的,很多還解釋的不夠清楚,新手沒辦法通過文章自己實現一個完整的rpc,而且很少有python的完整教程,
所以我從頭用一個完整的echo server demo來講解protobuf rpc的基本原理。


protobuf rpc echo demo 原始碼:

github: https://github.com/majianfei/protobuf-rpc-echo

不瞭解protobuf怎麼安裝及protobuf的基礎語法,可以網上檢視教程。我就不多說了。


首先 protobuf rpc幫我們做了什麼:
google protobuf只負責訊息的打包和解包,並不包含RPC的實現,但其包含了RPC的定義。也不包括通訊機制。
所以我們要自己實現通訊機制。


首先我們定義:game_service.proto:


option py_generic_services = true;

message RequestMessage
{
    required string msg = 1;
}

message ResponseMessage
{
    required string msg = 1;
}

service IEchoService
{
    rpc echo(RequestMessage) returns(ResponseMessage);
}

使用 proto --cpp_out=. game_service.proto
會生成 game_service_pb2.py的檔案,裡面會有兩個類,一個 IEchoService,一個IEchoService_stub繼承自IEchoService
這兩個有什麼區別的,一般來說:IEchoService_stub,作為呼叫方,IEchoService作為被呼叫方。(假設我們做單向通訊,客戶端->伺服器)
當呼叫方呼叫echo時,自動通過protobuf rpc傳輸到被呼叫方,呼叫被呼叫方的echo處理邏輯。


過程是這樣的:
1.被呼叫方主動呼叫echo傳送message資料。
2.protobuf rpc會自動呼叫rpc_channel的CallMethod方法。
3.CallMethod呼叫具體的通訊過程傳送資料。(需要我們實現CallMethod和通訊過程)
注:通訊過程我們要自己實現,這也是protobuf設計的初衷,在最多變的部分(多種多樣的網路結構、協議和通訊機制)留出足夠的空間讓程式設計師可以針對特定場景自己實現,使得protobuf可以應用在更多的場景。
4.被呼叫方讀到資料
5.資料通過protobuf的定義解析成message格式(自己實現)
6.找到指定IEchoService的echo
7.我們在echo裡面實現我們的邏輯就好了。


一般而言,會把rpc的客戶端->伺服器->客戶端變成兩個單獨的過程。
option py_generic_services = true;

message Void {}

message RequestMessage
{
    required string msg = 1;
}

message ResponseMessage
{
    required string msg = 1;
}

//客戶端發給伺服器
service IEchoService
{
    rpc echo(RequestMessage) returns(Void);
}

//伺服器發給客戶端
service IEchoClient
{
    rpc echo_reply(ResponseMessage) returns(Void);
}


實現通訊層
最簡單的,我們使用python的asyncore來實現一個簡單的通訊層。
包括 TcpConnection,TcpServer,TcpClient,這應該很簡單就能理解,具體可以看github上的原始碼。

RPC客戶端需要實現google.protobuf.RpcChannel。主要實現RpcChannel.CallMethod介面。客戶端呼叫任何一個RPC介面,最終都是呼叫到CallMethod。這個函式的典型實現就是將RPC呼叫引數序列化,然後投遞給網路模組進行傳送。

def CallMethod(self, method_descriptor, rpc_controller,
			 request, response_class, done):
	index = method_descriptor.index
	data = request.SerializeToString()
	total_len = len(data) + 6
	self.logger.debug("CallMethod:%d,%d"%(total_len,index))
	
	self.conn.send_data(''.join([struct.pack('!ih', total_len, index), data]))


無論是呼叫端還是被呼叫端,一個method_descriptor在其所在Service內的index是一致的。因此method_descriptor的部分只需要對其index進行序列化即可。
RPC呼叫的引數可以直接使用protobuf的SerializeToString()方法進行序列化,進而在接收端通過ParseFromString()方法反序列化。


protobuf的service API在被呼叫端為我們完成的工作是,當使用合適的method_descriptor和request引數呼叫IEchoService.CallMethod()時,會自動呼叫我們對相應方法介面的具體實現。因此在服務端需要做的工作主要由:


接受呼叫端發來的資料。
對接收到的資料包進行反序列化,解析得到method_descriptor和request引數。
呼叫EchoService.CallMethod()。

def input_data(self, data):
	total_len, index = struct.unpack('!ih', data[0:6])
	self.logger.debug("input_data:%d,%d" % (total_len, index))
	rpc_service = self.rpc_service
	s_descriptor = rpc_service.GetDescriptor()
	method = s_descriptor.methods[index]
	try:
		request = rpc_service.GetRequestClass(method)()
		serialized = data[6:total_len]
		request.ParseFromString(serialized)
		
		rpc_service.CallMethod(method, self.rpc_controller, request, None)
		
	except Exception, e:
		self.logger.error("Call rpc method failed!")
		print "error:",e
		self.logger.log_last_except()
	return True


服務端實現RPC介面,繼承自IEchoService。

# 被呼叫方的Service要自己實現具體的rpc處理邏輯
class MyEchoService(IEchoService):
	
	def echo(self, controller, request, done):
		
		rpc_channel = controller.rpc_channel
		msg = request.msg
		
		response = ResponseMessage()
		response.msg = "echo:"+msg
		
		print "response.msg", response.msg
		
		# 此時,伺服器是呼叫方,就呼叫stub.rpc,客戶端時被呼叫方,實現rpc方法。
		client_stub = IEchoClient_Stub(rpc_channel)
		client_stub.echo_reply(controller, response, None)

整個流程就清晰了,
1.實現RpcChannel,主要實現CallMethod使用底層通訊機制(TcpConnection) 把序列化後的資料傳送出去,當服務端TcpConnection收到包之後,反序列化(input_data)


我們要繼承 EchoService 並實現echo,當客戶端呼叫EchoService_stub.echo時,會呼叫RpcChannel的CallMethod傳送出去,然後接收方TpcConnection.hand_read呼叫rpc_channel的input_data,我們實現反序列化過程,再呼叫EchoService
的CallMethod,protobuf會自動呼叫我們實現的echo方法,然後伺服器回訊息到客戶端時,呼叫方跟被呼叫方就反過來了。我們呼叫方(伺服器)使用stub.echo_reply走原來的流程,到客戶端(被呼叫方),實現echo_reply處理邏輯。


有興趣可以看看github上的原始碼。
更有興趣的可以使用protobuf rpc實現一個完整的rpc server,我個人暫時就不完善了。


我目前在實現分散式伺服器架構,底層使用libev網路庫,然後使用c++封裝buffer資料解析,然後邏輯執行緒使用python實現,c++把tcp stream資料解析成完整的packet之後丟到message_queue然後邏輯執行緒讀到資料丟給python處理。使用protobuf rpc。
目前還在開發階段。歡迎有興趣的來討論。

相關推薦

使用python通過protobuf實現rpc

網上有很多教程,基本都是c++的,很多還解釋的不夠清楚,新手沒辦法通過文章自己實現一個完整的rpc,而且很少有python的完整教程, 所以我從頭用一個完整的echo server demo來講解protobuf rpc的基本原理。 protobuf rpc echo

寫一個Python通過select實現的最簡單的web框架

127.0.0.1 log put lec func select odi block pos 1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import socket 5 import selec

Python-通過socket實現一個小型的端口檢測工具

socket 結果 soc png cep light true python bsp 實驗機器IP:192.168.220.139,端口開放情況 代碼 # -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import

Java通過Hadoop實現RPC通訊簡單例項

一、定義server端程式碼 1.定義一個介面,該介面繼承org.apache.hadoop.ipc.VersionedProtocol介面 import org.apache.hadoop.ipc.VersionedProtocol; /** * 1.伺服器定義介面

[reportlab教程4]——python通過reportlab實現圖片轉換pdf

#!/usr/bin/env python import os import sys from reportlab.lib.pagesizes import A4, landscape from reportlab.pdfgen import canvas f = sys

python通過zlib實現壓縮檔案內容(str),和解壓縮還原檔案內容

#!/usr/bin/env.python # -*- coding: utf-8 -*- import zlib ''' 壓縮infile檔案內容,寫入dst檔案中 ''' def compres

Python通過win32實現office自動化

''' 以下一段是增加10個新頁,然後跳轉到新頁中增加內容。。。。 ''' section_index = 0 for i in range(0, 10):  #由於增加頁的使用如此頻繁,我們最好將其提取為一個函式,類似def NewTable(self):  pre_section = doc.Secito

python通過multiprocessing 實現帶回調函數的異步調用的代碼

import proc sin 實現 __name__ lin roc 回調 ron 下邊代碼段是關於python通過multiprocessing 實現帶回調函數的異步調用的代碼。 from multiprocessing import Pool def f(x):

Python通過Socket實現檔案傳輸

客戶端# -*- coding: utf-8 -*- import socket import argparse import os import hashlib import pickle def parse_args(): parser = argparse.A

Python-RabbitMQ消息隊列實現rpc

llb author bject roc read uuid tin rip rabbit 客戶端通過發送命令來調用服務端的某些服務,服務端把結果再返回給客戶端 這樣使得RabbitMQ的消息發送端和接收端都能發送消息 返回結果的時候需要指定另一個隊列 服務器端 # -

五、通過Protobuf整合Netty實現對協議訊息客戶端與伺服器通訊實戰

目錄 一、Protocol Buffers 是什麼? 二、Protocol Buffers 檔案和訊息詳解 三、專案實戰,直接掌握的protobuf應用。 一、Protocol Buffers 是什麼?         1、官網翻譯

python 通過多執行緒實現Excel 批量更新商品價格

import openpyxl import threading wb = openpyxl.load_workbook('produceSales.xlsx') sheet = wb.active all_info = [] for row in sheet.rows: child

Python 通過打碼平臺實現驗證碼

  在爬蟲時,經常遇到登入需要驗證碼的情況,簡單的驗證碼可以自己解決,複製的驗證碼需要藉助機器學習,有一定的難度。還有一個簡單的方案就是採用付費的打碼平臺。   比如R若快(http://www.ruokuai.com/client),還有云打碼平臺(http://www.yundama.com/price

python通過HOG+SVM實現行人檢測思路

一、思路 1、選取視窗寬高為 64*128 ,block大小為 16*16畫素,block步長為8畫素,cell為8*8畫素,每個cell分9個bin,其他引數都預設         這樣的話,一個block有4個cell,一個cell有9維,那一

python : 通過socket的Tcp/ip協議實現客戶端與服務端的資料互動

#服務端 繫結socket 繫結目的ip和埠 建立監聽 建立連線 資料互動 import socket ''' #建立socket :通過server接收 #繫結目的ip和埠號 #設定監聽 #建立連線Socket,Address接收資訊 #資料互

Python 通過微信控制實現app定位傳送到個人伺服器,再轉發微信伺服器接收位置資訊。

考慮到女友的安全問題,就做了一個app實現定位和伺服器實現轉發的東西。剛學python,竟沒想到用物件程式設計會更加方便,全程過程式開發,程式碼有點臃腫,就當學習下python吧.效果就是:在微信公眾號中輸入指定字元比如:”我要知道你的位置”,手機那端的位置就彈出來了.主要是講一下思路:先是

[原創]python通過modbus_tk實現modbus主機上位機

如果你的開發環境是python3及以上 pip3 install modbus_tk 否則使用 pip install modbus_tk 直接貼Python程式碼 系統:win10 IDE:pycharm + Qtdesigner 目標:實現簡單modbus通訊 import

Python通過Manager方式實現多個無關聯程序共享資料

Python實現多程序間通訊的方式有很多種,例如佇列,管道等。但是這些方式只適用於多個程序都是源於同一個父程序的情況。如果多個程序不是源於同一個父程序,只能用共享記憶體,訊號量等方式,但是這些方式對於複雜的資料結構,例如Queue,dict,list等,使用起來比較麻煩,不夠靈活。Manager是一種較為高階

使用netty+zookeeper+protobuf實現一個RPC過程

上次實現了一個基於java序列化和阻塞IO模型的RPC過程,效率很低,這次換用NIO來實現。程式碼有點多,儘量寫清楚一點。 這是maven的版本依賴,先放在前面,接下來就可以複製了。。。 <dependency> <groupId&

python通過面向物件程式設計方法 實現鉛球執行軌跡的計算

本文我們通過面向物件程式設計的方法,實現鉛球執行軌跡計算的程式。 這個問題中有一個鉛球物件,它有4個屬性:xpos,ypos,xvel,yvel構建投射體的類Projectile用來生成鉛球物件,不斷的更新物件變數,以此來描述這個問題。 用python3編寫Projecti