1. 程式人生 > >開源專案SMSS發開指南(三)——protobuf協議設計

開源專案SMSS發開指南(三)——protobuf協議設計

本文的第一部分將介紹protobuf使用基礎以及如何利用protobuf設計通訊協議。第二部分會給出smss專案的協議設計規範和原始碼講解。

一.Protobuf使用基礎

什麼是protobuf

protobuf是谷歌研發的一種資料序列化和儲存技術。主要可以用來解決網路通訊中異構系統的通訊和資料持久化,與同類技術相比(JSON或XML),官方宣稱的資料量長度減少3~10倍,運算速度20~100倍。由於與平臺無關,因此非常適合使用在多系統的互動中。

目前常見的使用版本是2和3,個人推薦如果你打算在專案中引入protobuf技術,不妨直接選擇版本3。以下的所有介紹也都基於protobuf3作為標準。

從個人的使用感受來看,protobuf的優點還是相當明顯的。不過,也有一些問題需要注意。

如何使用protobuf以及常見問題

protobuf依賴於一個可閱讀的描述檔案,字尾以.proto結束。編寫proto描述檔案有固定的格式,詳細說明參照官方文件。smss專案的doc目錄下也有提供,描述檔案可閱讀,因此不存在太大難度。只是需要注意,目前如果你打算使用protobuf3版本需要在檔案開頭註明:syntax="proto3"。後期不清楚谷歌是否會更改,因此建議使用者應該關注官方說明。

一般來說,protobuf依賴谷歌提供的編譯工具將描述檔案(.proto)翻譯為對應語言的原始碼。你也可以在專案中直接引入protobuf的依賴包利用動態編譯(反射)直接使用。建議使用前一種方式,無論是難度上還是效率上都更直接。注意:編譯工具和protobuf的開發依賴有版本對應,需要保持統一。

protobuf的C++版本會生成.h和.cc兩個檔案。對於C/C++程式設計師來說,使用的方式和Struct幾乎一致。

protobuf的Java版本會生成一系列路徑(如果有設定的話)和對應的.java檔案。Java開發人員可以直接複製到專案下使用,通過構造器模式來建立物件。

官方示例:

Person john = Person.newBuilder()
    .setId(1234)
    .setName("John Doe")
    .setEmail("[email protected]")
    .build();
output = new FileOutputStream(args[0]);
john.writeTo(output);

protobuf的JS版本會生成.js檔案,這裡不再贅述。

不同語言的構造命令可以參考smss專案的指令碼檔案。

使用Protobuf特別需要關注你的使用場景,通常來說需要注意以下兩點:

(1)protobuf專注於對小型字元資料的序列化/反序列化操作。如果你需要傳輸大型檔案或二進位制資料是不適合使用的。

(2)如果你打算在TCP/IP層來應用protobuf協議,依然需要設計包解析機制。TCP/IP傳輸會發生粘包或長包,這些問題protobuf無法幫你解決。如果你傳輸的包中包含多條資料,交給protobuf解析的時候,它只能反序列化出最後一條。這就要求在資料包的設計中必須包含一些必要欄位。

目前smss專案的用法是在一個包的頭部增加了4個位元組的包頭標識(AB47)和4個位元組的小端整型數表示protobuf序列化後的包長度。根據筆者的實踐發現,相同的包結構如果設定的資料不同可能最後序列化的資料長度有差異。因此在設定包長度的時候一定要根據實際的序列化為標準。

什麼場景下適合使用protobuf

相信在瞭解了protobuf的基本使用後,大多數有經驗的開發人員會有自己的判斷。我在這裡僅拋磚引玉提供一些個人的思考:

(1)內部系統開發:目前protobuf並未被大規模的實踐。如果你的專案需要對接外部系統,請對方提供或支援protobuf協議難度較大。因此,內部系統開發進行互動推薦使用。

(2)TCP/IP層資料通訊:目前的Java微服務應用大多使用http應用層協議,好處是實現過程相對簡單。而且由於各種開源框架對JSON-POJO的對映功能非常完備,如果從開發效率上考慮,顯然protobuf還不具備優勢。如果在業務中新增一些資料中臺業務,需要開發更加高效的通訊過程,利用protobuf是更加合理的方案。

(3)異構系統:不少物聯網專案會涉及多種語言多種裝置間的通訊。例如C++直接使用struct的序列化後傳輸給Java來處理,就必要麻煩。這類需求是使用protobuf的最佳使用場景。

二.SMSS專案協議設計規範

目前smss專案利用protobuf協議作為通訊的主要手段,正如前文介紹的那樣。為了提高通訊效率,專案各端內部使用TCP/IP層通訊。因此在包頭設計了包頭標識和包長度標識(8個位元組)。另外,與http協議不同的是,TCP/IP由於是長連線且是面向連線設計,因此需要設計應用層的規範。smss將一個完整的應用資料包分為資料頭和資料體,結構如下:

message MsgHeader
{
    int32 msg_size = 1; // 訊息體的長度
    int64 msg_id = 2; // 訊息ID,作為伺服器應答時候的對應
    MsgType msg_type = 3; // 訊息型別
    // 伺服器為0
    uint32 from = 4; // 訊息傳送方
    uint32 to = 5; // 訊息接收方
    string token = 6; // 令牌
}
message LoginReq
{
    string username = 1;
    string password = 2;
    bool is_need_key = 3; // 是否需要請求私鑰
}

message LoginResp
{
    enum LoginRespType
    {
        OK = 0;        //登陸成功
        ERROR=1;    //使用者名稱密碼錯誤
        NOUSER=2;    //使用者不存在
    }
    LoginRespType resp = 1;
    string token = 2; // 通訊令牌
    uint32 id = 3; // 使用者id
    string alias = 4; // 使用者別名
    string prv_key = 5; // 登入成功攜帶使用者通訊私鑰
}

MsgHeader表示資料頭,除了提供傳送發接收方等常用資訊外,主要依賴訊息型別(MsgType)和訊息體長度(msg_size)作為資料包的反序列化依據。由於在通訊的過程中需要加密,訊息體是用過protobuf序列化完成後再使用演算法進行加密傳輸。因此服務端只需要解析頭資料即可完成對訊息的轉發和處理。為解析TCP包增加的8個位元組,作為包長度的標識特指資料頭的長度,資料體的長度通過反序列化資料頭來確定。

資料的序列化與反序列化原始碼主要包含在服務端(socket_manager.cpp)和客戶端(smss_socket_event.js)檔案中。

服務端原始碼:

void SocketManager::ReadCb()
{
    char flag[5] = {0};
    int head_size = 0;
    // 判斷報文頭
    int len = bufferevent_read(buff_ev_, flag, 4);
    if (len <= 0 || strcmp(flag, PKT_FLAG) != 0)
    {
        return;
    }
    // 獲得訊息頭大小
    len = bufferevent_read(buff_ev_, &head_size, 4);
    if (len <= 0)
    {
        return;
    }
    char *head = new char[head_size];
    len = bufferevent_read(buff_ev_, head, head_size);
    // 解析訊息頭物件
    MsgHeader msg_header;
    msg_header.ParseFromArray(head, head_size);
    delete[] head;
    char *msg_buff = new char[msg_header.msg_size()];
    // FIX:if msg_body too large
    len = bufferevent_read(buff_ev_, msg_buff, msg_header.msg_size());
    switch (msg_header.msg_type())
    {
    case MsgType::CONNECT_REQ:
        RecvConnectReqest(&msg_header, msg_buff, len);
        break;
    case MsgType::CLIENT_LOGIN_REQ: // 登入請求
        RecvLoginRequest(&msg_header, msg_buff, len);
        break;
    case MsgType::HEART_BEAT: // 心跳
        RecvHeartBeat(msg_buff, len);
        break;
    case MsgType::USER_INFO_REQ: // 使用者資訊請求
        if (msg_header.token() != this->token_)
        {
            LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "使用者通訊令牌(token)驗證錯誤!");
            return;
        }
        RecvUserInfoRequest(&msg_header, msg_buff, len);
        break;
    case MsgType::MSG_SEND_REQ: // 訊息傳送請求
    {
        if (msg_header.token() != this->token_)
        {
            LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "使用者通訊令牌(token)驗證錯誤!");
            return;
        }
        LOG4CPLUS_DEBUG(SimpleLogger::Get()->LoggerRef(), "MsgType::MSG_SEND_REQ");
        work_thread_->SendToNetBus(msg_header.SerializeAsString().c_str(), msg_header.ByteSizeLong(), msg_buff, msg_header.msg_size());
    }
    break;
    case MsgType::USER_STATUS_REQ: // 使用者狀態請求
        if (msg_header.token() != this->token_)
        {
            LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "使用者通訊令牌(token)驗證錯誤!");
            return;
        }
        LOG4CPLUS_DEBUG(SimpleLogger::Get()->LoggerRef(), "MsgType::USER_STATUS_REQ");
        RecvUserStatusRequest(&msg_header, msg_buff, len);
        break;
    case MsgType::SERVICE_REGIST: // 服務註冊
        RecvServiceRegist(&msg_header, msg_buff, len);
        break;
    case MsgType::FILE_DOWNLOAD_NOTICE:
        work_thread_->SendToNetBus(msg_header.SerializeAsString().c_str(), msg_header.ByteSizeLong(), msg_buff, msg_header.msg_size());
        break;
    default:
    {
        stringstream ss;
        ss << "缺少對應的訊息型別處理函式:" << msg_header.msg_type();
        LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), ss.str());
    }
    }
    delete[] msg_buff;
}

ReadCb()是資料接收的直接處理方法,首先讀取4個位元組判斷包頭標識:int len = bufferevent_read(buff_ev_, flag, 4),判斷成功代表當前資料包是完整的。再讀取4個位元組的整型數來判斷後續一個數據頭的長度:bufferevent_read(buff_ev_, &head_size, 4)。接下來收取資料頭的完整資料並通過protobuf反序列化:msg_header.ParseFromArray(head, head_size)。最後根據資料頭的msg_type欄位判斷應該如何處理資料體。

客戶端原始碼:

onData(data) {
  // 處理粘包,迴圈讀取
  let readSize = 0;
  while (readSize < data.length) {
    let flag = data.toString("utf8", readSize, readSize + 4);
    if (flag !== "AB47") {
      readSize += 4;
      continue;
    }
    readSize += 4;
    let headerSize = data.readInt32LE(readSize);
    readSize += 4;
    // 訊息頭反向序列化
    let msgHeader = MsgHeader.deserializeBinary(
      data.subarray(readSize, headerSize + readSize)
    );
    readSize += headerSize;
    // 訊息型別
    let msgType = msgHeader.getMsgType();
    // 訊息大小
    let msgSize = msgHeader.getMsgSize();
    switch (msgType) {
      case MsgType.USER_INFO_RESP:
        this.onUserInfoResp(
          UserInfoResp.deserializeBinary(
            data.subarray(readSize, msgSize + readSize)
          )
        );
        break;
      case MsgType.USER_STATUS_NOTICE:
        this.onUserStatusNotice(
          UserStatus.deserializeBinary(
            data.subarray(readSize, msgSize + readSize)
          )
        );
        break;
      case MsgType.MSG_SEND_REQ:
        this.onSmsSendReq(data.subarray(readSize, msgSize + readSize));
        break;
      case MsgType.FILE_DOWNLOAD_NOTICE:
        new DownloadEvent(
          this.$store.state.User.userID,
          this.$store.state.User.userToken,
          msgHeader,
          FileDownloadNotice.deserializeBinary(
            data.subarray(readSize, msgSize + readSize)
          )
        );
        break;
      default:
    }
    readSize += msgSize;
  }
}
/**
 * 連線事件處理
 * 
 * @param {*} socket 
 * @param {*} userID 
 */
function ConnectEvent(socket, userID) {
    return new Promise((resolve, reject) => {
        fs.open("./data/.shadow/server.pem", "r", (err, fd) => {
            let connectReq = new ConnectReq();
            connectReq.setTimestamp(new Date().getTime());
            if (err) {
                connectReq.setIsNeedKey(true);
            } else {
                connectReq.setIsNeedKey(false);
            }
            let connectReqBuffer = connectReq.serializeBinary();
            let msgHeader = new MsgHeader();
            msgHeader.setMsgSize(connectReqBuffer.length);
            msgHeader.setMsgId(0);
            msgHeader.setMsgType(MsgType.CONNECT_REQ);
            msgHeader.setFrom(userID);
            msgHeader.setTo(0); // 傳送給伺服器
            const headerBuffer = msgHeader.serializeBinary();
            let packageHeader = Buffer.alloc(8);
            packageHeader.write("AB47");
            packageHeader.writeInt32LE(headerBuffer.length, 4);
            const packageBuffer = Buffer.concat([
                packageHeader,
                headerBuffer,
                connectReqBuffer
            ]);
            socket.write(packageBuffer, () => {
                socket.once("data", data => {
                    let flag = data.toString("utf8", 0, 4);
                    if (flag !== "AB47") {
                        return;
                    }
                    let headerSize = data.readInt32LE(4);
                    // 訊息頭反向序列化
                    let msgHeader = MsgHeader.deserializeBinary(
                        data.subarray(8, headerSize + 8)
                    );
                    // 訊息型別
                    let msgType = msgHeader.getMsgType();
                    // 訊息大小
                    let msgSize = msgHeader.getMsgSize();
                    if (msgType !== MsgType.CONNECT_RESP) {
                        reject("ConnectEvent RES MsgType Error!");
                    } else {
                        let resp = ConnectResp.deserializeBinary(
                            data.subarray(8 + headerSize, msgSize + 8 + headerSize)
                        );
                        if (resp.getPubKey() !== "") {
                            fs.writeFile(
                                "./data/.shadow/server.pem",
                                resp.getPubKey(),
                                err => {
                                    if (err) {
                                        reject(err);
                                    };
                                    // 連線完成後進行登入
                                    resolve(resp);
                                }
                            );
                        } else {
                            resolve(resp);
                        }
                    }
                })
            });
        });
    });
}

處理的過程和服務端的思路一致,也是從包頭到資料頭最後是資料體的解析。由於JavaScript在對網路呼叫和檔案讀取的時候大量需要使用回撥函式,因此smss專案在客戶端利用Promise進行了封裝。學習的時候建議先熟悉一下Promise的使用方式。

 

完整原始碼已經發布在碼雲上。

相關檔案:《開源專案SMSS開發指