1. 程式人生 > >分布式事務之消息補償解決方案

分布式事務之消息補償解決方案

訂閱 rgs font 示意圖 table client lba subscribe 這也

一、數據庫本地事務

先看看數據庫事務的定義:單個邏輯工作單元執行的一系列操作,要麽完全地執行,要麽完全地不執行

這個比較容易理解,操作過數據庫的一般都懂,既是業務需求涉及到多個數據表操作的時候,需要用到事務

要麽一起更新,要麽一起不更新,不會出現只更新了部分數據表的情況,下邊看看數據庫事務的使用

1 begin tran
2     begin try 
3         update Table1 set Field = 1 where ID = 1
4         update Table2 set Field = 2 where ID = 1
5     end
try 6 begin catch 7 rollback tran 8 end catch 9 commit tran

上實例在小型項目中一般是問題不大的,因為小型項目一般是單機系統,數據庫、Web服務大都在一臺服務器上,甚至可能只有一個數據庫文件,

這種情況下使用本地事務沒有一點問題;

但是本地事務有很大的缺陷,因為開啟事務一般是鎖表的,事務執行期間會一直鎖著,其他的操作一般都要排隊等待,對性能要求比較高的系統是不能忍受的。

特別是涉及改動不同數據庫的操作,這會造成跨庫事務,性能更加低

如果還涉及到不在同一臺服務器、甚至不同網段部署的數據庫,那本地事務簡直是系統運行的災難,是首先需要丟棄的解決方案。

技術分享圖片

那如果遇到上述情況,該怎麽做呢,這就涉及到分布式事務了

二、分段式事務的補償機制

如果有海量數據需要處理、或者要求高並發請求的話,同步的事務機制已經是不現實的了,這種情況下必須采用異步事務機制,既分段式的事務

分段式事務一般做法就是把需求任務分段式地完成,通過事務補償機制來保證業務最終執行成功,補償機制一般可以歸類為2種:

1 )定時任務補償:

  通過定時任務去跟進後續任務,根據不同的狀態表確定下一步的操作,從而保證業務最終執行成功,

  這種辦法可能會涉及到很多的後臺服務,維護起來也會比較麻煩,這是應該是早期比較流行的做法

2) 消息補償:

  通過消息中間件觸發下一段任務,既通過實時消息通知下一段任務開始執行,執行完畢後的消息回發通知來保證業務最終完成;

  當然這也是異步進行的,但是能保證數據最終的完整性、一致性,也是近幾年比較熱門的做法

定時任務補償就不說了,這篇文章我們來討論一下通過消息補償來完成分布式事務的一般做法

三、分布式事務之消息補償

0)我們以簡單的產品下單場景來說明,(不要較真哈)

1)先來看看分布式異步事務處理流程示意圖,APP1與APP2需要互相訂閱對方消息

技術分享圖片

2)首先看數據庫,2個,一個庫存庫,一個已下單成功的庫

技術分享圖片

 1 -- 下單通知,主要作用保留已下單操作,消息發送失敗可以根據此表重新發送
 2 CREATE TABLE [dbo].[ProductMessage](
 3     [ID] [int] IDENTITY(1,1) NOT NULL,
 4     [Product] [varchar](50) NULL,
 5     [Amount] [int] NULL,
 6     [UpdateTime] [datetime] NULL
 7 ) 
 8 -- 庫存
 9 CREATE TABLE [dbo].[ProductStock](
10     [ID] [int] IDENTITY(1,1) NOT NULL,
11     [Product] [varchar](50) NULL,
12     [Amount] [int] NULL
13 )
14 -- 下單成功
15 CREATE TABLE [dbo].[ProductSell](
16     [ID] [int] IDENTITY(1,1) NOT NULL,
17     [Product] [varchar](50) NULL,
18     [Customer] [int] NULL,
19     [Amount] [int] NULL
20 )
21 -- 下單成功消息,主要作用防止重復消費
22 CREATE TABLE [dbo].[ProductMessageApply](
23     [ID] [int] IDENTITY(1,1) NOT NULL,
24     [MesageID] [int] NULL,
25     [CreateTime] [datetime] NULL
26 )

3)項目架構Demo

技術分享圖片

數據底層訪問使用的是Dapper、使用redis作為消息中間件

4)實體層代碼

 1     public class ProductMessage
 2     {
 3         [Key]
 4         [IgnoreProperty(true)]
 5         public int ID { get; set; }
 6         public string Product { get; set; }
 7         public int Amount { get; set; }
 8         public DateTime UpdateTime { get; set; }
 9     }
10     public class ProductMessageApply
11     {
12         [Key]
13         [IgnoreProperty(true)]
14         public int ID { get; set; }
15         public int MesageID { get; set; }
16         public DateTime CreateTime { get; set; }
17     }
18     public class ProductSell
19     {
20         [Key]
21         [IgnoreProperty(true)]
22         public int ID { get; set; }
23         public string Product { get; set; }
24         public int Customer { get; set; }
25         public int Amount { get; set; }
26     }
27     public class ProductStock
28     {
29         [Key]
30         [IgnoreProperty(true)]
31         public int ID { get; set; }
32         public string Product { get; set; }
33         public int Amount { get; set; }
34     }

5)服務接口層代碼

 1     public interface IProductMessageApplyService
 2     {
 3         void Add(ProductMessageApply entity);
 4         ProductMessageApply Get(int id);
 5     }
 6     public interface IProductMessageService
 7     {
 8         void Add(ProductMessage entity);
 9         IEnumerable<ProductMessage> Gets(object paramPairs = null);
10         void Delete(int id);
11     }
12     public interface IProductSellService
13     {
14         void Add(ProductSell entity);
15     }
16     public interface IProductStockService
17     {
18         void ReduceReserve(int id, int amount);
19     }

6)庫存、消息通知

 1     public class ProductMessageService : IProductMessageService
 2     {
 3         private IRepository<ProductMessage> repository;
 4 
 5         public ProductMessageService(IRepository<ProductMessage> repository)
 6         {
 7             this.repository = repository;
 8         }
 9 
10         public void Add(ProductMessage entity)
11         {
12             this.repository.Add(entity);
13         }
14 
15         public IEnumerable<ProductMessage> Gets(object paramPairs = null)
16         {
17             return this.repository.Gets(paramPairs);
18         }
19 
20         public void Delete(int id)
21         {
22             this.repository.Delete(id);
23         }
24     }
25 
26     public class ProductStockService : IProductStockService
27     {
28         private IRepository<ProductStock> repository;
29 
30         public ProductStockService(IRepository<ProductStock> repository)
31         {
32             this.repository = repository;
33         }
34 
35         public void ReduceReserve(int id, int amount)
36         {
37             var entity = this.repository.Get(id);
38             if (entity == null) return;
39 
40             entity.Amount = entity.Amount - amount;
41             this.repository.Update(entity);
42         }
43     }

7)下單、下單成功消息

 1     public class ProductMessageApplyService : IProductMessageApplyService
 2     {
 3         private IRepository<ProductMessageApply> repository;
 4 
 5         public ProductMessageApplyService(IRepository<ProductMessageApply> repository)
 6         {
 7             this.repository = repository;
 8         }
 9 
10         public void Add(ProductMessageApply entity)
11         {
12             this.repository.Add(entity);
13         }
14 
15         public ProductMessageApply Get(int id)
16         {
17             return this.repository.Get(id);
18         }
19     }
20 
21     public class ProductSellService : IProductSellService
22     {
23         private IRepository<ProductSell> repository;
24 
25         public ProductSellService(IRepository<ProductSell> repository)
26         {
27             this.repository = repository;
28         }
29 
30         public void Add(ProductSell entity)
31         {
32             this.repository.Add(entity);
33         }
34     }

8)下單減庫存測試

 1 namespace Demo.Reserve.App
 2 {
 3     class Program
 4     {
 5         static void Main(string[] args)
 6         {
 7             Console.WriteLine(string.Format("{0} 程序已啟動", DateTime.Now.ToString()));
 8 
 9             Send();
10             Subscribe();
11            
12             Console.ReadKey();
13         }
14 
15         private static void Send()
16         {
17             var unitOfWork = new UnitOfWork(Enums.Reserve);
18 
19             try
20             {
21                 var productStockRepository = new BaseRepository<ProductStock>(unitOfWork);
22                 var productStockServic = new ProductStockService(productStockRepository);
23                 var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork);
24                 var productMessageService = new ProductMessageService(productMessageRepository);
25 
26                 var id = 1;
27                 var amount = 2;
28                 var productMessage = new ProductMessage()
29                 {
30                     Product = "ProductCode",
31                     Amount = amount,
32                     UpdateTime = DateTime.Now
33                 };
34 
35                 productStockServic.ReduceReserve(id, amount);
36                 productMessageService.Add(productMessage);
37                 unitOfWork.Commit();
38                 Console.WriteLine(string.Format("{0} 減庫存完成", DateTime.Now.ToString()));
39                 Thread.Sleep(1000);
40 
41                 var message = JsonConvert.SerializeObject(productMessage);
42                 RedisConfig.Instrace.Publish("channel.Send", message);
43                 Console.WriteLine(string.Format("{0} 發送減庫存消息: {1}", DateTime.Now.ToString(), message));
44             }
45             catch (Exception ex)
46             {
47                 //Logger.Error(ex);
48                 unitOfWork.Rollback();
49             }
50         }
51 
52         private static void Subscribe()
53         {
54             var client = RedisConfig.Instrace.NewClient();
55             var subscriber = client.GetSubscriber();
56 
57             subscriber.Subscribe("channel.Success", (chl, message) =>
58             {
59                 try
60                 {
61                     var unitOfWork = new UnitOfWork(Enums.Reserve);
62                     var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork);
63                     var productMessageService = new ProductMessageService(productMessageRepository);
64 
65                     var messageID = message.ToString().ToInt();
66                     if (messageID > 0)
67                     {
68                         productMessageService.Delete(messageID);
69                         Console.WriteLine(string.Format("{0} 收到消費成功消息:{1}", DateTime.Now.ToString(), message));
70                     }
71                 }
72                 catch (Exception ex)
73                 {
74                     //Logger.Error(ex);
75                 }
76             });
77         }
78     }
79 }

9)下單成功及消息回發測試

 1 namespace Demo.Sell.App
 2 {
 3     class Program
 4     {
 5         static void Main(string[] args)
 6         {
 7             Subscribe();
 8 
 9             Console.WriteLine(string.Format("{0} 程序已啟動", DateTime.Now.ToString()));
10             Console.ReadKey();
11         }
12 
13         private static void Subscribe()
14         {
15             var client = RedisConfig.Instrace.NewClient();
16             var subscriber = client.GetSubscriber();
17 
18             subscriber.Subscribe("channel.Send", (chl, message) =>
19             {
20                 Consume(message);
21             });
22         }
23 
24         private static void Consume(string message)
25         {
26             var unitOfWork = new UnitOfWork(Enums.Sell);
27 
28             try
29             {
30                 Console.WriteLine(string.Format("{0} 收到減庫存消息: {1}", DateTime.Now.ToString(), message));
31 
32                 var productMessage = JsonConvert.DeserializeObject<ProductMessage>(message);
33 
34                 var productSellRepository = new BaseRepository<ProductSell>(unitOfWork);
35                 var productSellService = new ProductSellService(productSellRepository);
36 
37                 var productMessageApplyRepository = new BaseRepository<ProductMessageApply>(unitOfWork);
38                 var productMessageApplyService = new ProductMessageApplyService(productMessageApplyRepository);
39 
40                 var noExists = productMessageApplyService.Get(productMessage.ID) == null;
41                 if (noExists)
42                 {
43                     productSellService.Add(new ProductSell()
44                     {
45                         Product = productMessage.Product,
46                         Amount = productMessage.Amount,
47                         Customer = 123
48                     });
49 
50                     productMessageApplyService.Add(new ProductMessageApply()
51                     {
52                         MesageID = productMessage.ID,
53                         CreateTime = DateTime.Now
54                     });
55 
56                     unitOfWork.Commit();
57                     Console.WriteLine(string.Format("{0} 消息消費完成", DateTime.Now.ToString()));
58                     Thread.Sleep(1000);
59                 }
60 
61                 RedisConfig.Instrace.Publish("channel.Success", productMessage.ID.ToString());
62                 Console.WriteLine(string.Format("{0} 發送消費完成通知:{1}", DateTime.Now.ToString(), productMessage.ID.ToString()));
63             }
64             catch (Exception ex)
65             {
66                 //Logger.Error(ex);
67                 unitOfWork.Rollback();
68             }
69         }
70     }
71 }

10)好了,到了最後檢驗成果的時候了

先打開Demo.Sell.App.exe、然後打開Demo.Reserve.App.exe

技術分享圖片

大功告成!

分布式事務之消息補償解決方案