1. 程式人生 > >有關多執行緒同步資料

有關多執行緒同步資料

 近期由於同步一批資料,資料數量三百多萬,一般的批量插入也太慢了,並且需要同時做更新和插入的操作,如果一般的做法先遍歷匹配,在分別進行更新和插入,則效率太低,後通過大神指點和上網查資料,終於使用merge into 語法完成了同步,雖然還是太慢,但是,也總是學到了些新的東西,現在我把思路和部分程式碼記下來,主要是便於自己去記憶,當然也希望對大家有所幫助:

資料總數: totalCount
單頁面資料大小: numPage
總頁數:totalPage ------- (int)Math.ceil((totalCount/numPage*1.0));
已插入資料庫的頁數: beenHandledPage   ----初始化為0
設定一個序列號 String uuid = UUID.randomUUID().toString();
1,設定執行緒總數為   threadNum     ----  5

2,設定單執行緒處理的頁面數
int pageNumOfPerThread = (int)Math.ceil((totalPage-beenHandledPage)/(threadNum*1.0));

3、遍歷每個執行緒
        設定開始頁數:int stattPage = beenHandledPage + i*pageNumOfPerThread +1;
        
        設定結束頁數:int  endPage = beenHandledPage + (i+1)*pageNumOfPerThread;
        
        建立執行緒處理:MarketingAsstsTransitionThread thread = new MarketingAsstsTransitionThread(startPage,
                                                               endPage<totalPage?endPage:totalPage,numPage,
                                                               maxTs,totalCount,uuid,jdbcTemplate,
                                                               marketingAssetsMapper, amDataSyncLogService);
        (//註釋  jdbcTemplate, marketingAssetsMapper, amDataSyncLogService  處理業務邏輯需要的些介面名,jdbcTemplate是通過配置檔案可以實現連線遠端資料庫,並編寫sql語句

         maxTs   是本地測試庫中所有資料中時間值最大的一個時間值)

     
        
        開始啟動執行緒  :  thread.start();
        

4,下面是具體的執行緒類的內容
        public class MarketingAssetsTransitionThread extends Thread{

    //例項化引數
    private int startPage;
    private int endPage;
    private int numPage;
    private String maxTs;
    private Integer count;
    private String uuid;
    private JdbcTemplate jdbcTemplate;
    private MarketingAssetsMapper marketingAssetsMapper;
    private AmDataSyncLogService amDataSyncLogService;


    /**
     * 例項化執行緒及注入bean
     */
    public MarketingAssetsTransitionThread(int startPage, int endPage, int numPage, String maxTs, 
                                           Integer count, String uuid, JdbcTemplate jdbcTemplate,
                                           MarketingAssetsMapper  marketingAssetsMapper,
                                           AmDataSyncLogService amDataSyncLogService) {
        super();
        this.startPage = startPage;
        this.endPage = endPage;
        this.numPage = numPage;
        this.maxTs = maxTs;
        this.count = count;
        this.uuid = uuid;
        this.jdbcTemplate = jdbcTemplate;
        this.marketingAssetsMapper = marketingAssetsMapper;
        this.amDataSyncLogService = amDataSyncLogService;
    }

    public void run(){
    
    //具體的業務邏輯
    /**
    *從遠端資料庫中與本地資料庫中最大的一個時間值進行匹對,並取出這些資料的集合,遍歷該集合並於本地資料
    *庫的所有資料進行匹配(通過唯一標識)如果在本地資料庫中存在與該資料
    *唯一標識相同的資料則對本地資料庫的與之匹配的資料進行更新,如果不存在則將該資料新增到本地資料庫中
    */
    
    /**
    *在上面的業務處理中,通過條件從遠端資料庫與本地資料庫匹配並同時進行更新和插入操作的時候,使用的
    *是(Oracle 9i引入的)提供的merge關鍵字,merge into 語法,進行同時的更新和插入操作
    *(merge語法是根據源表對目標表進行匹配查詢,匹配成功時更新,不成功時插入)
    *
    */
    }
    
5,一部分的mapper檔案內容:
      <insert id="mergeMarketingAssetsList" parameterType="java.util.List">
        MERGE INTO MARKETING_ASSETS M
        USING
        (
        <foreach collection="list" item="item" index="index" separator="UNION">
        SELECT
                #{item.areaname ,jdbcType=VARCHAR}  AREANAME,
                #{item.areaid ,jdbcType=VARCHAR} AREAID,
                #{item.yname ,jdbcType=VARCHAR} YNAME,
                #{item.bldname ,jdbcType=VARCHAR} BLDNAME,
                #{item.roomid ,jdbcType=VARCHAR} ROOMID,
                #{item.bldid ,jdbcType=VARCHAR} BLDID,
                #{item.unit ,jdbcType=VARCHAR} UNIT,
                #{item.stair ,jdbcType=VARCHAR} STAIR,
                #{item.jgstair ,jdbcType=VARCHAR} JGSTAIR,
                #{item.roomno ,jdbcType=VARCHAR} ROOMNO
        FROM DUAL
        </foreach>
        ) T
        ON (M.ROOMID = T.ROOMID and M.IS_DEL = T.IS_DEL)
        WHEN MATCHED THEN
        UPDATE SET
            M.AREANAME=T.AREANAME,
            M.AREAID=T.AREAID,
            M.YNAME=T.YNAME,
            M.BLDNAME=T.BLDNAME,
            M.BLDID=T.BLDID,
            M.UNIT=T.UNIT,
            M.STAIR=T.STAIR,
            M.JGSTAIR=T.JGSTAIR,
            M.ROOMNO=T.ROOMNO
        WHEN NOT MATCHED THEN
        INSERT (
            AREANAME,
            AREAID,
            YNAME,
            BLDNAME,
            ROOMID,
            BLDID,
            UNIT,
            STAIR,
            JGSTAIR,
            ROOMNO
            ) VALUES (
            T.AREANAME,
            T.AREAID,
            T.YNAME,
            T.BLDNAME,
            T.ROOMID,
            T.BLDID,
            T.UNIT,
            T.STAIR,
            T.JGSTAIR,
            T.ROOMNO
        )
    </insert>
    
    ============================================
    下面是具體的一些merge into 用法:

其基本語法規則是

merge into 目標表 a

using 源表 b

on(a.條件欄位1=b.條件欄位1 and a.條件欄位2=b.條件欄位2 ……)  

when matched then update set a.更新欄位=b.欄位

when  not macthed then insert into a(欄位1,欄位2……)values(值1,值2……)

變種寫法①,只更新:

merge into 目標表 a

using 源表 b

on(a.條件欄位1=b.條件欄位1 and a.條件欄位2=b.條件欄位2 ……)  

when matched then update set a.更新欄位=b.欄位,a.更新欄位2=b.欄位2……

變種寫法②,只插入:

merge into 目標表 a

using 源表 b

on(a.條件欄位1=b.條件欄位1 and a.條件欄位2=b.條件欄位2 ……)  

when  not macthed then insert into a(欄位1,欄位2……)values(值1,值2……)

注:條件欄位不可更新

對於Oracle來說,merge是9i新增的語法,在10g進行了一些增強,如下:

測試環境:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0

①條件操作:

merge into 目標表 a

using 源表 b

on(a.條件欄位1=b.條件欄位1 and a.條件欄位2=b.條件欄位2 ……)  

when matched then update set a.更新欄位=b.欄位  where 限制條件

when  not macthed then insert into a(欄位1,欄位2……)values(值1,值2……) where 限制條件

舉例:

merge into test_merge a
using test b
on(a.no=b.no)
when matched then update set a.no2=b.no2 where a.no<>1
when not matched then insert values(b.no,b.no2)  where a.no<>100

當然也支援變種①②的寫法

②刪除操作

merge into 目標表 a

using 源表 b

on(a.條件欄位1=b.條件欄位1 and a.條件欄位2=b.條件欄位2 ……)  

when matched then update set a.更新欄位=b.欄位

delete where b.欄位=xxx

舉例:

merge into test_merge a
using test b
on(a.no=b.no)
when matched then update set a.no2=b.no2 where a.no<>1
delete 
where b.no=14

備註:刪除動作針對的也是目標表,並且必須在語句最後

基本上merge的用法就是以上這些,建議平常可以多用,
比單獨的update+insert的方式效率要更高,尤其是on條件下有唯一索引的時候,效率更高