1. 程式人生 > >MySql 快速插入千萬級大資料

MySql 快速插入千萬級大資料

原文地址:http://blog.csdn.net/oldbai001/article/details/51693139

在資料分析領域,資料庫是我們的好幫手。不僅可以接受我們的查詢時間,還可以在這基礎上做進一步分析。所以,我們必然要在資料庫插入資料。在實際應用中,我們經常遇到千萬級,甚至更大的資料量。如果沒有一個快速的插入方法,則會事倍功半,花費大量的時間。

在參加阿里的天池大資料演算法競賽中(流行音樂趨勢預測),我遇到了這樣的問題,在沒有優化資料庫查詢及插入之前,我花了不少冤枉時間,沒有優化之前,1500萬條資料,光插入操作就花費了不可思議的12個小時以上(使用最基本的逐條插入)。這也促使我思考怎樣優化資料庫插入及查詢操作,提高效率。

在不斷優化過程中,效能有大幅提升。在按時間序列從資料庫查詢並彙總生成2萬6000多首歌曲的下載,播放,收藏數過程中,通過查詢生成的操作速度提高從預估的40多小時降低到一小時多。在資料庫插入方面,效能得到大幅提升;在新的資料集上測試,5490萬+的資料,20分鐘完成了插入。下面分享一下我的心得。

優化過程分為2步。第一步,實驗靜態reader從CSV檔案讀取資料,達到一定量時,開始多執行緒插入資料庫程式;第二步,使用mysq批量插入操作。

第一步,讀取檔案,開始插入多執行緒

在這裡,達到一定量的量是個需要斟酌的問題,在我的實驗中,開始使用100w作為這個量,但是出現了新的問題,Java 堆記憶體溢位,最終採用了10W作為量的標準。

當然,可以有其他的量,看大家自己喜歡那個了。

  1. mport java.io.BufferedReader;  
  2. import java.io.FileNotFoundException;  
  3. import java.io.FileReader;  
  4. import java.io.IOException;  
  5. import java.util.ArrayList;  
  6. import java.util.List;  
  7. import preprocess.ImportDataBase;  
  8. publicclass MuiltThreadImportDB {  
  9.     /** 
  10.      * Java多執行緒讀大檔案併入庫
     
  11.      *  
  12.      * @param args 
  13.      */
  14.     privatestaticint m_record = 99999;  
  15.     privatestatic BufferedReader br = null;  
  16.     private ArrayList<String> list;  
  17.     privatestaticint m_thread = 0;  
  18.     static {  
  19.     try {  
  20.         br = new BufferedReader(  
  21.             new FileReader(  
  22.                 "E:/tianci/IJCAI15 Data/data_format1/user_log_format1.csv"),8192);  
  23.     } catch (FileNotFoundException e) {  
  24.         e.printStackTrace();  
  25.     }  
  26.     try {  
  27.         br.readLine(); // 去掉CSV Header
  28.     } catch (IOException e) {  
  29.         e.printStackTrace();  
  30.     }  
  31.     }  
  32.     publicvoid start() {  
  33.     String line;  
  34.     int count = 0;  
  35.     list = new ArrayList<String>(m_record + 1);  
  36.     synchronized (br) {  
  37.         try {  
  38.         while ((line = br.readLine()) != null) {  
  39.             if (count < m_record) {  
  40.             list.add(line);  
  41.             count++;  
  42.             } else {  
  43.             list.add(line);  
  44.             count = 0;  
  45.             Thread t1 = new Thread(new MultiThread(list),Integer.toString(m_thread++));  
  46.             t1.start();  
  47.             list = new ArrayList<String>(m_record + 1);  
  48.             }  
  49.         }  
  50.         if (list != null) {  
  51.             Thread t1 = new Thread(new MultiThread(list),Integer.toString(m_thread++));  
  52.             t1.start();  
  53.         }  
  54.         } catch (IOException e) {  
  55.         e.printStackTrace();  
  56.         }  
  57.     }  
  58.     }  
  59.     publicstaticvoid main(String[] args) {  
  60.     new MuiltThreadImportDB().start();  
  61.     }  
  62. }  
第二步,使用多執行緒,批量插入資料
  1. class MultiThread implements Runnable {  
  2.     private ArrayList<String> list;  
  3.     public MultiThread(ArrayList<String> list) {  
  4.     this.list = list;  
  5.     }  
  6.     publicvoid run() {  
  7.     try {  
  8.         ImportDataBase insert = new ImportDataBase(list);  
  9.         insert.start();  
  10.     } catch (FileNotFoundException e) {  
  11.         e.printStackTrace();  
  12.     }  
  13.     display(this.list);  
  14.     }  
  15.     publicvoid display(List<String> list) {  
  16.     // for (String str : list) {
  17.     // System.out.println(str);
  18.     // }
  19.     System.out.print(Thread.currentThread().getName() + " :");  
  20.     System.out.println(list.size());  
  21.     }  
  22. }  

批量操作中,使用mysql的prepareStatement類,當然也使用了statement類的批量操作,效能比不上前者。前者可以達到1w+每秒的插入速度,後者只有2000+;
  1. publicint insertUserBehaviour(ArrayList<String> sqls) throws SQLException {  
  2.     String sql = "insert into user_behaviour_log (user_id,item_id,cat_id,merchant_id,brand_id,time_stamp,action_type)"
  3.         + " values(?,?,?,?,?,?,?)";  
  4.     preStmt = conn.prepareStatement(sql);  
  5.     for (int i = 0; i < sqls.size(); i++) {  
  6.         UserLog log =new UserLog(sqls.get(i));  
  7.         preStmt.setString(1, log.getUser_id());  
  8.         preStmt.setString(2, log.getItem_id());  
  9.         preStmt.setString(3, log.getCat_id());  
  10.         preStmt.setString(4, log.getMerchant_id());  
  11.         preStmt.setString(5, log.getBrand_id());  
  12.         preStmt.setString(6, log.getTimeStamp());  
  13.         preStmt.setString(7, log.getActionType());  
  14.         preStmt.addBatch();  
  15.         if ((i + 1) % 10000 == 0) {  
  16.         preStmt.executeBatch();  
  17.         conn.commit();  
  18.         preStmt.clearBatch();  
  19.         }  
  20.     }  
  21.     preStmt.executeBatch();  
  22.     conn.commit();  
  23.     return1;  
  24.     }  

當然,也實驗了不同的mysql儲存引擎,InnoDB和MyISM,實驗結果發現,InnoDB更快(3倍左右),可能和mysq的新版本有關係,筆者的mysql版本是5.6。

最後總結一下,大資料量下,提高插入速度的方法。

Java程式碼方面,使用多執行緒插入,並且使用批處理提交。

資料庫方面,表結構建立時不要使用索引,要不然插入過程過還要維護索引B+樹;修改儲存引擎,一般預設是InnoDB,(新版本就使用預設就可以,老版本可能需要)。