1. 程式人生 > >.Net Core下使用RabbitMQ比較完備的兩種方案(雖然程式碼有點慘淡,不過我會完善)

.Net Core下使用RabbitMQ比較完備的兩種方案(雖然程式碼有點慘淡,不過我會完善)

一、前言

    上篇說給大家來寫C#和Java的方案,最近工作也比較忙,遲到了一些,我先給大家補上C#的方案。

二、使用的外掛

    HangFire

    一個開源的.NET任務排程框架,最大特點在於內建提供整合化的控制檯,方便後臺檢視及監控,支援多種儲存方式;在方案中主要使用定時任務做補償機制,後期可能會封裝一些,能通過頁面的形式直接新增任務;

   NLog

   日誌記錄框架,方案中使用記錄日誌,後期可能回整合多個日誌框架;

   Autofac

   依賴注入的框架,應該不用做過多介紹;

  SqlSugar

  ORM框架,這個從剛開始我就在使用了,在現在公司沒有推行起來,不過在上兩家公司都留下的遺產,據說還用的可以,當然我還是最佩服作者;

  Polly

  容錯服務框架,類似於Java下的Hystrix,主要是為了解決分散式系統中,系統之間相互依賴,可能會因為多種因素導致服務不可用的而產生的一套框架,支援服務的超時重試、限流、熔斷器等等;

  RabbitMQ.Client

  官方提供的C#連線RabbitMQ的SDK;

三、方案

  模擬一個簡單訂單下單的場景,沒有進行具體的實現。同時建議下游服務不要寫在web端,最好以服務的形式奔跑,程式碼中是Web端實現的,大家不要這麼搞。整體上還是實現了之前提到的兩種方案:一是入庫打標,二是延時佇列(這塊沒有進行很好的測試,但是估計也沒有很大的問題);當然也是有一些特點:RabbitMQ宕機情況下無需重啟服務,網路異常的情況下也可以進行斷線重連。接下來聊下程式碼和各方外掛在系統中的具體應用:

  專案結構:

  

  RabbitMQExtensions:

  採用Autofac按照單例的形式注入,採用Polly進行斷線重連,也開啟了自身斷線重連和心跳檢測機制,配置方面採用最簡單的URI規範進行配置,有興趣參考下官方,整體上這塊程式碼還相對比較規範,以後可能也不會有太多調整;

  1     /// <summary>
  2     /// rabbitmq持久化連線
  3     /// </summary>
  4     public interface IRabbitMQPersistentConnection
  5     {
  6         bool IsConnected { get; }
  7 
  8         bool TryConnect();
  9 
 10         IModel CreateModel();
 11     }
 12     /// <summary>
 13     /// rabbitmq持久化連線實現
 14     /// </summary>
 15     public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection
 16     {
 17         private readonly IConnectionFactory connectionFactory;
 18         private readonly ILogger<DefaultRabbitMQPersistentConnection> logger;
 19 
 20         private IConnection connection;
 21 
 22         private const int RETTRYCOUNT = 6;
 23 
 24         private static readonly object lockObj = new object();
 25         public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger)
 26         {
 27             this.connectionFactory = connectionFactory;
 28             this.logger = logger;
 29         }
 30 
 31         public bool IsConnected
 32         {
 33             get
 34             {
 35                 return connection != null && connection.IsOpen;
 36             }
 37         }
 38 
 39         public void Cleanup()
 40         {
 41             try
 42             {
 43                 connection.Dispose();
 44                 connection.Close();
 45                 connection = null;
 46 
 47             }
 48             catch (IOException ex)
 49             {
 50                 logger.LogCritical(ex.ToString());
 51             }
 52         }
 53 
 54         public IModel CreateModel()
 55         {
 56             if (!IsConnected)
 57             {
 58                 connection.Close();
 59                 throw new InvalidOperationException("連線不到rabbitmq");
 60             }
 61             return connection.CreateModel();
 62         }
 63 
 64         public bool TryConnect()
 65         {
 66             logger.LogInformation("RabbitMQ客戶端嘗試連線");
 67 
 68             lock (lockObj)
 69             {
 70                 if (connection == null)
 71                 {
 72                     var policy = RetryPolicy.Handle<SocketException>()
 73                         .Or<BrokerUnreachableException>()
 74                         .WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>    
 75                         {
 76                             logger.LogWarning(ex.ToString());
 77                         });
 78 
 79                     policy.Execute(() =>
 80                     {
 81                         connection = connectionFactory.CreateConnection();
 82                     });
 83                 }
 84 
 85 
 86 
 87                 if (IsConnected)
 88                 {
 89                     connection.ConnectionShutdown += OnConnectionShutdown;
 90                     connection.CallbackException += OnCallbackException;
 91                     connection.ConnectionBlocked += OnConnectionBlocked;
 92 
 93                     logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}獲取了連線");
 94 
 95                     return true;
 96                 }
 97                 else
 98                 {
 99                     logger.LogCritical("無法建立和開啟RabbitMQ連線");
100 
101                     return false;
102                 }
103             }
104         }
105 
106 
107         private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
108         {
109 
110             logger.LogWarning("RabbitMQ連線異常,嘗試重連...");
111 
112             Cleanup();
113             TryConnect();
114         }
115 
116         private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
117         {
118 
119             logger.LogWarning("RabbitMQ連線異常,嘗試重連...");
120 
121             Cleanup();
122             TryConnect();
123         }
124 
125         private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
126         {
127 
128             logger.LogWarning("RabbitMQ連線異常,嘗試重連...");
129 
130             Cleanup();
131             TryConnect();
132         }
133     }
View Code

  OrderDal

  SqlSugar的一些簡單封裝,有些小特點:大家可以可以通過配置來實現讀寫分離,採用倉儲設計。如果不太喜歡這麼寫,也可以參考傑哥的做法

    public interface IBaseDal<T> where T:class,new()
    {
        DbSqlSugarClient DbContext { get; }

        IBaseDal<T> UserDb(string dbName);
        IInsertable<T> AsInsertable(T t);
        IInsertable<T> AsInsertable(T[] t);
        IInsertable<T> AsInsertable(List<T> t);
        IUpdateable<T> AsUpdateable(T t);
        IUpdateable<T> AsUpdateable(T[] t);
        IUpdateable<T> AsUpdateable(List<T> t);
        IDeleteable<T> AsDeleteable();

        List<T> GetList();
        Task<List<T>> GetListAnsync();

        List<T> GetList(Expression<Func<T,bool>> whereExpression);
        Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression);

        List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc);
        Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc);

        List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page);
        Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page);

        List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc);
        Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc);

        int Count(Expression<Func<T, bool>> whereExpression);
        Task<int> CountAsync(Expression<Func<T, bool>> whereExpression);
        T GetById(dynamic id);
        T GetSingle(Expression<Func<T, bool>> whereExpression);
        Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression);
        T GetFirst(Expression<Func<T, bool>> whereExpression);
        Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression);

        bool IsAny(Expression<Func<T, bool>> whereExpression);
        Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression);

        bool Insert(T t);
        Task<bool> InsertAsync(T t);
        bool InsertRange(List<T> t);
        Task<bool> InsertRangeAsync(List<T> t);
        bool InsertRange(T[] t);
        Task<bool> InsertRangeAsync(T[] t);
        int InsertReturnIdentity(T t);
        Task<long> InsertReturnIdentityAsync(T t);


        bool Delete(Expression<Func<T, bool>> whereExpression);
        Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression);
        bool Delete(T t);
        Task<bool> DeleteAsync(T t);
        bool DeleteById(dynamic id);
        Task<bool> DeleteByIdAsync(dynamic id);
        bool DeleteByIds(dynamic[] ids);
        Task<bool> DeleteByIdsAsync(dynamic[] ids);


        bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression);
        Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression);
        bool Update(T t);
        Task<bool> UpdateAsync(T t);
        bool UpdateRange(T[] t);
        Task<bool> UpdateRangeAsync(T[] t);


        void BeginTran();
        void CommitTran();
        void RollbackTran();
    }

   public class BaseDal<T> : IBaseDal<T> where T : class, new()
    {
        private readonly IEnumerable<DbSqlSugarClient> clients;
        public BaseDal(IEnumerable<DbSqlSugarClient> clients)
        {
            this.clients = clients;
            DbContext = this.clients.FirstOrDefault(x => x.Default);
        }
        public DbSqlSugarClient DbContext { get; set; }

        public IDeleteable<T> AsDeleteable()
        {
            return DbContext.Deleteable<T>();
        }

        public IInsertable<T> AsInsertable(T t)
        {
            return DbContext.Insertable<T>(t);
        }

        public IInsertable<T> AsInsertable(T[] t)
        {
            return DbContext.Insertable<T>(t);
        }

        public IInsertable<T> AsInsertable(List<T> t)
        {
            return DbContext.Insertable<T>(t);
        }

        public IUpdateable<T> AsUpdateable(T t)
        {
            return DbContext.Updateable<T>(t);
        }

        public IUpdateable<T> AsUpdateable(T[] t)
        {
            return DbContext.Updateable<T>(t);
        }

        public IUpdateable<T> AsUpdateable(List<T> t)
        {
            return DbContext.Updateable(t);
        }

        public void BeginTran()
        {
            DbContext.Ado.BeginTran();
        }

        public void CommitTran()
        {
            DbContext.Ado.CommitTran();
        }

        public int Count(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().Count(whereExpression);
        }

        public Task<int> CountAsync(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().CountAsync(whereExpression);
        }

        public bool Delete(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Deleteable<T>().Where(whereExpression).ExecuteCommand() > 0;
        }

        public bool Delete(T t)
        {
            return DbContext.Deleteable<T>().ExecuteCommand() > 0;
        }

        public async Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Deleteable<T>().Where(whereExpression).ExecuteCommandAsync() > 0;
        }

        public async Task<bool> DeleteAsync(T t)
        {
            return await DbContext.Deleteable(t).ExecuteCommandAsync() > 0;
        }

        public bool DeleteById(dynamic id)
        {
            return DbContext.Deleteable<T>().In(id).ExecuteCommand() > 0;
        }

        public async Task<bool> DeleteByIdAsync(dynamic id)
        {
            return await DbContext.Deleteable<T>().In(id).ExecuteCommandAsync() > 0;
        }

        public bool DeleteByIds(dynamic[] ids)
        {
            return DbContext.Deleteable<T>().In(ids).ExecuteCommand() > 0;
        }

        public async Task<bool> DeleteByIdsAsync(dynamic[] ids)
        {
            return await DbContext.Deleteable<T>().In(ids).ExecuteCommandAsync() > 0;
        }

        public T GetById(dynamic id)
        {
            return DbContext.Queryable<T>().InSingle(id);
        }

        public T GetFirst(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().First(whereExpression);
        }

        public async Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Queryable<T>().FirstAsync(whereExpression);
        }

        public List<T> GetList()
        {
            return DbContext.Queryable<T>().ToList();
        }

        public List<T> GetList(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().Where(whereExpression).ToList();
        }

        public List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc)
        {
            return DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderExpression != null, orderExpression, orderByType).Where(whereExpression).ToList();
        }

        public async Task<List<T>> GetListAnsync()
        {
            return await DbContext.Queryable<T>().ToListAsync();
        }

        public async Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Queryable<T>().Where(whereExpression).ToListAsync();
        }

        public async Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc)
        {
            return await DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderExpression != null, orderExpression, orderByType).Where(whereExpression).ToListAsync();
        }

        public List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page)
        {
            return DbContext.Queryable<T>().Where(whereExpression).ToPageList(page.PageIndex,page.PageSize);
        }

        public List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc)
        {
           return DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderByExpression != null, orderByExpression, orderByType).Where(whereExpression).ToPageList(page.PageIndex, page.PageSize);
        }

        public async Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page)
        {
            return await DbContext.Queryable<T>().Where(whereExpression).ToPageListAsync(page.PageIndex, page.PageSize);
        }

        public async Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc)
        {
            return await DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderByExpression != null, orderByExpression, orderByType).Where(whereExpression).ToPageListAsync(page.PageIndex, page.PageSize);
        }

        public T GetSingle(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().Single(whereExpression);
        }

        public async Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Queryable<T>().SingleAsync(whereExpression);
        }

        public bool Insert(T t)
        {
            return DbContext.Insertable(t).ExecuteCommand() > 0;
        }

        public async Task<bool> InsertAsync(T t)
        {
            return await DbContext.Insertable(t).ExecuteCommandAsync() > 0;
        }

        public bool InsertRange(List<T> t)
        {
            return DbContext.Insertable(t).ExecuteCommand() > 0;
        }

        public bool InsertRange(T[] t)
        {
            return DbContext.Insertable(t).ExecuteCommand() > 0;
        }

        public async Task<bool> InsertRangeAsync(List<T> t)
        {
            return await DbContext.Insertable(t).ExecuteCommandAsync() > 0;
        }

        public async Task<bool> InsertRangeAsync(T[] t)
        {
            return await DbContext.Insertable(t).ExecuteCommandAsync() > 0;
        }

        public int InsertReturnIdentity(T t)
        {
            return DbContext.Insertable(t).ExecuteReturnIdentity();
        }

        public async Task<long> InsertReturnIdentityAsync(T t)
        {
            return await DbContext.Insertable(t).ExecuteReturnBigIdentityAsync();
        }

        public bool IsAny(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().Any(whereExpression);
        }

        public async Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Queryable<T>().AnyAsync(whereExpression);
        }

        public void RollbackTran()
        {
            DbContext.Ado.RollbackTran();
        }

        public bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Updateable<T>().UpdateColumns(columns).Where(whereExpression).ExecuteCommand() > 0;
        }

        public bool Update(T t)
        {
            return DbContext.Updateable(t).ExecuteCommand() > 0;
        }

        public async Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Updateable<T>().UpdateColumns(columns).Where(whereExpression).ExecuteCommandAsync() > 0;
        }

        public async Task<bool> UpdateAsync(T t)
        {
            return await DbContext.Updateable(t).ExecuteCommandAsync() > 0;
        }

        public bool UpdateRange(T[] t)
        {
            return DbContext.Updateable(t).ExecuteCommand() > 0;
        }

        public async Task<bool> UpdateRangeAsync(T[] t)
        {
            return await DbContext.Updateable(t).ExecuteCommandAsync() > 0;
        }

        public IBaseDal<T> UserDb(string dbName)
        {
            DbContext = this.clients.FirstOrDefault(it => it.DbName == dbName);
            return this;
        }
    }
View Code

  OrderCommon

  定義全域性異常的中介軟體,還有包含一些用到的實體等等,這部分程式碼還可優化拆分一下;

  OrderService

  生產者和消費者的具體實現,這塊我還想在改造一番,將消費和業務分割開,現在寫的很凌亂,不建議這麼寫,先把程式碼放出來,看看大家贊同不贊同我的這些用法,可以討論,也歡迎爭論,雖然這塊程式碼寫的不好,但是其實裡面涉及一些RabbitMQ回撥函式的用法,也是比較重要的,沒有這些函式也就實現不了我上面說那兩個特點;

//RabbitMQ宕機以後回撥
//客戶端這塊大家不要採用遞迴呼叫恢復連結
//具體為什麼大家可以測試下,這裡留點小疑問哈哈
connection.ConnectionShutdown += OnConnectionShutdown;

//消費端異常以後回撥
consumerchannel.CallbackException += OnOnConsumerMessageAndWriteMessageLogException;

  Order

  具體的呼叫者,大家應該根據方法名字就能區分出我上面提到的兩種方案的設計;

  HangfireExtensions

  Hangfire定時框架,採用Mysql作為持久層的儲存,寫的也比較清晰,後期就是針對這些進行擴充套件,實現在介面就能新增定時任務;

四、結束

  生產端和消費端這段程式碼寫的凌亂,希望大家不要介意這一點,是有原因的,這裡我就不說了。希望大家看到閃光點,不要在一點上糾結;下次會加入Elasticsearch和監控部分的時候我會把這塊程式碼改掉,還大家一片整潔的世界;

  Github地址:https://github.com/wangtongzhou520/rabbitmq.git  有什麼問題大家可以問我;

  歡迎大家加群438836709!歡迎大家關注我!

  

&n