1. 程式人生 > >pomelo原始碼分析(6)--connector協議處理message

pomelo原始碼分析(6)--connector協議處理message

pomelo框架核心提供了sioconnector,udpconnector,hybirdconnector,mqttconnector。sioconnector基於socket.io,使用json通訊,pc端通訊。hybirdconnector基於tcp和websocket,使用二進位制通訊,主要用於手機端通訊。mqttconnector使用mqtt協議通訊,mqtt是二進位制協議,是物聯網協議,這個就是用於嵌入式裝置通訊。而udpconnector,這個看名字也知道是基於udp的,它也是使用二進位制協議進行通訊。這個主要用於網路環境不好,資料包小的場景。

connector按照約定是要提供encode/decode的。sioconnector的encode/decode最簡單。因為它是處理json的。在connector提供encode/decode之外,還可以單獨設自定義的encode/decode。先看sioconnector,因為它比較簡單。

從decode看起,decode就是json解析。

/**
 * Decode client message package.
 *
 * Package format:
 *   message id: 4bytes big-endian integer
 *   route length: 1byte
 *   route: route length bytes
 *   body: the rest bytes
 *
 * @param  {String} data socket.io package from client
 * @return {Object}      message object
 */
Connector.decode = Connector.prototype.decode = function(msg) { var index = 0; //package ID var id = parseIntField(msg, index, PKG_ID_BYTES); index += PKG_ID_BYTES; //route體長 var routeLen = parseIntField(msg, index, PKG_ROUTE_LENGTH_BYTES); //route字串 var route = msg.substr(PKG_HEAD_BYTES, routeLen); var
body = msg.substr(PKG_HEAD_BYTES + routeLen); return { id: id, route: route, body: JSON.parse(body) //json包體 }; }; //取長度 var parseIntField = function(str, offset, len) { var res = 0; for(var i=0; i<len; i++) { //big-endian,網路位元組序,高位在前 if(i > 0) { res <<= 8; } res |= str.charCodeAt(offset + i) & 0xff; } return res; };

從decode可以看出來,訊息格式是有一個package id,一個route,然後就是訊息體。訊息體是json。而encode稍微複雜一點。

Connector.encode = Connector.prototype.encode = function(reqId, route, msg) {
  if(reqId) { //有reqId,這個序號是客戶端編的
    return composeResponse(reqId, route, msg);
  } else { //沒有就是廣播
    return composePush(route, msg);
  }
};

//注意這個地方,route被忽略了
var composeResponse = function(msgId, route, msgBody) {
  return {
    id: msgId, //reqId,請求包序號
    body: msgBody // 回覆訊息體
  };
};

var composePush = function(route, msgBody) {
  return JSON.stringify({route: route, body: msgBody});
};

sioconnector.js的協議處理是非常簡單的。欄位也很少,但是body裡面可能就千變萬化了,這個是業務相關的。相信寫過稍大一點專案的都很清楚,有的模組甚至有幾百個命令,幾百個命令就會產生幾百種body。

下面再分析一下hybirdconnector.js。到了這裡就要正式講一下pomelo的訊息格式了,pomelo的訊息分為兩層,package和message。 以下引用原文:“pomelo的二進位制協議包含兩層編碼:package和message。message層主要實現route壓縮和protobuf壓縮,message層的編碼結果將傳遞給package層。package層主要實現pomelo應用基於二進位制協議的握手過程,心跳和資料傳輸編碼,package層的編碼結果可以通過tcp,websocket等協議以二進位制資料的形式進行傳輸。message層編碼可選,也可替換成其他二進位制編碼格式,都不影響package層編碼和傳送。”

package格式

package分為header和body兩部分。header描述package包的型別和包的長度,body則是需要傳輸的資料內容。具體格式如下:

type - package型別,1byte,取值如下。
0x01: 客戶端到伺服器的握手請求以及伺服器到客戶端的握手響應
0x02: 客戶端到伺服器的握手ack
0x03: 心跳包
0x04: 資料包
0x05: 伺服器主動斷開連線通知
length - body內容長度,3byte的大端整數,因此最大的包長度為2^24bytebody - 二進位制的傳輸內容。

message協議的主要作用是封裝訊息頭,包括route和訊息型別兩部分,不同的訊息型別有著不同的訊息頭,在訊息頭裡面可能要打入message id(即requestId)和route資訊。由於可能會有route壓縮,而且對於服務端push的訊息,message id為空,對於客戶端請求的響應,route為空,因此message的頭格式比較複雜。
訊息頭分為三部分,flag,message id,route。
pomelo訊息頭是可變的,會根據具體的訊息型別和內容而改變。其中:
flag位是必須的,佔用一個byte,它決定了後面的訊息型別和內容的格式;
message id和route則是可選的。其中message id採用[varints 128變長編碼](https://developers.google.com/protocol-buffers/docs/encoding#varints)方式,根據值的大小,長度在0~5byte之間。route則根據訊息型別以及內容的大小,長度在0~255byte之間。

從這段文字的描述可以看出來,我們剛才對sioconnector.js中encode和decode的分析都是基於message的,package部分的沒有涉及到。

本篇暫時不講package部分,聚集點在於message部分。因為一發散的話,就沒有重點了。

hybirdconnector.js對於encode和decode的處理是,寫了一個coder.js作為抽象。

var coder = require('./common/coder');

Connector.decode = Connector.prototype.decode = coder.decode;

Connector.encode = Connector.prototype.encode = coder.encode;

可以看到encode和decode獨立出去了,做了一個單獨的抽象,這樣提高了複用性和擴充套件性。

coder.js

//這是pomelo的另一個開源元件
var Message = require('pomelo-protocol').Message;
var Constants = require('../../util/constants');
//pomelo-logger也是另一個元件,不在核心模組裡
var logger = require('pomelo-logger').getLogger('pomelo', __filename);

//encode函式
var encode = function(reqId, route, msg) {
  if(!!reqId) {
    return composeResponse(this, reqId, route, msg);
  } else {
    return composePush(this, route, msg);
  }
};

//decode函式
var decode = function(msg) {
  msg = Message.decode(msg.body);
  var route = msg.route;

  // decode use dictionary
  if(!!msg.compressRoute) {
    if(!!this.connector.useDict) {
      var abbrs = this.dictionary.getAbbrs();
      if(!abbrs[route]) {
        logger.error('dictionary error! no abbrs for route : %s', route);
        return null;
      }
      route = msg.route = abbrs[route];
    } else {
      logger.error('fail to uncompress route code for msg: %j, server not enable dictionary.', msg);
      return null;
    }
  }

  // decode use protobuf,protobuf協議解碼
  if(!!this.protobuf && !!this.protobuf.getProtos().client[route]) {
    msg.body = this.protobuf.decode(route, msg.body);
  } else if(!!this.decodeIO_protobuf && !!this.decodeIO_protobuf.check(Constants.RESERVED.CLIENT, route)) {
    msg.body = this.decodeIO_protobuf.decode(route, msg.body);
  } else {
    try {
      msg.body = JSON.parse(msg.body.toString('utf8'));
    } catch (ex) {
      msg.body = {};
    }
  }

  return msg;
};

var composeResponse = function(server, msgId, route, msgBody) {
  if(!msgId || !route || !msgBody) {
    return null;
  }
  msgBody = encodeBody(server, route, msgBody);
  return Message.encode(msgId, Message.TYPE_RESPONSE, 0, null, msgBody);
};

var composePush = function(server, route, msgBody) {
  if(!route || !msgBody){
    return null;
  }
  msgBody = encodeBody(server, route, msgBody);
  // encode use dictionary
  var compressRoute = 0;
  if(!!server.dictionary) {
    var dict = server.dictionary.getDict();
    if(!!server.connector.useDict && !!dict[route]) {
      route = dict[route];
      compressRoute = 1;
    }
  }
  return Message.encode(0, Message.TYPE_PUSH, compressRoute, route, msgBody);
};

var encodeBody = function(server, route, msgBody) {
    // encode use protobuf
  if(!!server.protobuf && !!server.protobuf.getProtos().server[route]) {
    msgBody = server.protobuf.encode(route, msgBody);
  } else if(!!server.decodeIO_protobuf && !!server.decodeIO_protobuf.check(Constants.RESERVED.SERVER, route)) {
     msgBody = server.decodeIO_protobuf.encode(route, msgBody);
  } else { //相容json
    msgBody = new Buffer(JSON.stringify(msgBody), 'utf8');
  }
  return msgBody;
};

module.exports = {
  encode: encode,
  decode: decode
};

對於coder.js中的encode和decode,裡面呼叫的函式名和sioconnector.js中都是一致的。所不同的是對於body的處理,json的話直接用JSON相關的函式就可以了。從coder.js檔案來看,所謂的二進位制實際上是用的protobuf,不支援其它的二進位制協議。程式碼是比較清晰的,就不再對程式碼做太多解釋了。

最後補充說明,協議這種東西,最好不要自定義二進位制協議,更不要自定義類似query string那種文字協議。自定義二進位制協議一是除錯非常的麻煩,二是要做協議轉換的時候,開發速度慢,出錯率高,工作量大,自定義二進位制協議少有能直接DSL生成轉換程式碼的。最好的方案目前看到的也是用lua去對映,然後寫一段通用程式碼去轉換。而類query string的文字協議就更痛苦了,長的就是像這樣子a=b&c=d。這種協議第一,要做編碼轉換,特殊字元轉換。二,這種表示是一維的,key-value形式,也就是一個map轉成了陣列。它的擴充套件性非常地差,巢狀表達能力基本為0,因為巢狀表達就需要新增分隔符,多層巢狀以後,調協議會成為開發之間的導火索。同時作為文字協議,它的體積很大,無法壓縮,也不能直觀地格式化。非要用文字協議,直接用json就好了。

相關推薦

pomelo原始碼分析(6)--connector協議處理message

pomelo框架核心提供了sioconnector,udpconnector,hybirdconnector,mqttconnector。sioconnector基於socket.io,使用json通訊,pc端通訊。hybirdconnector基於tcp和

Dubbo原始碼分析:RPC協議實現-服務端併發控制與Semaphore訊號量

概述 Dubbo支援在服務端通過在service或者method,通過executes引數設定每個方法,允許併發呼叫的最大執行緒數,即在任何時刻,只允許executes個執行緒同時呼叫該方法,超過的則拋異常返回,從而對提供者服務進行併發控制,保護資源。 用法 服務級別 限

Dubbo原始碼分析:RPC協議實現-客戶端併發呼叫控制

概述 Dubbo支援在服務或者方法粒度,通過actives引數,控制客戶端對提供者服務的所有方法或者某個方法進行併發訪問控制,即在同一時刻,客戶端只允許active個請求併發呼叫服務的某個方法,超過的請求需要等待,如果在timeout時間內還是無法執行呼叫,則異常退出。 用法

Dubbo原始碼分析:RPC協議實現-RPC過程與核心介面設計

RPC的基本過程 提供者Provider:提供服務的介面定義和介面的具體實現,然後通過URL的方式告訴消費者,某個URL對應某個service實現,一般是將服務的資訊註冊到一個註冊中心,如zookeeper或者Redis等; 消費者Consumer:獲取提供者的介面定義

Libevent原始碼分析-----訊號event的處理

訊號event的工作原理:         前面講解了Libevent如何監聽一個IO事件,現在來講一下Libevent如何監聽訊號。Libevent對於訊號的處理是採用統一事件源的方式。簡單地說,就是把訊號也轉換成IO事件,整合到Libevent中。         統

Libevent原始碼分析-----超時event的處理

如何成為超時event:                 Libevent允許建立一個超時event,使用evtimer_new巨集。 //event.h檔案 #define evtimer_new(b, cb, arg) event_new((b), -1, 0,

谷歌瀏覽器的原始碼分析(6)

類AutocompleteEdit繼承了類CWindowImpl、類CRichEditCommands、類Menu::Delegate。其中類CWindowImpl實現了Windows視窗,它是WTL裡的視窗模板類,主要用來建立視窗介面類,並且使用類CRichEditCtrl作為基類,類CRichEditCt

[Abp 原始碼分析]十、異常處理

0.簡介 Abp 框架本身針對內部丟擲異常進行了統一攔截,並且針對不同的異常也會採取不同的處理策略。在 Abp 當中主要提供了以下幾種異常型別: 異常型別 描述 AbpException Abp 框架定義的基本異常型別,Abp 所有內部定義的異常型別都繼承自本類。 AbpInitializa

docker原始碼分析之容器日誌處理與log-driver實現

子程序:由一個程序(父程序)建立的程序,整合父程序大部分屬性,同時可以被父程序守護和管理。 (2) 你需要知道關於程序產生日誌的形式: 程序產生

presto原始碼分析(hive的分割槽處理)

修改原始碼時遇到一個問題,就是對分割槽的處理,當遇到join查詢時,如上篇文章presto join連線時的謂詞處理所述,對於某些情況下,如果謂詞帶or,會吧分割槽欄位當做普通欄位處理,不會下推到表掃描處。但是hive是如何處理這種情況的呢? 1 h

WebRTC原始碼分析三:視訊處理流程

 文字介紹視訊的處理流程。圖1中顯示了兩路視訊會話視訊訊號流過程。 圖1 視訊流程示意圖 以一路視訊會話為例,主要分為以下幾個執行緒: 1)視訊源產生執行緒:Camera生產視訊畫面,封裝成視訊幀,以一定幀率投遞到下一個模組。; 2)採集執行緒:由Capturer負責採集視訊幀,並對視訊幀進行一定處理,如

Libevent原始碼分析-----日誌和錯誤處理

日誌處理:         在Libevent的原始碼中,經常會見到形如event_warn、event_msgx、event_err之類的函式。這通常出現在程式碼中一些值是不合理時。這些函式就是Libevent的日誌函式。它能把這些不合理的情況打印出來,告知使用者

Android原始碼分析——Looper,Messagequeue,Message,handler初始化及handler機制簡介

private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }直接上原始碼可見是個private的構造

twemproxy原始碼分析之四:處理流程

很讚的註釋: * nc_connection.[ch] * Connection (struct conn) * + + + * |

Libevent原始碼分析-timer和signal處理

timer處理 evtimer_set(&ev, time_cb, NULL);//設定定時器事件 其中evtimer_set是個巨集定義 #define evtimer_set(ev, cb, arg) event_set

lucene原始碼分析---6

lucene原始碼分析—建立IndexReader 本章開始分析lucene的查詢過程,下面先看一段lucene6版本下常用的查詢程式碼, String indexPath; IndexReader reader = Dire

雲客Drupal8原始碼分析之響應附屬處理attachments_processor

在閱讀本主題前,你需要先閱讀本系列的渲染陣列、渲染器、渲染佔位符等主題 附屬物attachments就是渲染陣列的#attached部分,這裡稱為“附屬物”而不叫做“附件”,以便和圖片、檔案等概念相區別,附屬物有如下8個型別(以在#attached中的鍵名列出,如果添加了其

React原始碼分析6 -- 元件通訊,refs,key,ReactDOM

1 元件間通訊 父元件向子元件通訊 React規定了明確的單向資料流,利用props將資料從父元件傳遞給子元件。故我們可以利用props,讓父元件給子元件通訊。故父元件向子元件通訊還是很容易實現的。引申一點,父元件怎麼向孫子元件通訊呢?可以利用props

小夥伴們的ceph原始碼分析三——monitor訊息處理流程

筆者在讀程式碼初期非常想理清楚的就是ceph這麼個系統在服務端與客戶端是怎麼響應與發起請求的。 本人主要負責monitor部分,而且追了一會cephx認證的程式碼,所以拿這塊舉例,後續osd部分主要是對同事分享的學習。 本篇會講到src/mon/monitor.cc中 c

springboot原始碼分析6-springboot之PropertySource類初探

在springboot原始碼分析5-springboot之命令列引數以及原理一文中,我們看到了例項化Source類的時候,會去先例項化其父類SimpleCommandLinePropertySource。SimpleCommandLinePropertySource類的建構函