1. 程式人生 > >Node.js websocket/ws 詳解

Node.js websocket/ws 詳解

前言

眾所周知,HTTP協議是一種無狀態、無連線、單向的應用層協議,只能由客戶端發起請求,服務端響應請求。

這就顯示了一個明顯的弊端:服務端無法主動向客戶端發起訊息,一旦客戶端需要知道服務端的頻繁狀態變化,就要由客戶端盲目地多次請求以獲得最新地狀態,這就是長輪詢

而長輪詢有顯著地缺點:效率低、非常耗費資源,就在這個時候WebSocket出現了。

WebSocket是一個長連線,客戶端可以給服務端傳送訊息,服務端也可以給客戶端傳送訊息,這便是全雙工通訊

而node並沒有提供Websocket的API,我們需要對Node.js提供的HTTPServer做額外的開發,好在npm上已經有許多的實現,其中使用最為廣泛的就是本文主角——ws模組

WebSocket 串講

WebSocket連線也是由一個標準的HTTP請求發起,格式如下:

GET ws://localhost:3000/ws/chat HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Origin: http://localhost:3000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13

支援Websocket的伺服器在收到請求後會返回一個響應,格式如下:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: server-random-string

響應程式碼101表示本次連線的HTTP協議即將被更改,更改後的協議就是Upgrade: websocket指定的WebSocket協議,之後的資料傳輸將不再通過http協議,而是使用全雙工通訊的Websocket協議。

初識ws模組

先建立一個服務端程式

const WebSocket = require('ws');//引入模組

const wss = new WebSocket.Server({ port: 8080 });//建立一個WebSocketServer的例項,監聽埠8080
wss.on('connection', function connection(ws) { ws.on('message', function incoming(message) { console.log('received: %s', message); ws.send('Hi Client'); });//當收到訊息時,在控制檯打印出來,並回復一條資訊 });

再建立一個客戶端程式

const WebSocket = require('ws');

const ws = new WebSocket('ws://localhost:8080');

ws.on('open', function open() {
  ws.send('Hi Server');
});//在連線建立完成後傳送一條資訊

ws.on('message', function incoming(data) {
  console.log(data);
});//當收到訊息時,在控制檯打印出來

在node環境中先執行服務端程式,再執行客戶端程式,我們就可以在控制檯分別看到兩個端的列印資訊了

至此,通過ws模組建立的一個最簡單的Websocket連線就完成了!

WebSocketServer Class

繼承自EventEmitter,存在於客戶端的一個WebsocketServer

constructor (options, callback)

options有以下屬性、方法:

名稱 預設值 描述
maxPayload 100 *1024 *1024 每條message的最大載荷,單位:位元組
perMessageDeflate false 見詳解
handleProtocols(protocol, req) null 見詳解
clientTracking true 會在內部建立一個set,存下所有的連線,即.clients屬性,原始碼:if (options.clientTracking) this.clients = new Set();
verifyClient null verifyClient(info, (verified, code, message, headers))
noServer false 是否啟用無Server模式
backlog null use default (511 as implemented in net.js)
server null 在一個已有的HTTP/S Server的基礎上建立
host null 伺服器host
path null 只接收這個path的Websocket訪問,不指定則接收所有
port null 要監聽的埠

perMessageDeflate {Boolean|Object} 詳解

Websocket 協議的message傳輸有直接傳輸和先壓縮再傳輸兩種形式,而壓縮演算法是開放的。客戶端和服務端會協商是否啟用壓縮

客戶端如果設定了啟用壓縮,則在發起WebSocket通訊時會新增Sec-WebSocket-Extensions: permessage-deflate首部

 GET /examples/websocket
 HTTP/1.1\r\n
    Host: xxx.xxx.xxx.xxx:xx\r\n
    Connection: Upgrade\r\n
    Pragma: no-cache\r\n
    Cache-Control: no-cache\r\n
    Upgrade: websocket\r\n
    Origin: http://xxx.xxx.xxx.xxx:xx\r\n
    Sec-WebSocket-Version: 13\r\n
    User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36\r\n
    Accept-Encoding: gzip, deflate, sdch\r\n
    Accept-Language: zh-CN,zh;q=0.8,en;q=0.6\r\n
    Sec-WebSocket-Key: N+GWswsViw18TfSpryLcVw==\r\n
    Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n
    \r\n

服務端如果也設定了啟用壓縮,則在響應中會有Sec-WebSocket-Extensions首部,這樣就完成了協商,之後的通訊將啟用壓縮

  HTTP/1.1 101 \r\n
    Server: Apache-Coyote/1.1\r\n
    Upgrade: websocket\r\n
    Connection: upgrade\r\n
    Sec-WebSocket-Accept: xwLDQrb5kzxpZDdeTcUd+7diXXU=\r\n
    Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15\r\n
    Date: \r\n

ws模組中perMessageDeflate這個屬性預設為false,即不開啟壓縮,true則為開啟壓縮,也可以傳入一個Object自定義一些配置,此處略,官方例子如下:

const WebSocket = require('ws');

const wss = new WebSocket.Server({
  port: 8080,
  perMessageDeflate: {
    zlibDeflateOptions: { // See zlib defaults.
      chunkSize: 1024,
      memLevel: 7,
      level: 3,
    },
    zlibInflateOptions: {
      chunkSize: 10 -1024
    },
    // Other options settable:
    clientNoContextTakeover: true, // Defaults to negotiated value.
    serverNoContextTakeover: true, // Defaults to negotiated value.
    clientMaxWindowBits: 10,       // Defaults to negotiated value.
    serverMaxWindowBits: 10,       // Defaults to negotiated value.
    // Below options specified as default values.
    concurrencyLimit: 10,          // Limits zlib concurrency for perf.
    threshold: 1024,               // Size (in bytes) below which messages
                                   // should not be compressed.
  }
});

handleProtocols(protocol, req)

對sec-websocket-protocol的協議進行一個選擇,預設會選擇第一個協議,這些協議是使用者自定義的字串,比如可以用chat代表即時聊天,就可以寫成sec-websocket-protocol:chat,…
部分原始碼如下:

var protocol = req.headers['sec-websocket-protocol'];
...
if (this.options.handleProtocols) {
        protocol = this.options.handleProtocols(protocol, req);
      } else {
        protocol = protocol[0];
      }

故該方法最後應返回一個字串,為選中的協議,之後可以通過ws.protocal獲取,針對自己定義的不同的協議作不同的處理

verifyClient()

如果沒有設定這個方法,則預設會接收所有連線Websocket
的請求
有兩種形參形式: verifyClient(info), verifyClient(info, callback)
info有如下屬性:

  • origin 字串 即websocket的origin
  • secure Boolean 僅當req.connection.authorized 或req.connection.encrypted 不為null時為true.
  • req 即這個客戶端請求連線的GET請求.

對於單形參的形式,return true代表通過,return false代表不通過,將自動返回一個401響應

對於雙形參的形式,呼叫callback(true, null, null, null)代表通過,呼叫calback(false, 401, “unauthorized”,null)代表不通過

一般來說,雙形參的形式僅當需要自定義錯誤響應的資訊時使用

原始碼如下:

if (this.options.verifyClient) {
      const info = {
        origin: req.headers[`${version === 8 ? 'sec-websocket-origin' : 'origin'}`],
        secure: !!(req.connection.authorized || req.connection.encrypted),
        req
      };

      if (this.options.verifyClient.length === 2) {
        this.options.verifyClient(info, (verified, code, message, headers) => {
          if (!verified) {
            return abortHandshake(socket, code || 401, message, headers);
          }

          this.completeUpgrade(extensions, req, socket, head, cb);
        });
        return;
      }

      if (!this.options.verifyClient(info)) return abortHandshake(socket, 401);
    }

port server noserver host path詳解

注意:port、server、noserver = true是互斥的,這三者必須設定且只能設定一個

當設定了port時,server和noserver將不起效果,會建立一個httpserver去監聽這個埠

當沒有設定port且設定了server時,將使用這個以有的server

部分原始碼如下:

if (options.port != null) {
      this._server = http.createServer((req, res) => {
        const body = http.STATUS_CODES[426];

        res.writeHead(426, {
          'Content-Length': body.length,
          'Content-Type': 'text/plain'
        });
        res.end(body);
      });
      this._server.listen(options.port, options.host, options.backlog, callback);
    } else if (options.server) {
      this._server = options.server;
    }
if (this._server) {
    this._removeListeners = addListeners(this._server, {
        listening: this.emit.bind(this, 'listening'),
        error: this.emit.bind(this, 'error'),
        upgrade: (req, socket, head) => {
          this.handleUpgrade(req, socket, head, (ws) => {
            this.emit('connection', ws, req);
          });
        }
      });
    }

由上面的原始碼也可以看出,host屬性僅在指定port新建http server時有效

path屬性未指定時將接收所有的url,指定後將僅接收指定的url,部分原始碼如下

  /**
   *See if a given request should be handled by this server instance.
   *
   *@param {http.IncomingMessage} req Request object to inspect
   *@return {Boolean} `true` if the request is valid, else `false`
   *@public
   */
  shouldHandle (req) {
    if (this.options.path && url.parse(req.url).pathname !== this.options.path) {
      return false;
    }

    return true;
  }

事件監聽

一般語法:.on(“event”, funcion)

connection事件

var wss = new ws.Server({port: 3000});
wss.on("connection", (socket, request)=>{});

當握手完成後會發出,socket引數為WebSocket型別,request為http.IncomingMessage型別

一般在這個事件中通過socket.on註冊socket的事件

error事件

var wss = new ws.Server({port: 3000});
wss.on("connection", (error)=>{});

當依賴的httpServer出現錯誤時發出,error為Error型別

headers事件

var wss = new ws.Server({port: 3000});
wss.on("connection", (headers, request)=>{});

握手事件中,伺服器即將響應請求時會發出這個事件,可以在方法中對headers進行修改
headers為陣列型別,request為http.IncomingMessage型別

listening事件

var wss = new ws.Server({port: 3000});
wss.on("connection", ()=>{});

當繫結依賴的httoServer時發出

其它屬性、方法

server.clients

如上文constructor處所提,僅當clientTracking為true時這個屬性有例項,為set型別,儲存著所有websocket連線

server.address()

Returns an object with port, family, and address properties specifying the bound address, the address family name, and port of the server as reported by the operating system if listening on an IP socket. If the server is listening on a pipe or UNIX domain socket, the name is returned as a string.

server.close([callback])

關閉這個WebsocketServer所有的websocket連線,並且如果所依賴的httpServer是它建立的的話(即指定了port),這個httpServer
會被關閉,原始碼如下:

  /**
   *Close the server.
   *
   *@param {Function} cb Callback
   *@public
   */
  close (cb) {
    //
    // Terminate all associated clients.
    //
    if (this.clients) {
      for (const client of this.clients) client.terminate();
    }

    const server = this._server;

    if (server) {
      this._removeListeners();
      this._removeListeners = this._server = null;

      //
      // Close the http server if it was internally created.
      //
      if (this.options.port != null) return server.close(cb);
    }

    if (cb) cb();
  }

WebSocket Class

繼承自EventEmitter
這個類的例項有兩種,一種是客戶端的例項,一種是服務端的例項

constructor

new WebSocket(address[, protocols][, options])

  • address {String|url.Url|url.URL} *必填-,要連線的url
  • protocols {String|Array} *可選-,要使用的協議,即Sec-WebSocket-Protocol首部
  • options {Object} 可選
    • handshakeTimeout {Number} Timeout in milliseconds for the handshake request.
    • perMessageDeflate {Boolean|Object} 與WebSocketServer的類似,但預設值為true
    • protocolVersion {Number} 即Sec-WebSocket-Version首部.
    • origin {String} 即Origin或Sec-WebSocket-Origin首部,具體是哪一個由protocolVersion決定.
    • maxPayload {Number} message最大負載,單位:位元組

一般只有客戶端才通過這個方法建立例項,服務端的例項是由WebsocketServer自動建立的

監聽事件

一般語法: websocket.on(“event”, Function())
無論是客戶端還是服務端的例項都需要監聽事件

message 事件

websocket.on("message", (data)=>{});

當收到訊息時發出,data 型別為 String|Buffer|ArrayBuffer|Buffer[]

close 事件

websocket.on("close", (code, reason)=>{});

當連線斷開時發出

error 事件

websocket.on("error", (error)=>{});

open 事件

websocket.on("open", ()=>{});

連線建立成功時發出

ping 事件

websocket.on("ping", (data)=>{});

收到ping訊息時發出,data為Buffer型別

pong 事件

websocket.on("pong", (data)=>{});

收到pong訊息時發出,data為Buffer型別
注:ping,pong事件通常用來檢測連線是否仍聯通,由客戶端(服務端)發出一個ping事件,服務端(客戶端)收到後回覆一個pong事件,客戶端(服務端)收到後就知道連線仍然聯通

unexpected-response 事件

websocket.on("unexpected-response", (request, response)=>{});

request {http.ClientRequest} response {http.IncomingMessage}
當服務端返回的報文不是期待的結果,例如401,則會發出這個事件,如果這個事件沒有被監聽,則會丟擲一個錯誤

upgrade事件

websocket.on("upgrade", (response)=>{});

response {http.IncomingMessage}
握手過程中,當收到服務端回覆時發出該事件,你可以在response中檢視cookie等header

其它屬性、方法

websocket.readyState

客戶端、服務端例項都可呼叫
-{Number}
返回當前連線的狀態碼

Constant Value Description
CONNECTING 0 The connection is not yet open.
OPEN 1 The connection is open and ready to communicate.
CLOSING 2 The connection is in the process of closing.
CLOSED 3 The connection is closed.

websocket.protocol

{String} 型別
客戶端、服務端例項都可呼叫
返回伺服器選擇使用的協議

websocket.send(data[, options][, callback])

客戶端、服務端例項都可呼叫

  • data 要傳送的資訊
  • options {Object}
    • compress {Boolean} 當permessage-deflate啟用時預設為true
    • binary {Boolean} 是否採用二進位制傳輸,預設為false,自動檢測
    • mask {Boolean} 當時客戶端的例項時為true
    • fin {Boolean} 當前data是否是message的最後一個fragment,預設為true
  • callback 資訊傳送完成後的回撥函式

websocket.url

{String}型別
僅客戶端例項可呼叫
返回伺服器的url,如果是伺服器的Client則沒有這個屬性

websocket.bufferedAmount

{Number} 型別
客戶端、服務端例項都可呼叫
返回已經被send()加入傳送佇列,但仍未傳送到網路的message的數量

websocket.ping([data[, mask]][, callback])

客戶端、服務端例項都可呼叫

  • data {Any} 攜帶的資訊
  • mask {Boolean} Specifies whether data should be masked or not. Defaults to true when websocket is not a server client.
  • callback {Function} ping訊息傳送後的回撥事件
    傳送一個ping訊息

websocket.pong([data[, mask]][, callback])

客戶端、服務端例項都可呼叫

  • data {Any} 攜帶的資訊
  • mask {Boolean} Specifies whether data should be masked or not. Defaults to true when websocket is not a server client.
  • callback {Function} pong訊息傳送後的回撥事件
    傳送一個pong訊息

websocket.close([code[, reason]])

開始發出斷開連線的請求

原文:Initiate a closing handshake.

websocket.terminate()

客戶端、服務端例項都可呼叫
強制關閉連線

websocket.binaryType

-{String}
A string indicating the type of binary data being transmitted by the connection. This should be one of “nodebuffer”, “arraybuffer” or “fragments”. Defaults to “nodebuffer”. Type “fragments” will emit the array of fragments as received from the sender, without copyfull concatenation, which is useful for the performance of binary protocols transferring large messages with multiple fragments.

websocket.addEventListener(type, listener)

  • type {String} A string representing the event type to listen for.
  • listener {Function} The listener to add.

Register an event listener emulating the EventTarget interface.

用法解析

建立一個簡單服務端程式

1.直接指定port建立

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

2.從一個以有的httpServer的例項建立

const fs = require('fs');
const https = require('https');
const WebSocket = require('ws');

const server = new https.createServer({
  cert: fs.readFileSync('/path/to/cert.pem'),
  key: fs.readFileSync('/path/to/key.pem')
});
const wss = new WebSocket.Server({ server });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

server.listen(8080);

3.與koa框架的結合使用

// koa app的listen()方法返回http.Server:
let server = app.listen(3000);

// 建立WebSocketServer:
let wss = new WebSocketServer({
    server: server
});

4.多個WebsocketServer依賴相同的httpServer

const http = require('http');
const WebSocket = require('ws');

const server = http.createServer();
const wss1 = new WebSocket.Server({ noServer: true });
const wss2 = new WebSocket.Server({ noServer: true });

wss1.on('connection', function connection(ws) {
  // ...
});

wss2.on('connection', function connection(ws) {
  // ...
});

server.on('upgrade', function upgrade(request, socket, head) {
  const pathname = url.parse(request.url).pathname;

  if (pathname === '/foo') {
    wss1.handleUpgrade(request, socket, head, function done(ws) {
      wss1.emit('connection', ws, request);
    });
  } else if (pathname === '/bar') {
    wss2.handleUpgrade(request, socket, head, function done(ws) {
      wss2.emit('connection', ws, request);
    });
  } else {
    socket.destroy();
  }
});

server.listen(8080);

小結:

  • 服務端的建立有port,server,noserver等多種方式
  • 服務端監聽的connection事件中,方法中的引數ws就是前文提到的**服務端的**Websocket例項

建立一個簡單的客戶端程式

ws模組不僅包含服務端模組,還包含客戶端模組
1.直接建立一個Websocket連線

const WebSocket = require('ws');

const ws = new WebSocket('ws://www.host.com/path');

ws.on('open', function open() {
  ws.send('something');
});

ws.on('message', function incoming(data) {
  console.log(data);
});

2.傳送二進位制資料

const WebSocket = require('ws');

const ws = new WebSocket('ws://www.host.com/path');

ws.on('open', function open() {
  const array = new Float32Array(5);

  for (var i = 0; i < array.length; ++i) {
    array[i] = i / 2;
  }

  ws.send(array);
});

小結:

  • 通過new Websocket()形式建立的就是前文提到的客戶端的Websocket例項
  • ws.send() 方法可以傳送字串、二進位制資料等多種形式

如何獲得客戶端的ip

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

//直接連線的情況
wss.on('connection', function connection(ws, req) {
  const ip = req.connection.remoteAddress;
});

//存在代理的情況
wss.on('connection', function connection(ws, req) {
  const ip = req.headers['x-forwarded-for'].split(/\s*,\s*/)[0];
});

通過ping, pong事件檢測連線是否仍可用

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

function noop() {}

function heartbeat() {
  this.isAlive = true;
}

wss.on('connection', function connection(ws) {
  ws.isAlive = true;
  ws.on('pong', heartbeat);
});

const interval = setInterval(function ping() {
  wss.clients.forEach(function each(ws) {
    if (ws.isAlive === false) return ws.terminate();

    ws.isAlive = false;
    ws.ping(noop);
  });
}, 30000);

注:Websocket客戶端在收到ping事件會自動返回,不需要監聽

資料參考