// 程式人生 > [C#原始碼]網路資料流讀寫封裝類,支援多執行緒下同時讀和寫,自動資源管理,字串分隔符\r\n
//
// [C#原始碼]網路資料流讀寫封裝類,支援多執行緒下同時讀和寫,自動資源管理,字串分隔符\r\n

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace EasyStudio.MessageHandler
{
    /// <summary>
    /// A fixed-capacity byte buffer with independent read and write cursors,
    /// backed by a <see cref="MemoryStream"/>. Every instance member takes a
    /// private lock before touching the underlying stream.
    /// </summary>
    public class NetStreamBlock : IDisposable
    {
        // Private gate instead of lock(this): outside code cannot take this
        // monitor and interfere with the block's own locking.
        private readonly object _gate = new object();
        private readonly MemoryStream _stream;
        private readonly BinaryReader _reader;
        private readonly BinaryWriter _writer;

        /// <summary>Capacity of the block in bytes, fixed at construction.</summary>
        public int Length { get; private set; }

        /// <summary>Offset of the next byte that <see cref="Read"/> will return.</summary>
        public int ReadPosition { get; private set; }

        /// <summary>Offset at which <see cref="Write"/> will store the next byte; also the number of bytes written so far.</summary>
        public int WritePosition { get; private set; }

        /// <summary>Creates an empty block able to hold <paramref name="blockSize"/> bytes.</summary>
        /// <param name="blockSize">Capacity in bytes.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is negative.</exception>
        public NetStreamBlock(int blockSize)
        {
            if (blockSize < 0)
                throw new ArgumentOutOfRangeException(nameof(blockSize));

            Length = blockSize;
            ReadPosition = 0;
            WritePosition = 0;

            _stream = new MemoryStream(Length);
            _reader = new BinaryReader(_stream);
            _writer = new BinaryWriter(_stream);
        }

        /// <summary>Closes the underlying stream together with its reader and writer.</summary>
        public void Close()
        {
            lock (_gate)
            {
                _stream.Close();
                _reader.Close();
                _writer.Close();
            }
        }

        /// <summary>Releases the block; equivalent to <see cref="Close"/>.</summary>
        public void Dispose()
        {
            Close();
        }

        /// <summary>
        /// Copies up to <paramref name="size"/> bytes from the read cursor into
        /// <paramref name="buffer"/> and advances the read cursor.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to fill.</param>
        /// <param name="size">Maximum number of bytes to copy.</param>
        /// <returns>
        /// -1 when the read cursor has reached <see cref="Length"/>; otherwise the
        /// number of bytes copied, which is 0 when nothing has been written past
        /// the read cursor yet.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="size"/> does not fit in <paramref name="buffer"/>.</exception>
        public int Read([In, Out] byte[] buffer, int offset, int size)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length)
                throw new ArgumentOutOfRangeException(nameof(offset));
            if (size < 0 || size > buffer.Length - offset)
                throw new ArgumentOutOfRangeException(nameof(size));

            lock (_gate)
            {
                _stream.Position = ReadPosition;
                int remainSize = Length - ReadPosition;
                if (remainSize == 0) return -1;

                if (size > remainSize) size = remainSize;
                // The stream's length equals WritePosition, so the reader cannot
                // hand back bytes that were never written.
                int readSize = _reader.Read(buffer, offset, size);
                ReadPosition += readSize;
                return readSize;
            }
        }

        /// <summary>
        /// Appends up to <paramref name="size"/> bytes from <paramref name="buffer"/>
        /// at the write cursor, truncated to the remaining capacity.
        /// </summary>
        /// <param name="buffer">Source array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to copy.</param>
        /// <param name="size">Maximum number of bytes to copy.</param>
        /// <returns>-1 when the block is already full; otherwise the number of bytes stored.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="size"/> does not fit in <paramref name="buffer"/>.</exception>
        public int Write(byte[] buffer, int offset, int size)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length)
                throw new ArgumentOutOfRangeException(nameof(offset));
            if (size < 0 || size > buffer.Length - offset)
                throw new ArgumentOutOfRangeException(nameof(size));

            lock (_gate)
            {
                _stream.Position = WritePosition;
                int remainSize = Length - WritePosition;
                if (remainSize == 0) return -1;

                if (size > remainSize) size = remainSize;
                _writer.Write(buffer, offset, size);
                WritePosition += size;
                return size;
            }
        }

        /// <summary>
        /// Returns the byte stored at <paramref name="index"/> in the backing buffer.
        /// The index is not checked against <see cref="WritePosition"/>, so positions
        /// that were never written return whatever the buffer currently holds.
        /// </summary>
        /// <param name="index">Zero-based offset into the backing buffer.</param>
        public byte GetByte(int index)
        {
            lock (_gate)
            {
                return _stream.GetBuffer()[index];
            }
        }

        /// <summary>
        /// Returns a copy of up to <paramref name="size"/> written bytes starting at
        /// <paramref name="startIndex"/> without moving the read cursor.
        /// </summary>
        /// <param name="startIndex">Offset of the first byte to copy.</param>
        /// <param name="size">Maximum number of bytes to copy.</param>
        /// <returns>
        /// An array holding only bytes that have actually been written; it is empty
        /// when <paramref name="startIndex"/> is at or past <see cref="WritePosition"/>.
        /// </returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="startIndex"/> or <paramref name="size"/> is negative.</exception>
        public byte[] Peek(int startIndex, int size)
        {
            if (startIndex < 0)
                throw new ArgumentOutOfRangeException(nameof(startIndex));
            if (size < 0)
                throw new ArgumentOutOfRangeException(nameof(size));

            lock (_gate)
            {
                // Bound by what has been written, not by capacity: otherwise the
                // result would be padded with bytes that are not data.
                int available = WritePosition - startIndex;
                if (available <= 0)
                    return new byte[0];

                if (size > available) size = available;

                _stream.Position = startIndex;
                byte[] buffer = new byte[size];
                _reader.Read(buffer, 0, size);
                return buffer;
            }
        }

        /// <summary>
        /// Lazily enumerates every offset at or after <paramref name="start"/> where
        /// <paramref name="pattern"/> occurs in <paramref name="source"/>, using a
        /// bad-character shift table (Boyer-Moore-Horspool).
        /// </summary>
        /// <param name="source">Bytes to search.</param>
        /// <param name="start">Offset in <paramref name="source"/> at which to begin.</param>
        /// <param name="pattern">Bytes to look for; an empty pattern yields no matches.</param>
        /// <returns>Match offsets in ascending order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="pattern"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="start"/> is negative.</exception>
        public static IEnumerable<long> IndexesOf(byte[] source, int start, byte[] pattern)
        {
            // Validation lives in this non-iterator wrapper so it runs at call
            // time; inside an iterator it would be deferred to the first MoveNext.
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (pattern == null)
                throw new ArgumentNullException(nameof(pattern));
            if (start < 0)
                throw new ArgumentOutOfRangeException(nameof(start));

            return IndexesOfCore(source, source.LongLength, start, pattern);
        }

        // Horspool search over the first sourceLength bytes of source.
        private static IEnumerable<long> IndexesOfCore(byte[] source, long sourceLength, long start, byte[] pattern)
        {
            long patternLength = pattern.LongLength;

            if ((sourceLength == 0) || (patternLength == 0) || (patternLength > sourceLength))
            {
                yield break;
            }

            // shifts[b] = how far the window may move when its last byte is b.
            var shifts = new long[256];
            for (var i = 0; i < shifts.Length; i++)
            {
                shifts[i] = patternLength;
            }

            long lastPatternByte = patternLength - 1;
            for (long i = 0; i < lastPatternByte; i++)
            {
                shifts[pattern[i]] = lastPatternByte - i;
            }

            long lastStart = sourceLength - patternLength;
            long index = start;

            while (index <= lastStart)
            {
                // Compare right to left; the loop ends on the first mismatch.
                for (long i = lastPatternByte; source[index + i] == pattern[i]; i--)
                {
                    if (i == 0)
                    {
                        yield return index;
                        break;
                    }
                }

                index += shifts[source[index + lastPatternByte]];
            }
        }

        /// <summary>
        /// Finds the first written byte equal to <paramref name="value"/> at or after
        /// <paramref name="startIndex"/>.
        /// </summary>
        /// <param name="startIndex">Offset at which to begin searching.</param>
        /// <param name="value">Byte to look for.</param>
        /// <returns>The offset of the match, or -1 when there is none.</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="startIndex"/> is negative.</exception>
        public int IndexOf(int startIndex, byte value)
        {
            if (startIndex < 0)
                throw new ArgumentOutOfRangeException(nameof(startIndex));

            lock (_gate)
            {
                byte[] source = _stream.GetBuffer();

                // GetBuffer() exposes the whole capacity; stop at WritePosition so
                // unwritten bytes are never reported as matches.
                int end = WritePosition;
                for (int i = startIndex; i < end; i++)
                {
                    if (source[i] == value)
                        return i;
                }

                return -1;
            }
        }

        /// <summary>
        /// Finds the first occurrence, at or after <paramref name="startIndex"/>, of the
        /// byte sequence <paramref name="buffer"/>[<paramref name="offset"/> ..
        /// <paramref name="offset"/> + <paramref name="size"/>) within the written data.
        /// </summary>
        /// <param name="startIndex">Offset in the block at which to begin searching.</param>
        /// <param name="buffer">Array containing the pattern.</param>
        /// <param name="offset">Index of the first pattern byte in <paramref name="buffer"/>.</param>
        /// <param name="size">Number of pattern bytes.</param>
        /// <returns>The offset of the match, or -1 when there is none (including for an empty pattern).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="startIndex"/> is negative, or <paramref name="offset"/>/<paramref name="size"/> does not fit in <paramref name="buffer"/>.</exception>
        public int IndexOf(int startIndex, byte[] buffer, int offset, int size)
        {
            if (startIndex < 0)
                throw new ArgumentOutOfRangeException(nameof(startIndex));
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length)
                throw new ArgumentOutOfRangeException(nameof(offset));
            if (size < 0 || size > buffer.Length - offset)
                throw new ArgumentOutOfRangeException(nameof(size));

            byte[] pattern = new byte[size];
            Array.Copy(buffer, offset, pattern, 0, size);

            lock (_gate)
            {
                byte[] source = _stream.GetBuffer();

                // Only the first match is needed; enumeration stops right after it.
                foreach (long hit in IndexesOfCore(source, WritePosition, startIndex, pattern))
                {
                    return (int)hit;
                }

                return -1;
            }
        }
    }

    /// <summary>
    /// A growable byte queue built from a list of <see cref="NetStreamBlock"/>
    /// segments. Writes append to the first block that still has free capacity;
    /// reads consume from the first block that has not been fully read, and fully
    /// read blocks are disposed and removed. Positions used by
    /// <see cref="IndexOf"/> are measured from the start of the first block
    /// currently held in the list.
    /// </summary>
    public class NetStream : IDisposable
    {
        private readonly List<NetStreamBlock> _streams;

        /// <summary>Capacity, in bytes, of each block added by <see cref="AutoExpand"/>. Must be positive when an expansion is needed.</summary>
        public int BlockSize { get; set; } = 1024;

        // Guards every access to the _streams list.
        private readonly object _syncRoot;

        /// <summary>Creates an empty stream with no blocks allocated.</summary>
        public NetStream()
        {
            _streams = new List<NetStreamBlock>();
            _syncRoot = ((ICollection)_streams).SyncRoot;
        }

        /// <summary>Disposes every block currently held by the stream.</summary>
        public void Close()
        {
            lock (_syncRoot)
            {
                foreach (var netStreamBlock in _streams)
                {
                    netStreamBlock.Dispose();
                }
            }
        }

        /// <summary>Releases the stream; equivalent to <see cref="Close"/>.</summary>
        public void Dispose()
        {
            Close();
        }

        // Total capacity of all blocks. Summed as long so large streams do not
        // overflow the int-based Sum overload.
        private long Length => _streams.Sum(block => (long)block.Length);

        // Total bytes written into the blocks currently in the list. Because blocks
        // are filled in order, this is also the position just past the last
        // written byte.
        private long WrittenLength => _streams.Sum(block => (long)block.WritePosition);

        /// <summary>
        /// Adds blocks of <see cref="BlockSize"/> bytes until at least
        /// <paramref name="needSize"/> more bytes fit after the data already written.
        /// </summary>
        /// <param name="needSize">Number of additional bytes that must fit.</param>
        /// <exception cref="InvalidOperationException">More room is needed but <see cref="BlockSize"/> is zero or negative.</exception>
        public void AutoExpand(int needSize)
        {
            lock (_syncRoot)
            {
                long capacity = Length;
                long required = WrittenLength + needSize;
                if (required <= capacity)
                    return;

                // A non-positive block size could never satisfy the loop below
                // (zero would spin forever), so fail fast instead.
                int blockSize = BlockSize;
                if (blockSize <= 0)
                    throw new InvalidOperationException("BlockSize must be greater than zero.");

                for (; capacity < required; capacity += blockSize)
                {
                    _streams.Add(new NetStreamBlock(blockSize));
                }
            }
        }

        /// <summary>Disposes and removes every block whose contents have been read completely.</summary>
        public void AutoRelease()
        {
            lock (_syncRoot)
            {
                _streams.RemoveAll(block =>
                {
                    if (block.ReadPosition != block.Length)
                        return false;

                    block.Dispose();
                    return true;
                });
            }
        }

        /// <summary>
        /// Appends <paramref name="count"/> bytes of <paramref name="buffer"/> to the
        /// stream, allocating new blocks as required.
        /// </summary>
        /// <param name="buffer">Source array.</param>
        /// <param name="offset">Index of the first byte to write.</param>
        /// <param name="count">Number of bytes to write.</param>
        /// <param name="syncObject">Object locked for the duration of the write, serialising writers that share it.</param>
        /// <returns>This stream, to allow call chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> or <paramref name="syncObject"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> does not fit in <paramref name="buffer"/>.</exception>
        /// <exception cref="IOException">No block accepted the data.</exception>
        public NetStream Write(byte[] buffer, int offset, int count, object syncObject)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length)
                throw new ArgumentOutOfRangeException(nameof(offset));
            if (count < 0 || count > buffer.Length - offset)
                throw new ArgumentOutOfRangeException(nameof(count));
            if (syncObject == null)
                throw new ArgumentNullException(nameof(syncObject));

            lock (syncObject)
            {
                AutoExpand(count);

                for (int already = 0; already < count;)
                {
                    NetStreamBlock writeBlock;
                    // The list may be mutated by AutoExpand/AutoRelease on another
                    // thread, so look the block up under the list lock.
                    lock (_syncRoot)
                    {
                        writeBlock = _streams.Find(block => block.WritePosition < block.Length);
                    }

                    int actual = writeBlock == null
                        ? -1
                        : writeBlock.Write(buffer, offset + already, count - already);
                    if (actual <= 0)
                        throw new IOException("write error");

                    already += actual;
                }

                return this;
            }
        }

        /// <summary>
        /// Removes exactly <paramref name="count"/> bytes from the front of the stream
        /// and copies them into <paramref name="buffer"/>. If fewer bytes are
        /// available, nothing is consumed and an exception is thrown.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index of the first byte to fill.</param>
        /// <param name="count">Number of bytes to read.</param>
        /// <param name="syncObject">Object locked for the duration of the read, serialising readers that share it.</param>
        /// <returns>This stream, to allow call chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> or <paramref name="syncObject"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> does not fit in <paramref name="buffer"/>.</exception>
        /// <exception cref="EndOfStreamException">Fewer than <paramref name="count"/> bytes are available.</exception>
        public NetStream Read([In, Out] byte[] buffer, int offset, int count, object syncObject)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (offset < 0 || offset > buffer.Length)
                throw new ArgumentOutOfRangeException(nameof(offset));
            if (count < 0 || count > buffer.Length - offset)
                throw new ArgumentOutOfRangeException(nameof(count));
            if (syncObject == null)
                throw new ArgumentNullException(nameof(syncObject));

            lock (syncObject)
            {
                // Check up front so a short stream fails without having consumed
                // part of the data.
                lock (_syncRoot)
                {
                    long available = _streams.Sum(block => (long)(block.WritePosition - block.ReadPosition));
                    if (count > available)
                        throw new EndOfStreamException("read error");
                }

                for (int already = 0; already < count;)
                {
                    NetStreamBlock readBlock;
                    lock (_syncRoot)
                    {
                        readBlock = _streams.Find(block => block.ReadPosition < block.Length);
                    }

                    int actual = readBlock == null
                        ? -1
                        : readBlock.Read(buffer, offset + already, count - already);
                    if (actual <= 0)
                        throw new EndOfStreamException("read error");

                    already += actual;
                }

                AutoRelease();
                return this;
            }
        }

        // Copies written bytes starting at startPosition into buffer without
        // consuming them. Returns the number of bytes copied, which is less than
        // buffer.Length when the written data ends first.
        private int Peek(int startPosition, [In, Out] byte[] buffer, object syncObject)
        {
            if (startPosition < 0)
                throw new ArgumentOutOfRangeException(nameof(startPosition));

            lock (syncObject)
            {
                int copied = 0;
                long blockStart = 0; // stream position of the current block's first byte

                foreach (var block in _streams)
                {
                    int written = block.WritePosition;

                    // First offset inside this block that lies at or after startPosition.
                    long first = Math.Max(startPosition - blockStart, 0L);
                    for (long i = first; i < written && copied < buffer.Length; i++)
                    {
                        buffer[copied++] = block.GetByte((int)i);
                    }

                    // Stop when the buffer is full, or when this block is only
                    // partly written (blocks fill in order, so data ends here).
                    if (copied == buffer.Length || written < block.Length)
                        break;

                    blockStart += block.Length;
                }

                return copied;
            }
        }

        /// <summary>
        /// Searches the written data for <paramref name="buffer"/>, beginning at
        /// <paramref name="startPosition"/>.
        /// </summary>
        /// <param name="buffer">Byte sequence to find.</param>
        /// <param name="startPosition">Position, relative to the first block still held, at which to begin.</param>
        /// <returns>The position of the first match, or -1 when there is none.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="startPosition"/> is negative.</exception>
        public int IndexOf(byte[] buffer, long startPosition)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (startPosition < 0)
                throw new ArgumentOutOfRangeException(nameof(startPosition));

            lock (_syncRoot)
            {
                byte[] peekBytes = new byte[buffer.Length];

                // A match cannot start later than this without running past the
                // written data.
                long lastCandidate = WrittenLength - buffer.Length;

                for (long i = startPosition; i <= lastCandidate; i++)
                {
                    // _syncRoot is already held here; reusing it (rather than
                    // locking the stream instance) keeps a single lock order.
                    int count = Peek((int)i, peekBytes, _syncRoot);
                    if (count != buffer.Length)
                        return -1;

                    if (buffer.SequenceEqual(peekBytes))
                        return (int)i;
                }

                return -1;
            }
        }

        /// <summary>
        /// Encodes <paramref name="value"/> followed by "\r\n" with
        /// <paramref name="encoding"/> and appends the bytes to the stream.
        /// </summary>
        /// <param name="value">Text to write; null is written as an empty line.</param>
        /// <param name="encoding">Encoding used to produce the bytes.</param>
        /// <returns>This stream, to allow call chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="encoding"/> is null.</exception>
        public NetStream WriteString(string value, Encoding encoding)
        {
            if (encoding == null)
                throw new ArgumentNullException(nameof(encoding));

            byte[] bytes = encoding.GetBytes(value + "\r\n");
            return Write(bytes, 0, bytes.Length, _syncRoot);
        }

        /// <summary>
        /// Reads the next "\r\n"-terminated line from the stream.
        /// </summary>
        /// <param name="encoding">Encoding used both to locate the terminator and to decode the bytes.</param>
        /// <returns>
        /// The decoded line <em>including</em> its trailing "\r\n", or
        /// <see cref="string.Empty"/> when no complete line is available yet (in
        /// which case nothing is consumed).
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="encoding"/> is null.</exception>
        public string ReadString(Encoding encoding)
        {
            if (encoding == null)
                throw new ArgumentNullException(nameof(encoding));

            lock (_syncRoot)
            {
                // Only the first block can be partly read, so the sum is the
                // current read position relative to that block.
                long startPosition = _streams.Sum(block => (long)block.ReadPosition);

                byte[] findBytes = encoding.GetBytes("\r\n");
                int idx = IndexOf(findBytes, startPosition);
                if (idx == -1)
                    return string.Empty;

                byte[] buffer = new byte[idx - startPosition + findBytes.Length];
                Read(buffer, 0, buffer.Length, _syncRoot);

                return encoding.GetString(buffer);
            }
        }
    }
}