【Socket客戶端封裝類】 及 【解決粘包和分包問題的Message封裝類】
阿新 • • 發佈:2019-02-15
Socket通訊中解決粘包和分包問題的Message封裝類
傳送時:
先發報頭長度
再編碼報頭內容然後傳送
最後發真實內容
接收時:
先收報頭長度
根據取出的長度收取報頭內容,然後解碼,反序列化
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MyChatServer.Server
{
/// <summary>
/// 訊息傳送封裝類
/// </summary>
public class Message
{
//包含你所有需要傳遞的資料所轉換的位元組
private byte[] buffer = null;
private int _type = -1;
//當前的位元組位置
private int _currentPos = 0;
//開始讀的位元組位置
private int _readPos = 0;
//指定型別
public Message(int __type)
{
this._type = __type;
buffer = new byte[30 ];
putInt(__type);
}
public Message()
{
buffer = new byte[1024 * 100];
}
//寫入位元組
private void bytesWirteBytes(byte[] bs)
{
for (int i = 0; i < bs.Length; i++)
{
if (_currentPos >= buffer.Length)
{
byte [] newBuffer = new byte[buffer.Length * 2];
Array.Copy(buffer, newBuffer, buffer.Length);
buffer = newBuffer;
}
buffer[_currentPos++] = bs[i];
}
}
//Int型資料轉化位元組並寫入
public void putInt(int value)
{
this.bytesWirteBytes(System.BitConverter.GetBytes(value));
}
//Long型資料轉化位元組並寫入
public void putLong(long value)
{
this.bytesWirteBytes(System.BitConverter.GetBytes(value));
}
//string型資料轉化位元組並寫入(string的長度和string)
public void putString(string value)
{
byte[] b = Encoding.UTF8.GetBytes(value);
this.putInt(b.Length);
this.bytesWirteBytes(b);
}
//Double型資料轉化位元組並寫入
public void putDouble(double value)
{
this.bytesWirteBytes(System.BitConverter.GetBytes(value));
}
//Message型資料轉化位元組並寫入
public void putMessage(Message msg)
{
this.bytesWirteBytes(msg.toBytes);
}
//byte[]型資料轉化位元組並寫入
public void putBytes(byte[] bytes)
{
this.bytesWirteBytes(bytes);
}
//byte[]型資料轉化位元組並寫入(指定開始位置長度)
public void putBytes(byte[] bytes, int index, int len = 0)
{
if (index < 0 || index > bytes.Length || len < 0 || len > bytes.Length) return;
int leng = (len == 0 ? bytes.Length : len);
for (int i = index; i < leng + index; i++)
{
this.buffer[_currentPos++] = bytes[i];
}
}
//設定開始讀位元組的位置
public void setSeek(int _pos)
{
if (_pos < 0 || _pos >= buffer.Length) return;
_readPos = _pos;
}
//獲取Int型資料
public int getInt()
{
int value = System.BitConverter.ToInt32(buffer, _readPos);
_readPos += sizeof(int);
return value;
}
//獲取Long型資料
public long getLong()
{
long value = System.BitConverter.ToInt64(buffer, _readPos);
_readPos += sizeof(long);
return value;
}
//獲取Double型資料
public double getDouble()
{
double value = System.BitConverter.ToDouble(buffer, _readPos);
_readPos += sizeof(double);
return value;
}
//獲取UTF string型資料
public string getUTFString()
{
int len = this.getInt();
string str = Encoding.UTF8.GetString(buffer, _readPos, len);
_readPos += len;
return str;
}
//獲取訊息型別
public int type
{
get
{
if (_type > 0) return _type;
_readPos = 0;
_type = this.getInt();
return _type;
}
}
//獲取當前訊息中的所有位元組(size長度)
public byte[] toBytes
{
get
{
byte[] arr = new byte[this.Size];
Array.Copy(buffer, arr, this.Size);
return arr;
}
}
//獲取位元組陣列的大小
public int Size
{
get { return this._currentPos; }
}
//buffer儲存當前需要傳送的訊息->int size->byte[]
public byte[] byteSize
{
get
{
byte[] _size = System.BitConverter.GetBytes(this.Size);
return _size;
}
}
//重置
public void reset()
{
Array.Clear(this.buffer, 0, this._currentPos);
this._currentPos = 0;
this._readPos = 0;
this._type = -1;
}
}
}
CSocket:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Windows.Forms;
using System.Collections.Concurrent;
/*
* 網路客戶端連線元件
*
*
*/
namespace NET
{
/*
*網路連線類
*/
class CSocket
{
//定義套接字物件
private Socket client = null;
//判斷當前是否正在連線,一個客戶端,只開一個連線
private bool isConnection = false;
public bool IsConnection { get { return isConnection; } }
//連線物件
private static CSocket _csocket = null;
//訊息處理列隊
private MessageQueue messageQueue = null;
Thread thread1;
private CSocket()
{
//連線TCP連線協議 --------------4IP網路 ------------流位元組傳送 ----TCP協議
client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
}
/*
* 開始連線到網路
* _ip:伺服器ip地址
* _port:伺服器開放連線埠
*/
public void connectServer(string _ip, int _port)
{
try
{
// _ip = "192.168.3.206";
// _port = 8888;
//如果當前正在連線中,則不需要再次連線,如在不同的場景中,防止同一客戶端,多次連線,造成資料不同步。
if (this.isConnection==true) return;
//字串IP解析
IPAddress ip = IPAddress.Parse(_ip);
//開始正真連線到網路指定的IP伺服器中
client.Connect(new IPEndPoint(ip, _port));//連線中....
//設定連線標誌,防止未斷線情況下多次連線
this.isConnection = true;
//啟動訊息接受子執行緒
this.start();
MessageBox.Show("連線成功");
}
catch (SocketException e)
{
//連接出錯時,關閉網路
this.isConnection = false;
Console.WriteLine("連線到伺服器出錯!");
MessageBox.Show(e.ToString());
this.Close();
}
}
//建立連線單列物件,可以不同場景中自由呼叫
public static CSocket shareCSocket()
{
if (_csocket == null) _csocket = new CSocket();
return _csocket;
}
//啟動執行緒
public void start()
{
if (thread1!=null) return;
//建立執行緒
thread1= new Thread(new ThreadStart(reciveMessage));
thread1.Start();
thread1.IsBackground = true;
}
//傳送訊息到伺服器
//msg:要傳送的訊息物件
//傳送協議:訊息長度(int) - 訊息頭(int) - 訊息主體(bytes)
public void sendMessage(Message1 msg)
{
if (this.isConnection)
{
//先發送訊息長度
this.client.Send(msg.byteSize,0,msg.byteSize.Length,0);
//再發送訊息主體
this.client.Send(msg.toBytes,0,msg.Size,0);
}
}
//訊息接收
public void reciveMessage()
{
try
{
//每次最多隻能接受100K的內容,如果是檔案或是視訊傳輸,請更改位元組大小
byte[] bytes = new byte[1024 * 100];
//每次從TCP協議中讀取出來的訊息標誌,>0,成功,0,-1:失敗或伺服器異常
int read = 0;
//每次讀取出來的訊息寫入到位元組陣列中的位置
int readPos = 0;
//儲存訊息頭是否讀完
int headLeng = 0;
//定義讀取超時
client.ReceiveTimeout = 1000 * 60 * 10;
//建立訊息執行緒
messageQueue = MessageQueue.shareMessageQueue();
while (this.isConnection)
{
//如果訊息頭還沒有讀完,則一直讀取訊息頭,或是下一條訊息頭
if (headLeng == 0)
{
//一個一個位元組讀,防止網路丟包和粘包問題
read = client.Receive(bytes, readPos, 1, 0);
//訊息一個一個讀,每次都往位元組後寫一個位元組內容
readPos++;
if (read <= 0) break;//如果遠端伺服器中斷
if (readPos < 4) continue;//如果協議頭還沒有讀完 - 4個位元組
//讀協議頭
headLeng = System.BitConverter.ToInt32(bytes, 0);
}
else
{
//如果得到一條完整的訊息
if (readPos - 4 >= headLeng)
{
//建立一條訊息
Message1 msg = new Message1();
//將一條完整的訊息存入到Message物件中,去掉前面4個位元組的長度資訊。
msg.putBytes(bytes, 4, readPos);
//將訊息加入到訊息佇列
messageQueue.putMessage(msg);
// MessageBox.Show("訊息");
//準備下一次讀
readPos = 0;
//此處將頭清0,便可讀取下一條訊息
headLeng = 0;
//將位元組清0,此方法速度高於new byte[],且節約記憶體
Array.Clear(bytes, 0, bytes.Length);
}
else
{
//如果協議頭還沒有讀完
headLeng = 0;
}
}
}
this.Close();
}
catch (Exception e){
MessageBox.Show("伺服器已斷開!");
}finally{
this.Close();
}
}
public void Close()
{
thread1.Abort();
isConnection = false;
// client.Close();
//Console.WriteLine("連線關閉");
// MethodDelegate.shareMethodDelegate().removeAllMethods();
client.Dispose();
client = null;
_csocket = null;
}
}
public delegate void callBack(Message1 msg);
/*
* 訊息委託處理類
*/
public class MethodDelegate
{
private static MethodDelegate md = null;
private Dictionary<int, callBack> methods = null;
private Dictionary<int, Form> forms = null;
private Form formRun;
private MethodDelegate()
{
methods = new Dictionary<int, callBack>();
forms = new Dictionary<int, Form>();
}
public static MethodDelegate shareMethodDelegate()
{
if (md == null) md = new MethodDelegate();
return md;
}
public void addMethod(int type, callBack callFun, Form form = null)
{
if (this.methods.ContainsKey(type))
{
this.methods.Remove(type);
}
this.methods.Add(type, callFun);
//視窗執行緒 - 子網路接收子執行緒不能更新UI介面,所以必須要有窗體反射執行。
if (this.forms.ContainsKey(type))
{
this.forms.Remove(type);
}
this.forms.Add(type, form);
}
public void removeMethod(int type)
{
if (this.methods.ContainsKey(type))
{
this.methods.Remove(type);
}
if (this.forms.ContainsKey(type))
{
this.forms.Remove(type);
}
}
public void Run(int type, Message1 msg)
{
// MessageBox.Show(type.ToString());
if (!this.methods.ContainsKey(type)) return;
callBack fun = this.methods[type];
Form f = this.forms[type];
if (f != null)
{
f.Invoke(fun, msg);
return;
}
fun(msg);
}
public bool isClose = false;
public void removeAllMethods()
{
this.methods.Clear();
formRun = null;
isClose = true;
MessageQueue.shareMessageQueue().th.Abort();
}
}
/*
* 訊息處理佇列
*/
class MessageQueue
{
//private Queue<Message> messageQueue = null;
private ConcurrentQueue<Message1> messageQueue = null;
private MethodDelegate methodRun = null;
private static bool isSet = true;
private static MessageQueue mqeue = null;
private static AutoResetEvent evt = new AutoResetEvent(false);
public static MessageQueue shareMessageQueue()
{
if (mqeue == null) mqeue = new MessageQueue();
return mqeue;
} public Thread th;
private MessageQueue() {
messageQueue = new ConcurrentQueue<Message1>();
methodRun = MethodDelegate.shareMethodDelegate();
//建立一條處理訊息的執行緒
th= new Thread(runMessage);
th.Start();
th.IsBackground = true;
}
public void putMessage(Message1 message)
{
messageQueue.Enqueue(message);
// MessageBox.Show("新訊息");
if (isSet)
{
isSet = false;
evt.Set();
}
}
private void runMessage()
{
bool f = false;
Message1 message = null;
while (true)
{
f = messageQueue.TryDequeue(out message);
if (f)
{
//執行方法
// MessageBox.Show("執行程式碼");
methodRun.Run(message.type,message);
//把訊息通知到U3D執行緒
}
if (messageQueue.Count == 0)
{
isSet = true;
//佇列沒有訊息了,就等待...,直以有訊息進來,繼續執行
evt.WaitOne();
}
}
}
}
/*
* U3D 訊息對映類
*/
class MessageCallBack
{
}
}