1. 程式人生 > >Mina 編解碼器(解決粘包,斷包問題)

Mina 編解碼器(解決粘包,斷包問題)

 

什麼導致了斷包、粘包:

        mina是基於TCP/IP、UDP/IP協議棧的通訊框架。Mina 可以幫助我們快速開發高效能、高擴充套件性的網路通訊應用,Mina 提供了事件驅動、非同步(Mina 的非同步IO 預設使用的是JAVA NIO 作為底層支援)操作的程式設計模型。

        斷包、粘包的問題,是Mina基於TCP協議棧通訊的問題。TCP是面向流的,而面向流傳輸的資料是無保護邊界的,無保護邊界代表著如果傳送端連續傳輸資料接收端有可能在一次接收動作中,會接收兩個或者更多的資料包。

什麼是訊息保護邊界和無訊息保護邊界:

訊息保護邊界:就是傳輸協議把資料當做獨立的一條資料在網上進行傳輸,而且接收端也只能接收獨立的訊息,也就是因為存在著訊息保護邊界,接收端一次只能接收發送端傳來的一個數據包,這一點有一點像UDP協議。

無訊息保護邊界:面向流傳輸資料的是無訊息保護邊界的,也就是在傳送端連續傳送資料的情況下接受端可能會在一次中接收兩個或多個數據包。

斷包、粘包的體現例項:

① 先接收到資料包A,然後接收到資料包B;

② 先接收到資料包A的部分資料,然後接收到資料包A的剩餘資料和資料包的全部資料。

③ 先接受到資料包A的全部資料和資料包B的部分資料,然後接收到資料包B的剩餘全部資料。

④ 一次性接收完資料包A和資料包B的全部資料。

 

① 正常的情況

② 斷包+粘包

③ 粘包+斷包

④ 粘包

資料包(訊息)的格式:

 

包頭 + 訊息長度(int)+訊息內容(json字串、普通字串)+ 包尾

 

Mina處理 斷包、粘包問題

       在Mina框架中有個——CumulativeProtocolDecoder (累積性的協議解碼器),專門用來處理粘包和斷包問題。doDecode()的返回值有重要作用。

        @ doDecode() 方法 ——》 返回 true

,CumulativeProtocolDecoder 的 decode() 方法會首先判斷你是否在 doDecode() 方法中從內部的 IoBuffer 緩衝區中讀取了資料,如果沒有則會丟擲非法狀態的異常, 也就是因為 你的 doDecode()返回 true 的時候就表示你已經消費了本次的資料,(一個完整的訊息已經被讀取完畢)。

進一步說:也就是必須你已經消費過 內部 IoBuffer 緩衝區的資料(哪怕資料只有一個位元組的大小)。如果 內部 驗證通過,確實已經消費了資料,那麼CumulativeProtocolDecoder(累積性的協議解碼器)會檢查緩衝區是否還有資料沒有被讀取,如果有那麼 就繼續呼叫 doDecode(),如果沒有就停止對 doDecode() 方法的呼叫,直到有新的資料被緩衝。

       @ doDecode() 方法 ——》 返回 false , CumulativeProtocolDecoder 會停止對 doDecode() 方法的呼叫,但此時如果本次資料還有未讀取完的,就將含有資料的 IoBuffer 緩衝區儲存到 IoSession 中,以便下一次資料到來時可以從 IoSession 中提取合併。如果發現本次資料全部讀取完畢,則清空 IoBuffer 緩衝區 (開始進行接收下一個包)。

        簡單來說:當你認為讀取的資料已經夠解碼了,那麼就返回 true,否則就返回false。 CumulativeProtocolDecoder 就是幫你完成資料的累積,但是這個過程是很繁瑣的。

        也就是說:當返回 true 時,CumulativeProtocolDecoder 會呼叫 deDecoder() 把剩餘的資料發下來(剩餘資料就是在 remaining()中的資料),返回false 就是不處理剩餘的資料(剩餘資料不交給 doDecoder()處理),當有新的資料包傳過來的時候再把剩餘的資料和新的資料拼接在一起,然後呼叫 decoder。

需要的jar包

Mina 例項 Java程式碼

service類:

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8")))) ; // 自定義解編碼器

編碼器類:

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolEncoderAdapter;

import org.apache.mina.filter.codec.ProtocolEncoderOutput;

import java.nio.charset.Charset;



public class ByteArrayEncoder extends ProtocolEncoderAdapter {

private final Charset charset;

public ByteArrayEncoder(Charset charset) {

this.charset = charset;

}

/**

* 直接將資料發出去,資料格式,包頭+訊息長度(int)+訊息內容(json字串)+包尾 包頭包尾是十六進 制字串00 aa bb cc,轉化成位元組陣列0, * -86, -69, -52四個位元組

*

* @param session

* @param message

* @param out

* @throws Exception

*/

@Override

public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {

// 仿專案,解決斷包,粘包問題

String value = (message == null ? "" : message.toString()); // 訊息值

byte[] content = value.getBytes(charset);// 訊息內容,位元組陣列

IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);

// 緩衝區容量大小38位元組加上字元長度

buf.put(new byte[] { 0, -86, -69, -52 });

// 輸入包開頭固定值十六進位制00 aa bb cc,轉化成位元組陣列

buf.putUnsignedInt(content.length);

// int為4位元組,一個位元組等於2個16進位制字元,所以有八位 00 00 00 0c,內容長度。 buf.put(content);// 訊息內容

buf.put(new byte[] { 0, -86, -69, -52 });// 包尾

buf.flip();

out.write(buf); // 寫入

}

}

解碼器類 :解決Mina斷包,丟包問題 (重點)

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.CumulativeProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import java.nio.charset.Charset;

/**

* 自定義解碼器,確保能讀到完整的包

*/

public class ByteArrayDecoder extends CumulativeProtocolDecoder {

private final Charset charset;

public ByteArrayDecoder(Charset charset) {

this.charset = charset;

}

@Override

protected boolean doDecode (IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput)throws Exception {

// 丟包,斷包處理

if (ioBuffer.remaining() > 4)// 有包頭,包頭足夠

{

ioBuffer.mark();

// 標記當前position的快照標記mark,以便後繼的reset操作能恢復position位置,開始是0

byte[] l = new byte[4];

ioBuffer.get(l);

// 讀取包頭,佔4個位元組

if (ioBuffer.remaining() < 4)

// 內容長度的4個位元組不夠,斷包

{

ioBuffer.reset();

return false;

} else {

// 內容長度的4個位元組陣列足夠

byte[] bytesLegth = new byte[4];

// 內容長度

ioBuffer.get(bytesLegth);// 讀取內容長度,int型別,佔四個位元組

int len = MinaUtil.byteArrayToInt(bytesLegth);

// 內容長度有多少

if (ioBuffer.remaining() < len) // 內容不夠,斷包

{

ioBuffer.reset();

return false;

} else {

// 訊息內容足夠

byte[] bytes = new byte[len];

ioBuffer.get(bytes, 0, len);

protocolDecoderOutput.write(new String(bytes, charset));

// 讀取內容,並且傳送

if (ioBuffer.remaining() < 4) { // 包尾不夠

ioBuffer.reset();

return false;

} else {

// 包尾足夠

byte[] tails = new byte[4];

ioBuffer.get(tails);

// 讀取包尾

if (ioBuffer.remaining() > 0){

// 最後如果粘了包,會再次呼叫doDeocde()方法,把剩餘資料給doDeocde()方法處理

return true;

}}}}}

return false; // 斷包,或者執行完,

} }

編解碼工廠

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFactory;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolEncoder;

import java.nio.charset.Charset;

/** * 自定義解編碼器工廠 * */

public class ByteArrayCodecFactory implements ProtocolCodecFactory {

private ByteArrayDecoder decoder;

private ByteArrayEncoder encoder;

public ByteArrayCodecFactory() {

this(Charset.defaultCharset());

}

public ByteArrayCodecFactory(Charset charSet) {

encoder = new ByteArrayEncoder(charSet);

decoder = new ByteArrayDecoder(charSet);

}

@Override

public ProtocolDecoder getDecoder(IoSession session) throws Exception {

return decoder;

}

@Override

public ProtocolEncoder getEncoder(IoSession session) throws Exception {

return encoder;

} }

工具類方法 (二進位制轉整型)

public static int byte2Int(byte[] l) {        
        
        return (l[0]&0xff)<<24            
                | (l[1]&0xff)<<16            
                | (l[2]&0xff)<<8            
                | (l[3]&0xff);
    }

 強調: 這個 編解碼器   封裝的是  String 型別陣列,在業務邏輯處理層   收發信心的時候   要用 String 格式定義!