1. 程式人生 > >多執行緒 取資料必須不重複的方案

多執行緒 取資料必須不重複的方案

最近一直在處理資料上傳和採集的問題, 因為寫在asp.net 裡面的web服務預設就是多執行緒的, 一個請求就是一個執行緒… 所以多執行緒之間為了不讀取重複的資料, 就成了問題.

資料必須嚴格不重複, 同樣的資料絕對不能處理2次…
多執行緒就更不能出現重複讀取的現象了.

自己現在也用的是另外一套非常蹩腳的方法, . 把資料取出來然後在記憶體裡面通過lock(object)的形式實現資料不重複處理的辦法. 但是這又牽扯到資料的取出和更新, 也比較麻煩. 雖然實現了, 但是後續的修改和變更邏輯極其複雜.

後來再百度上又看了一遍找到了我認為最完美的方法,其它的我感覺都不怎麼優美.
文章是下面這個.

找到了一種專門針對sqlserver的. 可以通過先更新同時通過deleted表(就像是在觸發器中使用一樣)取出的方式,來保證每條記錄只會被讀取一次。


declare @Rowid table(rowid int);
BEGIN
 set rowcount 100; --一次讀取的行數
 --先將要讀取的記錄狀態更新
 update Sms set [status]= 1  output deleted.ID into @Rowid Where [status] = 0;
 --讀取剛更新狀態的記錄
 select  * from Sms where ID in (select Rowid from
@Rowid); END

但是這種只是針對sqlserver的, 所以在這個的基礎上, 我設計改進了另外一種通用的方法.
同樣是發簡訊為例,
邏輯過程如下

1. 開啟事務, 保證update語句互不影響,
2. update top 100 Sms set status=@ThreadId where status = 0 ;
3. select  * from Sms where status = @ThreadId;
4. 提交事務

如果不想影響status的狀態, 可以改成

1. 開啟事務, 保證update語句互不影響,
2. update top 100 Sms set processer =
@ThreadId where status = 0 ; 3. select * from Sms where status = 0 and processer = @ThreadId; 4. 提交事務

這樣就可以在這個請求中, 確保取到的資料,沒有被其它執行緒取到. 因為每個執行緒的ThreadId肯定是不一樣的.

當然這個邏輯也可以升級一下把ThreadId 改成其它的某個有規則的能夠區分不同的任務的編號, 如果是分散式任務, 可以考慮前面再加個機器號.

或者把這個@ThreadId改成 orcale中的 sequence

如果是多執行緒只有一個程式在執行的話, 可以把這個數值通過單列模式在靜態變數裡面取資料.
每次任務執行前取一個 任務ID 當作@ThreadId.

我實際在用的程式碼

BBZQ表中加了3個欄位

欄位名 型別 說明
W_JOBID string 任務ID,主要用它來分割不同的任務
W_PROCESSTIME date 處理時間, 主要用在處理失敗的或者未處理的, 超時10分鐘後會強行再次被獲取
ISUPLOAD int 是否已上傳,上傳成功後會更新此欄位
    private static int _JobId = 0;
    public string GetJobId()
    {
        lock (SynCacheObject)
        {
            _JobId = _JobId + 1;
            return "JOB"+DateTime.Now.ToString("yyyyMMddHHmmssfff_")+_JobId;
        }
    }
public List<DataChangeLog> GetCHGAndWSWJobs()
        {
            //ReturnLogs rlog = new  ReturnLogs();
             
                var jobid = GetJobId(); //這裡通過lock 鎖定取得唯一的編號
                //AND(W_JOBID is null OR  W_PROCESSTIME < SYSDATE - 10 / 1440)-- 超時10分鐘處理失敗的或者未處理的的也會強行再次被獲取,因為處理的部分有防止重複執行的功能, 所以可以重複執行
                //order by EXECUTEDATE asc   如果不按照事件發生時間排序,反稽核之後又稽核的的資料可能會被刪掉
                //with temptable as (**) 臨時表的寫法是因為orcale 在 update 語句中 子查詢不支援orderby  所以用了臨時表.
                var sql = @"update BBZQ set W_JOBID = :JobId, W_PROCESSTIME=SYSDATE where JGID=2 AND XH in (
                                with temptable as(
				                        select XH
				                        from BBZQ 
				                        where ROWNUM < 200
				                        AND ( W_JOBID is null OR  W_PROCESSTIME < SYSDATE - 10/1440 )
				                        AND ISUPLOAD = 0 
				                        AND EXECUTEDATE > SYSDATE - 30 
				                        AND STATUS in(15, 16)   
				                        AND  JGID=2
				                        order by EXECUTEDATE asc
		                        ) 
                                select XH from temptable
                            )";

                var i =  DB.Execute(sql, new { JobId = jobid });
                if (i==0) //如果沒有更新到資料也就直接返回了, 無需再次查詢
                {
                    return null;
                }
                sql = @" SELECT *
                        FROM  BBZQ
                        WHERE W_JOBID = :JobId
                        ORDER BY EXECUTEDATE asc";
                //   throw new Exception(sql);
                return   DB.Query<DataChangeLog>(sql, new { JobId = jobid }).ToList();
                 
        }