• 前言

大齡程式設計師失業狀態,前幾天面試了一家與醫療裝置為主的公司並錄取;因該單位涉及串列埠通訊方面技術,自己曾做過通訊相關的一些專案,涉及Socket的較多,也使用SuperSocket做過一些專案,入職前做了一些準備工作,按照SuperSocket設計思路寫了一套串列埠通訊的框架,最後入職後發現曾一再確定的雙休問題不能實現;技術負責人對我的工作安排只是跟著幾個年輕程式設計師熟悉及維護已有程式,入職前所說的某一個新專案也已經有人在做,而且翻看他們的程式碼質量也很一般,,綜合考慮還是放棄了這份工作,5年前開始一直都在技術管理崗位上,實在不能接受連工作安排都沒具體計劃的領導;辭職後再把這個套東西整理了一下,做個開源專案釋出吧,同時也希望有更好的串列埠解析框架歡迎告知我來學習。

  • 專案介紹

專案名稱為 ZhCun.SerialIO

一款串列埠通訊框架,更容易的處理協議解析,內部實現了粘包、分包、冗餘錯誤包的處理實現; 它可更方便的業務邏輯的處理,將通訊、協議解析、業務邏輯 完全獨立開來,更容易擴充套件和修改程式碼邏輯; 本專案參考了SuperSocket中協議解析的設計原理,可外部 命令(Command)類, 對應協議中命令字靈活實現邏輯。

例如: 協議格式:[2位元組 固定頭 0xaa,0xbb] + [1位元組 長度 len] [1位元組 命令字] [1位元組 裝置號] [N位元組 data] [2位元組 CRC校驗碼]

命令資料: AA BB 09 01 05 00 01 2B 56

可以處理以下幾種(粘包、分包、容錯)情況:

1.  AA BB 09 01 05 00 01 2B 56 AA BB 09 01 05 00 01 2B 56 AA BB 09 01 05 00 01  可以不考慮粘包的處理,會分成3條協議交給Command來處理(後面說明)

2. 00 00 00 AA BB 09 01 05 00 01 2B 56 00 00 00  標記為紅色的為錯誤資料,這些資料會被自動過濾掉

3.  連續收到(延時接收了)多個半包,串列埠快取問題可能導致延時收到資料

AA BB 09 01 05

00 01 2B 56

      這種情況會等待下次處理,如果之後再沒有收到正確的資料會丟棄前部分,後面正確的資料會正常處理

程式碼目錄:

  •  設計思路及實現

ISerialServer 是實現串列埠通訊的介面,接收資料、傳送資料、開啟與關閉串列埠的實現;SerialCore 是 ISerialServer 通訊的核心實現

public interface ISerialServer : IDisposable
{
/// <summary>
/// 當連線狀態改變時觸發
/// </summary>
event Action<ConnectChangedEvent> ConnectChanged;
/// <summary>
/// 當讀資料完成時觸發
/// </summary>
event Action<ReadWriteEvent> DataReadOver;
/// <summary>
/// 當下發資料時觸發
/// </summary>
event Action<ReadWriteEvent> DataWriteOver;
/// <summary>
/// 當前服務的配置
/// </summary>
SerialOption Option { get; } void Write(byte[] data); void Write(byte[] data, int offset, int count); void Write(byte[] data, int offset, int count, int sendTimes, int interval); void Write(IWriteModel model); /// <summary>
/// 開始監聽串列埠
/// </summary>
void Start(SerialOption option);
/// <summary>
/// 預設引數及指定串列埠開始服務
/// </summary>
void Start(string portName, int baudRate = 9600);
}

  SerialServerBase 繼承了 SerialCore ,它主要實現了 命令處理器 及 過濾處理器,這套框架的核心就是協議的解析及命令的處理;

它擴充套件了兩個重要屬性 :Filters 和 Commands 分別是協議過濾器和命令處理器,與 SerialCore 分開是為了滿足不需要過濾器和命令處理器的情況

建構函式中呼叫載入過濾器 LoadFilters() 與 載入命令處理器 的兩個方法,該方法應該由應用程式來實現子類並加入使用者自定義的 Filters 和 Commands

       public SerialServerBase()
{
Filters = new List<IReceiveFilter>();
Commands = new List<ICommand>();
LoadFilters();
LoadCommands();
Filters.ForEach(s => s.OnFilterFinish = ReceiveFilterAction);
}
/// <summary>
/// 過濾器
/// </summary>
protected List<IReceiveFilter> Filters { get; }
/// <summary>
/// 命令處理器
/// </summary>
protected List<ICommand> Commands { get; }
/// <summary>
/// 載入過濾器,子類需 Filters.Add
/// </summary>
protected virtual void LoadFilters() { }
/// <summary>
/// 載入命令處理器
/// </summary>
protected virtual void LoadCommands() { }

SerialServerBase 重寫了 OnDataReadOver 和 ReceiveFilterAction,分別來處理協議解析和命令處理

       /// <summary>
/// 接收到資料後交給過濾器來處理協議
/// </summary>
protected override void OnDataReadOver(byte[] data, int offset, int count)
{
foreach (var filter in Filters)
{
filter.Filter(this, data, offset, count, false, out _);
} base.OnDataReadOver(data, offset, count);
}
/// <summary>
/// 接收資料解析完成後觸發
/// </summary>
protected virtual void ReceiveFilterAction(PackageInfo package)
{
if (Commands == null || Commands.Count == 0) return; var cmd = Commands.Find(s => s.Key == package.Key);
if (cmd != null)
{
cmd.Execute(this, package);
}
}

  IReceiveFilter 接收過濾器定義,它實現解析的核心功能,處理粘包、分包都是在 它的實現類 ReceiveBaseFilter 中,Filter 方法實現分包粘包的處理,程式碼如下

        /// <summary>
/// 過濾協議,粘包、分包的處理
/// </summary>
public virtual void Filter(IBufferReceive recBuffer, byte[] data, int offset, int count, bool isBuffer, out int rest)
{
if (!isBuffer && recBuffer.HasReceiveBuffer())
{
recBuffer.SetReceiveBuffer(data, offset, count);
Filter(recBuffer, recBuffer.ReceiveBuffer, 0, recBuffer.ReceiveOffset, true, out rest);
return;
} if (isBuffer && count < MinLength)
{
rest = 0;
return;
} if (!isBuffer && recBuffer.ReceiveOffset + count < MinLength)
{
//等下一次接收後處理
recBuffer.SetReceiveBuffer(data, offset, count);
rest = 0;
return;
} rest = 0; if (!FindHead(data, offset, count, out int headOffset))
{
//未找到包頭丟棄
recBuffer.RestReceiveBuffer();
FilterFinish(1, data, offset, count);
return;
} if (count - (headOffset - offset) < MinLength)
{
// 從包頭位置小於最小長度(半包情況),注意:解析了一半,不做解析完成處理
recBuffer.SetReceiveBuffer(data, headOffset, count - (headOffset - offset));
return;
} int dataLen = GetDataLength(data, headOffset, count - (headOffset - offset));
if (dataLen <= 0)
{
//錯誤的長度 丟棄
recBuffer.RestReceiveBuffer();
FilterFinish(2, data, offset, count);
return;
} if (dataLen > count - (headOffset - offset))
{
//半(分)包情況,等下次接收後合併
if (!isBuffer) recBuffer.SetReceiveBuffer(data, headOffset, count - (headOffset - offset));
return;
} rest = count - (dataLen + (headOffset - offset)); FilterFinish(0, data, headOffset, dataLen); recBuffer.RestReceiveBuffer(); if (rest > 0)
{
Filter(recBuffer, data, headOffset + dataLen, rest, false, out rest);
return;
}
}

核心解析先介紹這麼多,下面舉例說明下如何應用及使用過程

以上介紹示例的協議來舉例

協議說明: [2位元組 固定頭 0xaa,0xbb] + [1位元組 長度 len] [1位元組 命令字] [1位元組 裝置號] [N位元組 data] [2位元組 CRC校驗碼]

步驟:

  1. 建立過濾器 ,應用層的過濾器只需要設定 包頭,獲取資料包長度、命令字的實現,簡單幾行程式碼即可方便實現過濾的整個過程;

程式碼如下:

    public class FHDemoFilter : FixedHeadFilter
{
static byte[] Head = new byte[] { 0xaa, 0xbb }; public FHDemoFilter()
: base(Head, 6)
{ } //[0xaa,0xbb] [len] [cmd] [DevId] [data] [crc-1,crc-2] protected override int GetDataLength(byte[] data, int offset, int count)
{
//資料包長度 第3個位元組
return data[offset + 2];
} protected override int GetPackageKey(byte[] data, int offset, int count)
{
//命令字 第4個位元組
return data[offset + 3];
}
}

  2. 建立命令處理器 ,命令處理器 由 ICommand 派生,需要指明 Key (即:GetPackageKey 獲取的命令字),然後一個 執行的邏輯方法 Execute ,這裡將 接收到的資料包與傳送的資料包封裝了實體物件,更方便處理data的解析及傳送包的封裝;

   定義一個抽象的 CmdBase 它的派生類來實現具體 命令 的業務邏輯,CmdBase 會將協議生成一個實體物件給派生類

   public abstract class CmdBase<TReadModel> : ICommand
where TReadModel : R_Base, new()
{
public abstract int Key { get; } public abstract string CmdName { get; } /// <summary>
/// 執行響應邏輯
/// </summary>
public abstract void ExecuteHandle(ISerialServer server, TReadModel rep); public virtual void Execute(ISerialServer server, PackageInfo package)
{
var rModel = new TReadModel();
var r = rModel.Analy(package.Body, package.BodyOffset, package.BodyCount);
if (r == 0)
{
ExecuteHandle(server, rModel);
}
else
{
LogPrint.Print($"解析key={Key} 異常,error code: {r}");
}
}
}

CmdBase 將建立的 R_Base 例項 交給派生類處理,R_Base 封裝瞭解析資料包內容及校驗的實現,派生類只需要將 Data 資料再次解析即可

R_Base 使用了 BytesAnalyHelper 位元組解析工具,它能更方便和靈活的來按順序解析協議;

    public abstract class R_Base : BaseProtocol
{
/// <summary>
/// 解析資料物件
/// </summary>
protected BytesAnalyHelper AnalyObj { get; private set; }
/// <summary>
/// 解析訊息體, 0 正常, 1 校驗碼 錯誤,2 解析異常
/// </summary>
protected abstract int AnalyBody(BytesAnalyHelper analy); /// <summary>
/// 解析協議, 0 成功 1 校驗碼錯誤 9 異常
/// </summary>
public int Analy(byte[] data, int offset, int count)
{
try
{
var crc = GetCRC(data, offset, count - 2); //校驗碼方法內去除
var crcBytes = BitConverter.GetBytes(crc);
if (crcBytes[0] != data[offset + count - 1] || crcBytes[1] != data[offset + count - 2])
{
return 1;
}
//[0xaa,0xbb] [len] [cmd] [DevId] [data] [crc-1,crc-2]
AnalyObj = new BytesAnalyHelper(data, offset, count, 4); // 跳過包頭部分
DevId = AnalyObj.GetByte(); // 取 DevId
var r = AnalyBody(AnalyObj);
return r;
}
catch
{
//LogHelper.LogObj.Error($"解析資料包發生異常.", ex);
return 9;
}
}
}

舉例解析 data 為一個文字的實現,只需要用哪種編碼轉換即可,直接賦值給 Text

   public class R_Text : R_Base
{
public string Text { set; get; } protected override int AnalyBody(BytesAnalyHelper analy)
{
Text = analy.GetString(analy.NotAnalyCount - 2);
return 0;
}
}

,然後CmdText 的命令處理器就可以得到這個物件,來進行對應的業務邏輯處理

    public class CmdText : CmdBase<R_Text>
{
public override int Key => 0x02; public override string CmdName => "Text"; public override void ExecuteHandle(ISerialServer server, R_Text rep)
{
LogPrint.Print($"[Text]: {rep.Text}");
//回覆訊息
var w = new W_TextRe();
w.DevId = rep.DevId;
server.Write(w); //.. to do something
}
}

以上就是實現的主要部分,最終呼叫  SerialServer 派生例項的 Start 方法即可;

最後附上,demo實現截圖

  • 結束語

  這個框架不太複雜,步驟有一些繁瑣,但程式碼量很少,共享出來也希望給需要的人一些思路,同時也希望能提出一些建議,能更好的改進;

看到這裡如果有 年齡 35+ 的程式設計師,歡迎交流一下都在做什麼?

程式碼已託管至:gitee