成幀與解析

閱讀 《java TCP/IP Socket 程式設計》第三章筆記

成幀技術(frame)是解決如何在接收端定位訊息的首尾位置的問題。在進行資料收發時,必須指定訊息接收者如何確定何時訊息已經接收完整。

在TCP協議中,訊息是按照位元組來傳輸的,而且TCP協議中是沒有訊息邊界的概念的。因為當client和server雙方建立TCP連線後,雙方可以自由傳送位元組資料。

為了能夠在訊息傳輸中確定訊息的邊界,需要引入額外的資訊來標示訊息邊界。常用的辦法有兩種:

基於定界符與基於顯式訊息長度

基於定界符

我們在訊息的末尾新增一個唯一標記作為訊息結束符,這個唯一的標記一般是一個位元組或者一組位元組序列,並且在訊息中不能出現這個標記。

基於定界符的方法一般用於以文字方式編碼的訊息中,定義一個特殊的字元作為分隔符來表示訊息結束。但是這個分隔符也有可能作為普通字元可能會出現在訊息中,導致訊息解析出現錯誤。為了讓訊息中不出現分隔符,需要引入填充(stuff)技術,在傳送端對訊息進行掃描,如果碰到分隔符,就將這個分隔符用一個替換符和其他符號(比如將原始字元二進位制中的第三位取反得到一個新的位元組作為)替換,同樣的,如果掃描中遇到替換符,將替換符也用一個替換付和其他符號替換。在訊息的接收端,同樣也對接收到的訊息進行掃描,當碰到替換符時,說明該字元不是訊息中的,要將後面一個字元進行還原得到相應的原始字元,這個才是訊息中真正的字元。當遇到分隔符時,說明該訊息已經結束

顯式訊息長度

在訊息前面新增一個固定大小的欄位(一個位元組或者兩個位元組長度),用於表示訊息包含的位元組個數(也就是訊息的長度)。在訊息傳送時,計算訊息的長度(位元組數),作為訊息的字首。如果使用一個位元組儲存長度,則訊息長度最大為\(2^8=256\)個位元組,如果是兩個位元組儲存長度,則訊息長度最大為\(2^{16}=65536\)個位元組

訊息成幀與解析的實現

在java中,當client和server之間建立tcp連線後,就可以通過輸入輸出流(I/O stream)來進行訊息傳輸。傳送訊息時,將待發送的訊息寫入OutputStream流中,然後傳送到接收端InputStream流;接收端則從InputStream流中讀取出訊息。如何實現將訊息按幀傳送與接收,就需要要利用我們上面提到的方法。

我們先定義一個Framer介面,來宣告兩個方法,訊息成幀frameMsg()和訊息抽取nextMsg()

package chapter_3.frame;

import java.io.IOException;
import java.io.OutputStream; /**
* @author fulv
* Framer介面聲明瞭兩個方法,用於訊息成幀和解析將待發送訊息封裝成幀並輸出到指定流
*/
public interface Framer { /**
* 將輸入的訊息msg封裝成幀,然後輸出到out流
*
* @param msg 輸入的訊息
* @param out 訊息輸出流
*/
void frameMsg(byte[] msg, OutputStream out); /**
* 從指定流中讀取下一個訊息幀
*
* @return byte[]
*/
byte[] nextMsg() throws IOException;
}

然後分別使用基於分隔符和基於顯式訊息長度兩種方法來實現Framer介面

基於分隔符:

在這裡,我們使用字元'\n'作為訊息分隔符,它對應的位元組為0x0A;使用的替換符為0x7D。替換的策略是:當掃描到待發送的訊息byte陣列中有0x0A時,將其替換為(0x7D,0x2A),如果遇到0x7D,將其替換為(0x7D,0x5D)。這裡面第二個字元通過將待替換字元從左向右數第三位取反獲得。

在 接收端,從輸入流中讀取位元組流資料,遇到0x7D時,說明後面一個位元組對應的是特殊位元組,需要轉換得到原始位元組。如果遇到0x0A說明到達訊息幀末尾,完成了一個訊息幀的讀取。

package chapter_3.frame;

import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets; /**
* 採用界定符的方式來實現訊息的封裝成幀以及訊息幀的解析
*
* @author fulv
*/
public class DelimitFramer implements Framer { /**
* 資料輸入源,從中解析出訊息幀
*/
private InputStream in; /**
* 訊息幀的定界符
*/
private static final byte DELIMITER = '\n';
/**
* 替換字元,用於將出現在訊息內部的'\n'進行替換,避免出現解析錯誤
*/
private static final byte REPLACE_CHAR = (byte) 0x7d; private static final byte MASK = (byte) 0x20; public DelimitFramer(InputStream in) {
this.in = in;
} @Override
public void frameMsg(byte[] msg, OutputStream out) {
//向判斷傳入的訊息中是否包含界定符與替換符,如果存在,執行相關位元組填充操作
//將對應的界定符和替換符換成兩個字元,其中第一個為替換符,第二個為將要替換的字元的從左到右的第二位取反形成的字元
int count = 0;
for (byte b : msg) {
if (DELIMITER == b || REPLACE_CHAR == b) {
count++;
}
}
byte[] extendMsg = new byte[msg.length + count];
for (int i = 0, j = 0; i < msg.length; i++) {
if (DELIMITER == msg[i] || REPLACE_CHAR == msg[i]) {
extendMsg[j++] = REPLACE_CHAR;
extendMsg[j++] = byteStuff(msg[i]);
} else {
extendMsg[j++] = msg[i];
}
}
try {
out.write(extendMsg);
out.write(DELIMITER);
out.flush();
} catch (IOException e) {
e.printStackTrace();
System.out.println("訊息寫入流失敗");
} } /**
* 從訊息輸入流in中,取出下一個訊息幀(以分隔符劃分一個訊息幀)
*
* @return
*/
@Override
public byte[] nextMsg() throws IOException {
ByteArrayOutputStream msgBuffer = new ByteArrayOutputStream();
int nextByte; while ((nextByte = in.read()) != DELIMITER) {
//已經讀完了輸入流,這裡分兩種情況
if (-1 == nextByte) {
//輸入流中的位元組已經全部讀完
if (msgBuffer.size() == 0) {
return null;
} else {
//讀取了部分位元組,但卻沒有遇到分隔符,說明輸入的訊息幀是不完整或者錯誤的,返回異常
throw new EOFException("讀取到了不正確的訊息幀");
}
} //當前字元為替換字元,需要讀取下一個字元並轉換(將第三位取反)得到正確的字元
if (REPLACE_CHAR == nextByte) {
nextByte = in.read() & 0xFF;
nextByte = byteStuff((byte) nextByte);
}
msgBuffer.write(nextByte);
}
return msgBuffer.toByteArray();
} /**
* 位元組填充函式,將傳入位元組的從左到右數的第二位取反
*
* @param originByte
* @return
*/
private static byte byteStuff(byte originByte) {
return (byte) ((originByte | MASK) & ~(originByte & MASK));
}
}

基於顯式訊息長度方法:

使用兩個位元組無符號整型來表示待發送訊息的長度,最長為65536。將訊息長度按照位元組大端序寫入待發送的訊息前,表示訊息長度。

接收端,首先從輸入流中讀出訊息長度,然後堵塞的從輸入流中讀取資料,直到讀取出的資料量達到訊息長度,整個訊息幀才讀取結束。

package chapter_3.frame;

import java.io.*;

/**
* 基於顯式長度的方法來將實現訊息成幀
*
* @author fulv
*/
public class LengthFramer implements Framer { private static final int MESSAGEMAXLENGTH = 65536; private DataInputStream in; public LengthFramer(DataInputStream in) {
this.in = in;
} @Override
public void frameMsg(byte[] msg, OutputStream out) throws IOException {
if (msg.length > MESSAGEMAXLENGTH) {
throw new IOException("傳入的訊息超出最大長度");
}
int msgLength = msg.length;
//將訊息長度按照位元組大端序寫入輸出流中
out.write((msgLength >> 8) & 0xFF);
out.write(msgLength & 0xFF);
//將訊息寫入輸出流
out.write(msg);
out.flush();
} @Override
public byte[] nextMsg() throws IOException {
int length;
byte[] msg = null;
try {
//從輸入流中讀取兩個位元組,作為大端序的整型值解釋,表示訊息長度
length = in.readUnsignedShort();
} catch (EOFException e) {
return null;
}
//存放從輸入流中讀取出的訊息位元組陣列
msg = new byte[length];
//readFully多次呼叫read方法直到讀取到指定長度的陣列訊息或者讀取到-1返回
in.readFully(msg);
return msg;
}
}

測試

對兩種訊息分幀方式進行測試,開啟兩個執行緒分別表示client與server,測試訊息的傳送與接收。

package chapter_3.frame;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets; public class TestFramer { private static final String[] messages = {"Hello World!", "Hello China, 你好 中國", "世界人民大團結萬歲",
"在訊息中傳送分隔符\n和替換符}的情況"}; public static void main(String[] args) throws InterruptedException {
Thread clientThread = new Thread(() -> {
Socket socket = null;
try {
socket = new Socket(InetAddress.getLocalHost(), 8888);
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
//Framer framer = new DelimitFramer(in);
DataInputStream dataInputStream = new DataInputStream(in);
Framer framer = new LengthFramer(dataInputStream);
for (String msg : messages) {
byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
framer.frameMsg(msgBytes, out);
System.out.println(Thread.currentThread().getName() + " 傳送訊息: " + msg);
}
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
});
Thread serverThread = new Thread(() -> {
Socket socket = null;
try (ServerSocket serverSocket = new ServerSocket(8888)) {
while (true) {
socket = serverSocket.accept();
System.out.println("獲取到來自" + socket.getRemoteSocketAddress() + "的tcp連線");
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
//Framer framer = new DelimitFramer(in);
DataInputStream dataInputStream = new DataInputStream(in);
Framer framer = new LengthFramer(dataInputStream);
byte[] recvMsgBytes = null;
do {
recvMsgBytes = framer.nextMsg();
//System.out.println(Arrays.toString(recvMsgBytes));
if (recvMsgBytes != null) {
System.out.println(Thread.currentThread().getName() + " 接收到的訊息: " + new String(recvMsgBytes, StandardCharsets.UTF_8));
}
} while (recvMsgBytes != null);
}
} catch (IOException e) {
e.printStackTrace();
}
});
serverThread.setName("server");
clientThread.setName("client");
serverThread.start();
Thread.sleep(3000);
clientThread.start();
}
}

輸出結果:

獲取到來自/10.0.75.1:2462的tcp連線
server 接收到的訊息: Hello World!
client 傳送訊息: Hello World!
client 傳送訊息: Hello China, 你好 中國
server 接收到的訊息: Hello China, 你好 中國
client 傳送訊息: 世界人民大團結萬歲
server 接收到的訊息: 世界人民大團結萬歲
client 傳送訊息: 在訊息中傳送分隔符
和替換符}的情況
server 接收到的訊息: 在訊息中傳送分隔符
和替換符}的情況