[C#原始碼]網路資料流讀寫封裝類,支援多執行緒下同時讀和寫,自動資源管理,字串分隔符\r\n
阿新 • • 發佈:2018-12-15
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace EasyStudio.MessageHandler
{
    /// <summary>
    /// A fixed-capacity byte block with independent read and write cursors.
    /// Bytes are appended at <see cref="WritePosition"/> and consumed from
    /// <see cref="ReadPosition"/>; the capacity never grows past <see cref="Length"/>.
    /// </summary>
    public class NetStreamBlock : IDisposable
    {
        // Private gate: locking on `this` would let unrelated code contend on (or deadlock with) our monitor.
        private readonly object _gate = new object();
        private readonly MemoryStream _stream;
        private readonly BinaryReader _reader;
        private readonly BinaryWriter _writer;

        /// <summary>Capacity of the block in bytes.</summary>
        public int Length { get; private set; }

        /// <summary>Offset of the next byte that <see cref="Read"/> will consume.</summary>
        public int ReadPosition { get; private set; }

        /// <summary>Offset at which <see cref="Write"/> will store the next byte.</summary>
        public int WritePosition { get; private set; }

        /// <summary>Creates an empty block able to hold <paramref name="blockSize"/> bytes.</summary>
        public NetStreamBlock(int blockSize)
        {
            Length = blockSize;
            ReadPosition = 0;
            WritePosition = 0;
            _stream = new MemoryStream(Length);
            _reader = new BinaryReader(_stream);
            _writer = new BinaryWriter(_stream);
        }

        /// <summary>Closes the underlying stream together with its reader and writer.</summary>
        public void Close()
        {
            lock (_gate)
            {
                _stream.Close();
                _reader.Close();
                _writer.Close();
            }
        }

        /// <summary>Releases the block; equivalent to <see cref="Close"/>.</summary>
        public void Dispose()
        {
            Close();
        }

        /// <summary>
        /// Copies up to <paramref name="size"/> bytes from the read cursor into <paramref name="buffer"/>.
        /// </summary>
        /// <returns>
        /// The number of bytes copied (0 when the reader has caught up with the writer),
        /// or -1 when the whole block has already been consumed.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">The offset/size pair does not fit the buffer.</exception>
        public int Read([In, Out] byte[] buffer, int offset, int size)
        {
            if (buffer == null) throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length) throw new ArgumentOutOfRangeException(nameof(offset));
            if (size < 0 || size > buffer.Length - offset) throw new ArgumentOutOfRangeException(nameof(size));

            lock (_gate)
            {
                int remaining = Length - ReadPosition;
                if (remaining == 0) return -1;

                _stream.Position = ReadPosition;
                // The MemoryStream itself stops at the last written byte, so this never returns unwritten data.
                int readSize = _reader.Read(buffer, offset, Math.Min(size, remaining));
                ReadPosition += readSize;
                return readSize;
            }
        }

        /// <summary>
        /// Appends up to <paramref name="size"/> bytes from <paramref name="buffer"/> at the write cursor.
        /// </summary>
        /// <returns>The number of bytes stored (clamped to the free space), or -1 when the block is full.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">The offset/size pair does not fit the buffer.</exception>
        public int Write(byte[] buffer, int offset, int size)
        {
            if (buffer == null) throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length) throw new ArgumentOutOfRangeException(nameof(offset));
            if (size < 0 || size > buffer.Length - offset) throw new ArgumentOutOfRangeException(nameof(size));

            lock (_gate)
            {
                int remaining = Length - WritePosition;
                if (remaining == 0) return -1;

                int writeSize = Math.Min(size, remaining);
                _stream.Position = WritePosition;
                _writer.Write(buffer, offset, writeSize);
                WritePosition += writeSize;
                return writeSize;
            }
        }

        /// <summary>Returns the byte stored at <paramref name="index"/> of the underlying buffer.</summary>
        public byte GetByte(int index)
        {
            lock (_gate)
            {
                return _stream.GetBuffer()[index];
            }
        }

        /// <summary>
        /// Returns a copy of up to <paramref name="size"/> bytes starting at <paramref name="startIndex"/>
        /// without moving either cursor. The result is clamped to the block capacity; positions that
        /// have not been written yet are returned as zero.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">A negative index or size was supplied.</exception>
        public byte[] Peek(int startIndex, int size)
        {
            if (startIndex < 0) throw new ArgumentOutOfRangeException(nameof(startIndex));
            if (size < 0) throw new ArgumentOutOfRangeException(nameof(size));

            lock (_gate)
            {
                int remaining = Length - startIndex;
                if (remaining <= 0) return new byte[0];

                byte[] copy = new byte[Math.Min(size, remaining)];
                _stream.Position = startIndex;
                _reader.Read(copy, 0, copy.Length);
                return copy;
            }
        }

        /// <summary>
        /// Lazily enumerates the positions in <paramref name="source"/>, at or after
        /// <paramref name="start"/>, where <paramref name="pattern"/> occurs
        /// (Boyer-Moore-Horspool search). An empty source or pattern yields nothing.
        /// </summary>
        /// <exception cref="ArgumentNullException">A null array was supplied.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="start"/> is negative.</exception>
        public static IEnumerable<long> IndexesOf(byte[] source, int start, byte[] pattern)
        {
            // Validate eagerly: inside the iterator these checks would not run until the first MoveNext().
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (pattern == null) throw new ArgumentNullException(nameof(pattern));
            if (start < 0) throw new ArgumentOutOfRangeException(nameof(start));

            return IndexesOfCore(source, start, source.LongLength, pattern);
        }

        // Horspool search over source[start .. sourceLength); arguments are assumed to be validated.
        private static IEnumerable<long> IndexesOfCore(byte[] source, long start, long sourceLength, byte[] pattern)
        {
            long patternLength = pattern.LongLength;
            if (sourceLength == 0 || patternLength == 0 || patternLength > sourceLength)
            {
                yield break;
            }

            // Shift table: how far the window may jump given the byte aligned with the pattern's last position.
            var shifts = new long[256];
            for (int i = 0; i < shifts.Length; i++)
            {
                shifts[i] = patternLength;
            }

            long last = patternLength - 1;
            for (long i = 0; i < last; i++)
            {
                shifts[pattern[i]] = last - i;
            }

            long index = start;
            while (index <= sourceLength - patternLength)
            {
                // Compare right-to-left; reaching i == 0 with every byte equal means a full match.
                for (long i = last; source[index + i] == pattern[i]; i--)
                {
                    if (i == 0)
                    {
                        yield return index;
                        break;
                    }
                }

                index += shifts[source[index + last]];
            }
        }

        /// <summary>
        /// Finds the first occurrence of <paramref name="value"/> at or after <paramref name="startIndex"/>
        /// among the bytes written so far.
        /// </summary>
        /// <returns>The index of the match, or -1 when there is none.</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="startIndex"/> is negative.</exception>
        public int IndexOf(int startIndex, byte value)
        {
            if (startIndex < 0) throw new ArgumentOutOfRangeException(nameof(startIndex));

            lock (_gate)
            {
                byte[] source = _stream.GetBuffer();
                // Stop at WritePosition: the rest of the buffer is unwritten zero padding, not data.
                for (int i = startIndex; i < WritePosition; i++)
                {
                    if (source[i] == value) return i;
                }

                return -1;
            }
        }

        /// <summary>
        /// Finds the first occurrence of <c>buffer[offset .. offset + size)</c> at or after
        /// <paramref name="startIndex"/> among the bytes written so far.
        /// </summary>
        /// <returns>The index of the match, or -1 when there is none.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">A negative start index or an offset/size pair that does not fit the buffer.</exception>
        public int IndexOf(int startIndex, byte[] buffer, int offset, int size)
        {
            if (startIndex < 0) throw new ArgumentOutOfRangeException(nameof(startIndex));
            if (buffer == null) throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length) throw new ArgumentOutOfRangeException(nameof(offset));
            if (size < 0 || size > buffer.Length - offset) throw new ArgumentOutOfRangeException(nameof(size));

            byte[] pattern = new byte[size];
            Buffer.BlockCopy(buffer, offset, pattern, 0, size);

            lock (_gate)
            {
                // Take the first hit if there is one; an empty sequence means "not found", not an error.
                foreach (long hit in IndexesOfCore(_stream.GetBuffer(), startIndex, WritePosition, pattern))
                {
                    return (int)hit;
                }

                return -1;
            }
        }
    }

    /// <summary>
    /// A growable FIFO byte stream built from <see cref="NetStreamBlock"/>s. Blocks are appended
    /// as data is written and released once fully read. Strings are framed with a trailing "\r\n".
    /// </summary>
    /// <remarks>
    /// <see cref="Write"/> and <see cref="Read"/> serialise on the caller-supplied lock object, so a
    /// writer and a reader using different objects can run side by side; the block list itself is
    /// always protected by an internal lock that is acquired after the caller's lock.
    /// </remarks>
    public class NetStream : IDisposable
    {
        private readonly List<NetStreamBlock> _streams;
        private readonly object _syncRoot;

        /// <summary>Capacity, in bytes, of each block allocated from now on. Must be positive.</summary>
        public int BlockSize { get; set; } = 1024;

        /// <summary>Creates an empty stream.</summary>
        public NetStream()
        {
            _streams = new List<NetStreamBlock>();
            _syncRoot = ((ICollection)_streams).SyncRoot;
        }

        /// <summary>Disposes every block currently held by the stream.</summary>
        public void Close()
        {
            lock (_syncRoot)
            {
                foreach (NetStreamBlock block in _streams)
                {
                    block.Dispose();
                }
            }
        }

        /// <summary>Releases the stream; equivalent to <see cref="Close"/>.</summary>
        public void Dispose()
        {
            Close();
        }

        // Total capacity of all blocks currently held. Callers hold _syncRoot.
        private long Length => _streams.Sum(block => (long)block.Length);

        /// <summary>
        /// Appends blocks until at least <paramref name="needSize"/> more bytes can be written.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Growth is required but <see cref="BlockSize"/> is not positive.
        /// </exception>
        public void AutoExpand(int needSize)
        {
            lock (_syncRoot)
            {
                long capacity = Length;
                long written = _streams.Sum(block => (long)block.WritePosition);
                while (written + needSize > capacity)
                {
                    int blockSize = BlockSize;
                    if (blockSize <= 0)
                    {
                        // A zero-sized block would never satisfy the loop condition.
                        throw new InvalidOperationException("BlockSize must be positive.");
                    }

                    _streams.Add(new NetStreamBlock(blockSize));
                    capacity += blockSize;
                }
            }
        }

        /// <summary>Disposes and removes every block whose contents have been completely read.</summary>
        public void AutoRelease()
        {
            lock (_syncRoot)
            {
                _streams.RemoveAll(block =>
                {
                    if (block.ReadPosition != block.Length) return false;
                    block.Dispose();
                    return true;
                });
            }
        }

        /// <summary>
        /// Appends <c>buffer[offset .. offset + count)</c> to the stream, growing it as needed.
        /// </summary>
        /// <param name="buffer">Source bytes.</param>
        /// <param name="offset">Index of the first byte to write.</param>
        /// <param name="count">Number of bytes to write.</param>
        /// <param name="syncObject">Lock object that serialises writers.</param>
        /// <returns>This stream, to allow call chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> or <paramref name="syncObject"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">The offset/count pair does not fit the buffer.</exception>
        /// <exception cref="InvalidOperationException">No block accepted the data.</exception>
        public NetStream Write(byte[] buffer, int offset, int count, object syncObject)
        {
            if (buffer == null) throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length) throw new ArgumentOutOfRangeException(nameof(offset));
            if (count < 0 || count > buffer.Length - offset) throw new ArgumentOutOfRangeException(nameof(count));
            if (syncObject == null) throw new ArgumentNullException(nameof(syncObject));

            lock (syncObject)
            {
                AutoExpand(count);
                for (int done = 0; done < count;)
                {
                    NetStreamBlock target = FindBlock(block => block.WritePosition < block.Length);
                    int actual = target == null ? -1 : target.Write(buffer, offset + done, count - done);
                    if (actual <= 0) throw new InvalidOperationException("write error");
                    done += actual;
                }

                return this;
            }
        }

        /// <summary>
        /// Consumes exactly <paramref name="count"/> bytes from the stream into <paramref name="buffer"/>,
        /// then releases any blocks that became fully read.
        /// </summary>
        /// <param name="buffer">Destination buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to fill.</param>
        /// <param name="count">Number of bytes to read.</param>
        /// <param name="syncObject">Lock object that serialises readers.</param>
        /// <returns>This stream, to allow call chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> or <paramref name="syncObject"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">The offset/count pair does not fit the buffer.</exception>
        /// <exception cref="InvalidOperationException">
        /// Fewer than <paramref name="count"/> unread bytes are available; nothing is consumed in that case.
        /// </exception>
        public NetStream Read([In, Out] byte[] buffer, int offset, int count, object syncObject)
        {
            if (buffer == null) throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length) throw new ArgumentOutOfRangeException(nameof(offset));
            if (count < 0 || count > buffer.Length - offset) throw new ArgumentOutOfRangeException(nameof(count));
            if (syncObject == null) throw new ArgumentNullException(nameof(syncObject));

            lock (syncObject)
            {
                // Check up front so that a short stream fails cleanly instead of after a partial, lossy read.
                if (count > AvailableBytes()) throw new InvalidOperationException("read error");

                for (int done = 0; done < count;)
                {
                    NetStreamBlock source = FindBlock(block => block.ReadPosition < block.Length);
                    int actual = source == null ? -1 : source.Read(buffer, offset + done, count - done);
                    if (actual <= 0) throw new InvalidOperationException("read error");
                    done += actual;
                }

                AutoRelease();
                return this;
            }
        }

        // Looks up a block under the list lock, so the lookup never races with AutoExpand/AutoRelease.
        private NetStreamBlock FindBlock(Predicate<NetStreamBlock> match)
        {
            lock (_syncRoot)
            {
                return _streams.Find(match);
            }
        }

        // Number of bytes that have been written but not yet read.
        private long AvailableBytes()
        {
            lock (_syncRoot)
            {
                return _streams.Sum(block => (long)(block.WritePosition - block.ReadPosition));
            }
        }

        // Copies every written byte at or after startPosition into one array.
        // Positions count from the start of the first block still held. Callers hold _syncRoot.
        private byte[] SnapshotWritten(long startPosition)
        {
            using (var collected = new MemoryStream())
            {
                long blockStart = 0;
                foreach (NetStreamBlock block in _streams)
                {
                    int written = block.WritePosition;
                    long skip = startPosition - blockStart;
                    if (skip < written)
                    {
                        int from = skip > 0 ? (int)skip : 0;
                        byte[] chunk = block.Peek(from, written - from);
                        collected.Write(chunk, 0, chunk.Length);
                    }

                    // Blocks fill in order, so the first partially written block ends the data.
                    if (written < block.Length) break;
                    blockStart += block.Length;
                }

                return collected.ToArray();
            }
        }

        /// <summary>
        /// Finds the first occurrence of <paramref name="buffer"/> at or after
        /// <paramref name="startPosition"/> in the data written so far. Positions are counted from
        /// the start of the first block still held by the stream.
        /// </summary>
        /// <returns>
        /// The position of the match, or -1 when there is none. An empty pattern matches at
        /// <paramref name="startPosition"/> when data exists there.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="startPosition"/> is negative.</exception>
        public int IndexOf(byte[] buffer, long startPosition)
        {
            if (buffer == null) throw new ArgumentNullException(nameof(buffer));
            if (startPosition < 0) throw new ArgumentOutOfRangeException(nameof(startPosition));

            lock (_syncRoot)
            {
                // One linear pass over a snapshot instead of re-walking every block for each candidate position.
                byte[] data = SnapshotWritten(startPosition);
                if (buffer.Length == 0)
                {
                    return data.Length > 0 ? (int)startPosition : -1;
                }

                foreach (long hit in NetStreamBlock.IndexesOf(data, 0, buffer))
                {
                    return (int)(startPosition + hit);
                }

                return -1;
            }
        }

        /// <summary>
        /// Encodes <paramref name="value"/> followed by "\r\n" and appends it to the stream.
        /// </summary>
        /// <returns>This stream, to allow call chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="encoding"/> is null.</exception>
        public NetStream WriteString(string value, Encoding encoding)
        {
            if (encoding == null) throw new ArgumentNullException(nameof(encoding));

            byte[] bytes = encoding.GetBytes(value + "\r\n");
            return Write(bytes, 0, bytes.Length, _syncRoot);
        }

        /// <summary>
        /// Consumes the next "\r\n"-terminated line from the stream.
        /// </summary>
        /// <returns>
        /// The decoded line including its trailing "\r\n", or <see cref="string.Empty"/> when no
        /// complete line is available yet (nothing is consumed in that case).
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="encoding"/> is null.</exception>
        public string ReadString(Encoding encoding)
        {
            if (encoding == null) throw new ArgumentNullException(nameof(encoding));

            lock (_syncRoot)
            {
                long startPosition = _streams.Sum(block => (long)block.ReadPosition);
                byte[] terminator = encoding.GetBytes("\r\n");
                int idx = IndexOf(terminator, startPosition);
                if (idx == -1) return string.Empty;

                byte[] line = new byte[idx - startPosition + terminator.Length];
                // Reuse _syncRoot (already held) rather than `this`, so writers that lock
                // another object and then _syncRoot can never deadlock against this call.
                Read(line, 0, line.Length, _syncRoot);
                return encoding.GetString(line);
            }
        }
    }
}