.Net Core下使用RabbitMQ比較完備的兩種方案(雖然程式碼有點慘淡,不過我會完善)
一、前言
上篇說給大家來寫C#和Java的方案,最近工作也比較忙,遲到了一些,我先給大家補上C#的方案。
二、使用的外掛
HangFire
一個開源的.NET任務排程框架,最大特點在於內建提供整合化的控制檯,方便後臺檢視及監控,支援多種儲存方式;在方案中主要使用定時任務做補償機制,後期可能會封裝一些,能通過頁面的形式直接新增任務;
NLog
日誌記錄框架,方案中使用記錄日誌,後期可能回整合多個日誌框架;
Autofac
依賴注入的框架,應該不用做過多介紹;
SqlSugar
ORM框架,這個從剛開始我就在使用了,在現在公司沒有推行起來,不過在上兩家公司都留下的遺產,據說還用的可以,當然我還是最佩服作者;
Polly
容錯服務框架,類似於Java下的Hystrix,主要是為了解決分散式系統中,系統之間相互依賴,可能會因為多種因素導致服務不可用的而產生的一套框架,支援服務的超時重試、限流、熔斷器等等;
RabbitMQ.Client
官方提供的C#連線RabbitMQ的SDK;
三、方案
模擬一個簡單訂單下單的場景,沒有進行具體的實現。同時建議下游服務不要寫在web端,最好以服務的形式奔跑,程式碼中是Web端實現的,大家不要這麼搞。整體上還是實現了之前提到的兩種方案:一是入庫打標,二是延時佇列(這塊沒有進行很好的測試,但是估計也沒有很大的問題);當然也是有一些特點:RabbitMQ宕機情況下無需重啟服務,網路異常的情況下也可以進行斷線重連。接下來聊下程式碼和各方外掛在系統中的具體應用:
專案結構:
RabbitMQExtensions:
採用Autofac按照單例的形式注入,採用Polly進行斷線重連,也開啟了自身斷線重連和心跳檢測機制,配置方面採用最簡單的URI規範進行配置,有興趣參考下官方,整體上這塊程式碼還相對比較規範,以後可能也不會有太多調整;
1 /// <summary> 2 /// rabbitmq持久化連線 3 /// </summary> 4 public interface IRabbitMQPersistentConnection 5 { 6 bool IsConnected { get; } 7 8 bool TryConnect(); 9 10 IModel CreateModel(); 11 } 12 /// <summary> 13 /// rabbitmq持久化連線實現 14 /// </summary> 15 public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection 16 { 17 private readonly IConnectionFactory connectionFactory; 18 private readonly ILogger<DefaultRabbitMQPersistentConnection> logger; 19 20 private IConnection connection; 21 22 private const int RETTRYCOUNT = 6; 23 24 private static readonly object lockObj = new object(); 25 public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger) 26 { 27 this.connectionFactory = connectionFactory; 28 this.logger = logger; 29 } 30 31 public bool IsConnected 32 { 33 get 34 { 35 return connection != null && connection.IsOpen; 36 } 37 } 38 39 public void Cleanup() 40 { 41 try 42 { 43 connection.Dispose(); 44 connection.Close(); 45 connection = null; 46 47 } 48 catch (IOException ex) 49 { 50 logger.LogCritical(ex.ToString()); 51 } 52 } 53 54 public IModel CreateModel() 55 { 56 if (!IsConnected) 57 { 58 connection.Close(); 59 throw new InvalidOperationException("連線不到rabbitmq"); 60 } 61 return connection.CreateModel(); 62 } 63 64 public bool TryConnect() 65 { 66 logger.LogInformation("RabbitMQ客戶端嘗試連線"); 67 68 lock (lockObj) 69 { 70 if (connection == null) 71 { 72 var policy = RetryPolicy.Handle<SocketException>() 73 .Or<BrokerUnreachableException>() 74 .WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => 75 { 76 logger.LogWarning(ex.ToString()); 77 }); 78 79 policy.Execute(() => 80 { 81 connection = connectionFactory.CreateConnection(); 82 }); 83 } 84 85 86 87 if (IsConnected) 88 { 89 connection.ConnectionShutdown += OnConnectionShutdown; 90 connection.CallbackException += OnCallbackException; 91 connection.ConnectionBlocked += OnConnectionBlocked; 92 93 logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}獲取了連線"); 94 95 return true; 96 } 97 else 98 { 99 logger.LogCritical("無法建立和開啟RabbitMQ連線"); 100 101 return false; 102 } 103 } 104 } 105 106 107 private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) 108 { 109 110 logger.LogWarning("RabbitMQ連線異常,嘗試重連..."); 111 112 Cleanup(); 113 TryConnect(); 114 } 115 116 private void OnCallbackException(object sender, CallbackExceptionEventArgs e) 117 { 118 119 logger.LogWarning("RabbitMQ連線異常,嘗試重連..."); 120 121 Cleanup(); 122 TryConnect(); 123 } 124 125 private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) 126 { 127 128 logger.LogWarning("RabbitMQ連線異常,嘗試重連..."); 129 130 Cleanup(); 131 TryConnect(); 132 } 133 }
OrderDal
SqlSugar的一些簡單封裝,有些小特點:大家可以可以通過配置來實現讀寫分離,採用倉儲設計。如果不太喜歡這麼寫,也可以參考傑哥的做法
public interface IBaseDal<T> where T:class,new() { DbSqlSugarClient DbContext { get; } IBaseDal<T> UserDb(string dbName); IInsertable<T> AsInsertable(T t); IInsertable<T> AsInsertable(T[] t); IInsertable<T> AsInsertable(List<T> t); IUpdateable<T> AsUpdateable(T t); IUpdateable<T> AsUpdateable(T[] t); IUpdateable<T> AsUpdateable(List<T> t); IDeleteable<T> AsDeleteable(); List<T> GetList(); Task<List<T>> GetListAnsync(); List<T> GetList(Expression<Func<T,bool>> whereExpression); Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression); List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc); Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc); List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page); Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page); List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc); Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc); int Count(Expression<Func<T, bool>> whereExpression); Task<int> CountAsync(Expression<Func<T, bool>> whereExpression); T GetById(dynamic id); T GetSingle(Expression<Func<T, bool>> whereExpression); Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression); T GetFirst(Expression<Func<T, bool>> whereExpression); Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression); bool IsAny(Expression<Func<T, bool>> whereExpression); Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression); bool Insert(T t); Task<bool> InsertAsync(T t); bool InsertRange(List<T> t); Task<bool> InsertRangeAsync(List<T> t); bool InsertRange(T[] t); Task<bool> InsertRangeAsync(T[] t); int InsertReturnIdentity(T t); Task<long> InsertReturnIdentityAsync(T t); bool Delete(Expression<Func<T, bool>> whereExpression); Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression); bool Delete(T t); Task<bool> DeleteAsync(T t); bool DeleteById(dynamic id); Task<bool> DeleteByIdAsync(dynamic id); bool DeleteByIds(dynamic[] ids); Task<bool> DeleteByIdsAsync(dynamic[] ids); bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression); Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression); bool Update(T t); Task<bool> UpdateAsync(T t); bool UpdateRange(T[] t); Task<bool> UpdateRangeAsync(T[] t); void BeginTran(); void CommitTran(); void RollbackTran(); } public class BaseDal<T> : IBaseDal<T> where T : class, new() { private readonly IEnumerable<DbSqlSugarClient> clients; public BaseDal(IEnumerable<DbSqlSugarClient> clients) { this.clients = clients; DbContext = this.clients.FirstOrDefault(x => x.Default); } public DbSqlSugarClient DbContext { get; set; } public IDeleteable<T> AsDeleteable() { return DbContext.Deleteable<T>(); } public IInsertable<T> AsInsertable(T t) { return DbContext.Insertable<T>(t); } public IInsertable<T> AsInsertable(T[] t) { return DbContext.Insertable<T>(t); } public IInsertable<T> AsInsertable(List<T> t) { return DbContext.Insertable<T>(t); } public IUpdateable<T> AsUpdateable(T t) { return DbContext.Updateable<T>(t); } public IUpdateable<T> AsUpdateable(T[] t) { return DbContext.Updateable<T>(t); } public IUpdateable<T> AsUpdateable(List<T> t) { return DbContext.Updateable(t); } public void BeginTran() { DbContext.Ado.BeginTran(); } public void CommitTran() { DbContext.Ado.CommitTran(); } public int Count(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().Count(whereExpression); } public Task<int> CountAsync(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().CountAsync(whereExpression); } public bool Delete(Expression<Func<T, bool>> whereExpression) { return DbContext.Deleteable<T>().Where(whereExpression).ExecuteCommand() > 0; } public bool Delete(T t) { return DbContext.Deleteable<T>().ExecuteCommand() > 0; } public async Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Deleteable<T>().Where(whereExpression).ExecuteCommandAsync() > 0; } public async Task<bool> DeleteAsync(T t) { return await DbContext.Deleteable(t).ExecuteCommandAsync() > 0; } public bool DeleteById(dynamic id) { return DbContext.Deleteable<T>().In(id).ExecuteCommand() > 0; } public async Task<bool> DeleteByIdAsync(dynamic id) { return await DbContext.Deleteable<T>().In(id).ExecuteCommandAsync() > 0; } public bool DeleteByIds(dynamic[] ids) { return DbContext.Deleteable<T>().In(ids).ExecuteCommand() > 0; } public async Task<bool> DeleteByIdsAsync(dynamic[] ids) { return await DbContext.Deleteable<T>().In(ids).ExecuteCommandAsync() > 0; } public T GetById(dynamic id) { return DbContext.Queryable<T>().InSingle(id); } public T GetFirst(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().First(whereExpression); } public async Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Queryable<T>().FirstAsync(whereExpression); } public List<T> GetList() { return DbContext.Queryable<T>().ToList(); } public List<T> GetList(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().Where(whereExpression).ToList(); } public List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc) { return DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderExpression != null, orderExpression, orderByType).Where(whereExpression).ToList(); } public async Task<List<T>> GetListAnsync() { return await DbContext.Queryable<T>().ToListAsync(); } public async Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Queryable<T>().Where(whereExpression).ToListAsync(); } public async Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc) { return await DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderExpression != null, orderExpression, orderByType).Where(whereExpression).ToListAsync(); } public List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page) { return DbContext.Queryable<T>().Where(whereExpression).ToPageList(page.PageIndex,page.PageSize); } public List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc) { return DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderByExpression != null, orderByExpression, orderByType).Where(whereExpression).ToPageList(page.PageIndex, page.PageSize); } public async Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page) { return await DbContext.Queryable<T>().Where(whereExpression).ToPageListAsync(page.PageIndex, page.PageSize); } public async Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc) { return await DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderByExpression != null, orderByExpression, orderByType).Where(whereExpression).ToPageListAsync(page.PageIndex, page.PageSize); } public T GetSingle(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().Single(whereExpression); } public async Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Queryable<T>().SingleAsync(whereExpression); } public bool Insert(T t) { return DbContext.Insertable(t).ExecuteCommand() > 0; } public async Task<bool> InsertAsync(T t) { return await DbContext.Insertable(t).ExecuteCommandAsync() > 0; } public bool InsertRange(List<T> t) { return DbContext.Insertable(t).ExecuteCommand() > 0; } public bool InsertRange(T[] t) { return DbContext.Insertable(t).ExecuteCommand() > 0; } public async Task<bool> InsertRangeAsync(List<T> t) { return await DbContext.Insertable(t).ExecuteCommandAsync() > 0; } public async Task<bool> InsertRangeAsync(T[] t) { return await DbContext.Insertable(t).ExecuteCommandAsync() > 0; } public int InsertReturnIdentity(T t) { return DbContext.Insertable(t).ExecuteReturnIdentity(); } public async Task<long> InsertReturnIdentityAsync(T t) { return await DbContext.Insertable(t).ExecuteReturnBigIdentityAsync(); } public bool IsAny(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().Any(whereExpression); } public async Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Queryable<T>().AnyAsync(whereExpression); } public void RollbackTran() { DbContext.Ado.RollbackTran(); } public bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression) { return DbContext.Updateable<T>().UpdateColumns(columns).Where(whereExpression).ExecuteCommand() > 0; } public bool Update(T t) { return DbContext.Updateable(t).ExecuteCommand() > 0; } public async Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression) { return await DbContext.Updateable<T>().UpdateColumns(columns).Where(whereExpression).ExecuteCommandAsync() > 0; } public async Task<bool> UpdateAsync(T t) { return await DbContext.Updateable(t).ExecuteCommandAsync() > 0; } public bool UpdateRange(T[] t) { return DbContext.Updateable(t).ExecuteCommand() > 0; } public async Task<bool> UpdateRangeAsync(T[] t) { return await DbContext.Updateable(t).ExecuteCommandAsync() > 0; } public IBaseDal<T> UserDb(string dbName) { DbContext = this.clients.FirstOrDefault(it => it.DbName == dbName); return this; } }View Code
OrderCommon
定義全域性異常的中介軟體,還有包含一些用到的實體等等,這部分程式碼還可優化拆分一下;
OrderService
生產者和消費者的具體實現,這塊我還想在改造一番,將消費和業務分割開,現在寫的很凌亂,不建議這麼寫,先把程式碼放出來,看看大家贊同不贊同我的這些用法,可以討論,也歡迎爭論,雖然這塊程式碼寫的不好,但是其實裡面涉及一些RabbitMQ回撥函式的用法,也是比較重要的,沒有這些函式也就實現不了我上面說那兩個特點;
//RabbitMQ宕機以後回撥 //客戶端這塊大家不要採用遞迴呼叫恢復連結 //具體為什麼大家可以測試下,這裡留點小疑問哈哈 connection.ConnectionShutdown += OnConnectionShutdown; //消費端異常以後回撥 consumerchannel.CallbackException += OnOnConsumerMessageAndWriteMessageLogException;
Order
具體的呼叫者,大家應該根據方法名字就能區分出我上面提到的兩種方案的設計;
HangfireExtensions
Hangfire定時框架,採用Mysql作為持久層的儲存,寫的也比較清晰,後期就是針對這些進行擴充套件,實現在介面就能新增定時任務;
四、結束
生產端和消費端這段程式碼寫的凌亂,希望大家不要介意這一點,是有原因的,這裡我就不說了。希望大家看到閃光點,不要在一點上糾結;下次會加入Elasticsearch和監控部分的時候我會把這塊程式碼改掉,還大家一片整潔的世界;
Github地址:https://github.com/wangtongzhou520/rabbitmq.git 有什麼問題大家可以問我;
歡迎大家加群438836709!歡迎大家關注我!
&n