1. 程式人生 > >將MySQL去重操作優化到極致之三彈連發(二):多執行緒並行執行

將MySQL去重操作優化到極致之三彈連發(二):多執行緒並行執行

        上一篇已經將單條查重語句調整到最優,但該語句是以單執行緒方式執行。能否利用多處理器,讓去重操作多執行緒並行執行,從而進一步提高速度呢?比如我的實驗環境是4處理器,如果使用4個執行緒同時執行查重sql,理論上應該接近4倍的效能提升。

一、資料分片
        我們生成測試資料時,created_time採用每條記錄加一秒的方式,也就是最大和在最小的時間差為50萬秒,而且資料均勻分佈。因此先把資料平均分成4份。

1. 查詢出4份資料的created_time邊界值

select date_add('2017-01-01',interval 125000 second) dt1,
       date_add('2017-01-01',interval 2*125000 second) dt2,
       date_add('2017-01-01',interval 3*125000 second) dt3,
       max(created_time) dt4
  from t_source;
        查詢結果如圖一所示。


圖一

2. 檢視每份資料的記錄數,確認資料平均分佈
select case when created_time >= '2017-01-01' 
             and created_time < '2017-01-02 10:43:20'
            then '2017-01-01'
            when created_time >= '2017-01-02 10:43:20'
             and created_time < '2017-01-03 21:26:40'
            then '2017-01-02 10:43:20'
            when created_time >= '2017-01-03 21:26:40' 
             and created_time < '2017-01-05 08:10:00'
            then '2017-01-03 21:26:40' 
            else '2017-01-05 08:10:00'
        end min_dt,
       case when created_time >= '2017-01-01' 
             and created_time < '2017-01-02 10:43:20'
            then '2017-01-02 10:43:20'
            when created_time >= '2017-01-02 10:43:20'
             and created_time < '2017-01-03 21:26:40'
            then '2017-01-03 21:26:40'
            when created_time >= '2017-01-03 21:26:40' 
             and created_time < '2017-01-05 08:10:00'
            then '2017-01-05 08:10:00'
            else '2017-01-06 18:53:20'
        end max_dt,
       count(*)
  from t_source
 group by case when created_time >= '2017-01-01' 
             and created_time < '2017-01-02 10:43:20'
            then '2017-01-01'
            when created_time >= '2017-01-02 10:43:20'
             and created_time < '2017-01-03 21:26:40'
            then '2017-01-02 10:43:20'
            when created_time >= '2017-01-03 21:26:40' 
             and created_time < '2017-01-05 08:10:00'
            then '2017-01-03 21:26:40' 
            else '2017-01-05 08:10:00'
        end,
       case when created_time >= '2017-01-01' 
             and created_time < '2017-01-02 10:43:20'
            then '2017-01-02 10:43:20'
            when created_time >= '2017-01-02 10:43:20'
             and created_time < '2017-01-03 21:26:40'
            then '2017-01-03 21:26:40'
            when created_time >= '2017-01-03 21:26:40' 
             and created_time < '2017-01-05 08:10:00'
            then '2017-01-05 08:10:00'
            else '2017-01-06 18:53:20'
        end;

        查詢結果如圖二所示。


圖二


        4份資料的並集應該覆蓋整個源資料集,並且資料之間是不重複的。也就是說4份資料的created_time要連續且互斥,連續保證處理全部資料,互斥確保了不需要二次查重。實際上這和時間範圍分割槽的概念類似,或許用分割槽表更好些,只是這裡省略了重建表的步驟。

3. 建立查重的儲存過程
        有了以上資訊我們就可以寫出4條語句處理全部資料。為了呼叫介面儘量簡單,建立下面的儲存過程。
delimiter //
create procedure sp_unique(i smallint)    
begin     
    set @a:='0000-00-00 00:00:00';  
    set @b:=' ';  
	if (i<4) then
        insert into t_target  
        select * from t_source force index (idx_sort)  
         where created_time >= date_add('2017-01-01',interval (i-1)*125000 second) 
           and created_time < date_add('2017-01-01',interval i*125000 second) 
           and (@a!=created_time or @b!=item_name) 
           and (@a:=created_time) is not null 
           and (@b:=item_name) is not null  
         order by created_time,item_name;  
        commit;
    else 
	insert into t_target  
        select * from t_source force index (idx_sort)  
         where created_time >= date_add('2017-01-01',interval (i-1)*125000 second) 
           and created_time <= date_add('2017-01-01',interval i*125000 second) 
           and (@a!=created_time or @b!=item_name) 
           and (@a:=created_time) is not null 
           and (@b:=item_name) is not null  
         order by created_time,item_name;  
        commit;
    end if;	
end     
// 

delimiter ; 

        查詢的執行計劃都如圖三所示。


圖三


        mysql優化器進行索引範圍掃描,並且使用索引條件下推(ICP)優化查詢。

二、並行執行
        下面分別使用shell後臺程序和MySQL Schedule Event實現並行。

1. shell後臺程序

(1)建立duplicate_removal.sh檔案,內容如下。
#!/bin/bash
mysql -vvv -u root -p123456 test -e "truncate t_target" &>/dev/null 
date '+%H:%M.%N'
for y in {1..4}
do
  sql="call sp_unique($y)"
  mysql -vvv -u root -p123456 test -e "$sql" &>par_sql1_$y.log &
done
wait
date '+%H:%M.%N'

(2)執行指令碼檔案

chmod 755 duplicate_removal.sh
./duplicate_removal.sh

        執行輸出入圖四所示。


圖四

這種方法用時3.4秒,並行執行的4個過程呼叫分別用時如圖五所示。


圖五

        可以看到,每個過程的執行時間均不到3.4秒,因為是並行執行,總的過程執行時間也小於3.4秒,比單執行緒sql速度提高了近3倍。

2. MySQL Schedule Event
        吳老師也用到了並行,但他是利用MySQL自帶的Schedule Event功能實現的,程式碼應該和下面的類似。

(1)建立事件歷史日誌表

-- 用於檢視事件執行時間等資訊
create table t_event_history  (  
   dbname  varchar(128) not null default '',  
   eventname  varchar(128) not null default '',  
   starttime  datetime(3) not null default '0000-00-00 00:00:00',  
   endtime  datetime(3) default null,  
   issuccess  int(11) default null,  
   duration  int(11) default null,  
   errormessage  varchar(512) default null,  
   randno  int(11) default null
);  

(2)修改event_scheduler引數

set global event_scheduler = 1;

(3)為每個併發執行緒建立一個事件

delimiter //
create event ev1 on schedule at current_timestamp + interval 1 hour on completion preserve disable do 
begin
    declare r_code char(5) default '00000';  
    declare r_msg text;  
    declare v_error integer;  
    declare v_starttime datetime default now(3);  
    declare v_randno integer default floor(rand()*100001);  
      
    insert into t_event_history (dbname,eventname,starttime,randno) 
    #作業名	
    values(database(),'ev1', v_starttime,v_randno);    
     
    begin  
        #異常處理段  
        declare continue handler for sqlexception    
        begin  
            set v_error = 1;  
            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
        end;  
          
        #此處為實際呼叫的使用者程式過程  
        call sp_unique(1);  
    end;  
      
    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
      
end
//     

create event ev2 on schedule at current_timestamp + interval 1 hour on completion preserve disable do 
begin
    declare r_code char(5) default '00000';  
    declare r_msg text;  
    declare v_error integer;  
    declare v_starttime datetime default now(3);  
    declare v_randno integer default floor(rand()*100001);  
      
    insert into t_event_history (dbname,eventname,starttime,randno) 
    #作業名	
    values(database(),'ev2', v_starttime,v_randno);    
     
    begin  
        #異常處理段  
        declare continue handler for sqlexception    
        begin  
            set v_error = 1;  
            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
        end;  
          
        #此處為實際呼叫的使用者程式過程  
        call sp_unique(2);  
    end;  
      
    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
      
end
//  

create event ev3 on schedule at current_timestamp + interval 1 hour on completion preserve disable do 
begin
    declare r_code char(5) default '00000';  
    declare r_msg text;  
    declare v_error integer;  
    declare v_starttime datetime default now(3);  
    declare v_randno integer default floor(rand()*100001);  
      
    insert into t_event_history (dbname,eventname,starttime,randno) 
    #作業名	
    values(database(),'ev3', v_starttime,v_randno);    
     
    begin  
        #異常處理段  
        declare continue handler for sqlexception    
        begin  
            set v_error = 1;  
            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
        end;  
          
        #此處為實際呼叫的使用者程式過程  
        call sp_unique(3);  
    end;  
      
    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
      
end
//  

create event ev4 on schedule at current_timestamp + interval 1 hour on completion preserve disable do 
begin
    declare r_code char(5) default '00000';  
    declare r_msg text;  
    declare v_error integer;  
    declare v_starttime datetime default now(3);  
    declare v_randno integer default floor(rand()*100001);  
      
    insert into t_event_history (dbname,eventname,starttime,randno) 
    #作業名	
    values(database(),'ev4', v_starttime,v_randno);    
     
    begin  
        #異常處理段  
        declare continue handler for sqlexception    
        begin  
            set v_error = 1;  
            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
        end;  
          
        #此處為實際呼叫的使用者程式過程  
        call sp_unique(4);  
    end;  
      
    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
      
end
//

delimiter ;   

        說明:為了記錄每個事件執行的時間,在事件定義中增加了操作日誌表的邏輯,因為每個事件中只多執行了一條insert,一條update,4個事件總共多執行8條很簡單的語句,對測試的影響可以忽略不計。執行時間精確到毫秒。

(4)觸發事件執行

mysql -vvv -u root -p123456 test -e "truncate t_target;alter event ev1 on schedule at current_timestamp enable;alter event ev2 on schedule at current_timestamp enable;alter event ev3 on schedule at current_timestamp enable;alter event ev4 on schedule at current_timestamp enable;"

        說明:該命令列順序觸發了4個事件,但不會等前一個執行完才執行下一個,而是立即向下執行。從圖六的輸出也可以清楚地看到這一點。因此四次過程呼叫是並行執行的。

圖六
(5)檢視事件執行日誌
select * from t_event_history;

        查詢結果如圖7所示。


圖七