1. 程式人生 > >【Socket客戶端封裝類】 及 【解決粘包和分包問題的Message封裝類】

【Socket客戶端封裝類】 及 【解決粘包和分包問題的Message封裝類】

Socket通訊中解決粘包和分包問題的Message封裝類

傳送時:
先發報頭長度
再編碼報頭內容然後傳送
最後發真實內容
接收時:
先收報頭長度
根據取出的長度收取報頭內容,然後解碼,反序列化

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MyChatServer.Server
{
    /// <summary>
    /// 訊息傳送封裝類
    /// </summary>
    public class Message
    {
        //包含你所有需要傳遞的資料所轉換的位元組
private byte[] buffer = null; private int _type = -1; //當前的位元組位置 private int _currentPos = 0; //開始讀的位元組位置 private int _readPos = 0; //指定型別 public Message(int __type) { this._type = __type; buffer = new byte[30
]; putInt(__type); } public Message() { buffer = new byte[1024 * 100]; } //寫入位元組 private void bytesWirteBytes(byte[] bs) { for (int i = 0; i < bs.Length; i++) { if (_currentPos >= buffer.Length) { byte
[] newBuffer = new byte[buffer.Length * 2]; Array.Copy(buffer, newBuffer, buffer.Length); buffer = newBuffer; } buffer[_currentPos++] = bs[i]; } } //Int型資料轉化位元組並寫入 public void putInt(int value) { this.bytesWirteBytes(System.BitConverter.GetBytes(value)); } //Long型資料轉化位元組並寫入 public void putLong(long value) { this.bytesWirteBytes(System.BitConverter.GetBytes(value)); } //string型資料轉化位元組並寫入(string的長度和string) public void putString(string value) { byte[] b = Encoding.UTF8.GetBytes(value); this.putInt(b.Length); this.bytesWirteBytes(b); } //Double型資料轉化位元組並寫入 public void putDouble(double value) { this.bytesWirteBytes(System.BitConverter.GetBytes(value)); } //Message型資料轉化位元組並寫入 public void putMessage(Message msg) { this.bytesWirteBytes(msg.toBytes); } //byte[]型資料轉化位元組並寫入 public void putBytes(byte[] bytes) { this.bytesWirteBytes(bytes); } //byte[]型資料轉化位元組並寫入(指定開始位置長度) public void putBytes(byte[] bytes, int index, int len = 0) { if (index < 0 || index > bytes.Length || len < 0 || len > bytes.Length) return; int leng = (len == 0 ? bytes.Length : len); for (int i = index; i < leng + index; i++) { this.buffer[_currentPos++] = bytes[i]; } } //設定開始讀位元組的位置 public void setSeek(int _pos) { if (_pos < 0 || _pos >= buffer.Length) return; _readPos = _pos; } //獲取Int型資料 public int getInt() { int value = System.BitConverter.ToInt32(buffer, _readPos); _readPos += sizeof(int); return value; } //獲取Long型資料 public long getLong() { long value = System.BitConverter.ToInt64(buffer, _readPos); _readPos += sizeof(long); return value; } //獲取Double型資料 public double getDouble() { double value = System.BitConverter.ToDouble(buffer, _readPos); _readPos += sizeof(double); return value; } //獲取UTF string型資料 public string getUTFString() { int len = this.getInt(); string str = Encoding.UTF8.GetString(buffer, _readPos, len); _readPos += len; return str; } //獲取訊息型別 public int type { get { if (_type > 0) return _type; _readPos = 0; _type = this.getInt(); return _type; } } //獲取當前訊息中的所有位元組(size長度) public byte[] toBytes { get { byte[] arr = new byte[this.Size]; Array.Copy(buffer, arr, this.Size); return arr; } } //獲取位元組陣列的大小 public int Size { get { return this._currentPos; } } //buffer儲存當前需要傳送的訊息->int size->byte[] public byte[] byteSize { get { byte[] _size = System.BitConverter.GetBytes(this.Size); return _size; } } //重置 public void reset() { Array.Clear(this.buffer, 0, this._currentPos); this._currentPos = 0; this._readPos = 0; this._type = -1; } } }

CSocket:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Windows.Forms;
using System.Collections.Concurrent;


/*
 * 網路客戶端連線元件
 * 
 * 
 */
namespace NET

{

    /*
     *網路連線類
     */
    class CSocket
    {
        //定義套接字物件
        private Socket client = null;
        //判斷當前是否正在連線,一個客戶端,只開一個連線
        private bool isConnection = false;
        public bool IsConnection { get { return isConnection; } }
        //連線物件
        private static CSocket _csocket = null;
        //訊息處理列隊
        private MessageQueue messageQueue = null;
        Thread thread1;
        private CSocket()
        {
            //連線TCP連線協議  --------------4IP網路 ------------流位元組傳送 ----TCP協議
            client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);   
        }
        /*
         * 開始連線到網路
         * _ip:伺服器ip地址
         * _port:伺服器開放連線埠
         */
        public void connectServer(string _ip, int _port)
        {
            try
            {
               // _ip = "192.168.3.206";
               // _port = 8888;
                //如果當前正在連線中,則不需要再次連線,如在不同的場景中,防止同一客戶端,多次連線,造成資料不同步。
                if (this.isConnection==true) return;                
                //字串IP解析
                IPAddress ip = IPAddress.Parse(_ip);
                //開始正真連線到網路指定的IP伺服器中
                client.Connect(new IPEndPoint(ip, _port));//連線中....
                //設定連線標誌,防止未斷線情況下多次連線
                this.isConnection = true;
                //啟動訊息接受子執行緒
                this.start();
                MessageBox.Show("連線成功");
            }
            catch (SocketException e)
            {
                //連接出錯時,關閉網路
                this.isConnection = false;
                Console.WriteLine("連線到伺服器出錯!");
                MessageBox.Show(e.ToString());
                this.Close();
            }
        }
        //建立連線單列物件,可以不同場景中自由呼叫
        public static CSocket shareCSocket()
        {
            if (_csocket == null) _csocket = new CSocket();
            return _csocket;
        }
        //啟動執行緒
        public void start()
        {
            if (thread1!=null) return;
            //建立執行緒
            thread1= new Thread(new ThreadStart(reciveMessage));
            thread1.Start();
            thread1.IsBackground = true;
        }
        //傳送訊息到伺服器
        //msg:要傳送的訊息物件
        //傳送協議:訊息長度(int) - 訊息頭(int) - 訊息主體(bytes)
        public void sendMessage(Message1 msg)
        {
            if (this.isConnection)
            {
                //先發送訊息長度
                this.client.Send(msg.byteSize,0,msg.byteSize.Length,0);
                //再發送訊息主體
                this.client.Send(msg.toBytes,0,msg.Size,0);
            }
        }
        //訊息接收
        public void reciveMessage()
        {
            try
            {
                //每次最多隻能接受100K的內容,如果是檔案或是視訊傳輸,請更改位元組大小
                byte[] bytes = new byte[1024 * 100];
                //每次從TCP協議中讀取出來的訊息標誌,>0,成功,0,-1:失敗或伺服器異常
                int read = 0;
                //每次讀取出來的訊息寫入到位元組陣列中的位置
                int readPos = 0;
                //儲存訊息頭是否讀完
                int headLeng = 0;
                //定義讀取超時
                client.ReceiveTimeout = 1000 * 60 * 10;
                //建立訊息執行緒
                messageQueue = MessageQueue.shareMessageQueue();
                while (this.isConnection)
                {
                    //如果訊息頭還沒有讀完,則一直讀取訊息頭,或是下一條訊息頭
                    if (headLeng == 0)
                    {
                        //一個一個位元組讀,防止網路丟包和粘包問題
                        read = client.Receive(bytes, readPos, 1, 0);
                        //訊息一個一個讀,每次都往位元組後寫一個位元組內容
                        readPos++;
                        if (read <= 0) break;//如果遠端伺服器中斷
                        if (readPos < 4) continue;//如果協議頭還沒有讀完 - 4個位元組
                        //讀協議頭
                        headLeng = System.BitConverter.ToInt32(bytes, 0);
                    }
                    else
                    {
                        //如果得到一條完整的訊息
                        if (readPos - 4 >= headLeng)
                        {
                            //建立一條訊息
                            Message1 msg = new Message1();
                            //將一條完整的訊息存入到Message物件中,去掉前面4個位元組的長度資訊。
                            msg.putBytes(bytes, 4, readPos);
                            //將訊息加入到訊息佇列
                            messageQueue.putMessage(msg);
                        //    MessageBox.Show("訊息");
                            //準備下一次讀                       
                            readPos = 0;
                            //此處將頭清0,便可讀取下一條訊息
                            headLeng = 0;
                            //將位元組清0,此方法速度高於new byte[],且節約記憶體
                            Array.Clear(bytes, 0, bytes.Length);
                        }
                        else
                        {
                            //如果協議頭還沒有讀完
                            headLeng = 0;
                        }
                    }
                }
                this.Close();
            }
            catch (Exception e){
                MessageBox.Show("伺服器已斷開!");
            }finally{
                this.Close();
            }
        }
        public void Close()
        {
            thread1.Abort();
            isConnection = false;
           // client.Close();
            //Console.WriteLine("連線關閉");
           // MethodDelegate.shareMethodDelegate().removeAllMethods();
            client.Dispose();
            client = null;
            _csocket = null;
        }
    }
    public delegate void callBack(Message1 msg);
    /*
     * 訊息委託處理類
     */
    public class MethodDelegate
    {
        private static MethodDelegate md = null;
        private Dictionary<int, callBack> methods = null;
        private Dictionary<int, Form> forms = null;
        private Form formRun;
        private MethodDelegate()
        {
            methods = new Dictionary<int, callBack>();
            forms = new Dictionary<int, Form>();
        }
        public static MethodDelegate shareMethodDelegate()
        {
            if (md == null) md = new MethodDelegate();
            return md;
        }
        public void addMethod(int type, callBack callFun, Form form = null)
        {
            if (this.methods.ContainsKey(type))
            {
                this.methods.Remove(type);
            }
            this.methods.Add(type, callFun);
            //視窗執行緒 - 子網路接收子執行緒不能更新UI介面,所以必須要有窗體反射執行。
            if (this.forms.ContainsKey(type))
            {
                this.forms.Remove(type);
            }
            this.forms.Add(type, form);
        }
        public void removeMethod(int type)
        {
            if (this.methods.ContainsKey(type))
            {
                this.methods.Remove(type);
            }
            if (this.forms.ContainsKey(type))
            {
                this.forms.Remove(type);
            }
        }
        public void Run(int type, Message1 msg)
        {
           // MessageBox.Show(type.ToString());
            if (!this.methods.ContainsKey(type)) return;
            callBack fun = this.methods[type];

            Form f = this.forms[type];
            if (f != null)
            {
                f.Invoke(fun, msg);
                return;
            }
            fun(msg);
        }
        public bool isClose = false;
        public void removeAllMethods()
        {
            this.methods.Clear();
            formRun = null;
            isClose = true;
            MessageQueue.shareMessageQueue().th.Abort();
        }
    }

    /*
     * 訊息處理佇列
     */
    class MessageQueue
    {
        //private Queue<Message> messageQueue = null;
        private ConcurrentQueue<Message1> messageQueue = null;
        private MethodDelegate methodRun = null;
        private static bool isSet = true;
        private static MessageQueue mqeue = null;
        private static AutoResetEvent evt = new AutoResetEvent(false); 
        public static MessageQueue shareMessageQueue()
        {
            if (mqeue == null) mqeue = new MessageQueue();
            return mqeue;
        } public Thread th;
        private MessageQueue() {
            messageQueue = new ConcurrentQueue<Message1>();
            methodRun = MethodDelegate.shareMethodDelegate();
            //建立一條處理訊息的執行緒
            th= new Thread(runMessage);
           th.Start();
          th.IsBackground = true;
        }

        public void putMessage(Message1 message)
        {
            messageQueue.Enqueue(message);
           // MessageBox.Show("新訊息");
            if (isSet)
            {
                isSet = false;
                evt.Set();                
            }
        }

        private void runMessage()
        {
            bool f = false;
            Message1 message = null;
            while (true)
            {
                f = messageQueue.TryDequeue(out message);
                if (f)
                {
                    //執行方法
                 //   MessageBox.Show("執行程式碼");
                    methodRun.Run(message.type,message);
                    //把訊息通知到U3D執行緒

                }
                if (messageQueue.Count == 0)
                {
                    isSet = true;
                    //佇列沒有訊息了,就等待...,直以有訊息進來,繼續執行
                   evt.WaitOne();
                }
            }
        }
    }

    /*
     *  U3D 訊息對映類 
     */
    class MessageCallBack 
    {
    }
}