1. 程式人生 > >【資料庫】load data infile上億條的海量資料匯入mysql的那些事

【資料庫】load data infile上億條的海量資料匯入mysql的那些事

因為做股票金融的,每天產生的資料量是很大的,一個月幾十億的交易記錄,也常有出現,特別是今年大跌之前大漲那會。

作為程式設計師,問題來了,有時需要將一些並不是特別符合規範的csv檔案匯入資料庫中,而且每個檔案有十幾萬行,而這樣的檔案幾萬個,於是幾十億的記錄如何匯入資料庫呢?很多想著常用的方法,就是將csv檔案讀出來,然後一條條插到資料庫,或者批量插到資料庫,或者開個多執行緒,然而,花費的時間必須是幾十天,而且不好控制,萬一中斷,那又不知該如何是好了。。。

對於我,我最先想到是開多執行緒,每次1000條批量插入資料庫,然而,由於我犯了一個致命的錯誤:對於大量資料修改,資料庫表先不要建索引,因為每次改動表,索引都會修改,這在資料量大的時候,消耗的時間也是比較大的。所以,我效果並不好,12個小時也才存了4千萬資料。看到這個效果,我感覺不對勁,因為我要存的資料,至少是幾十億的,於是我覺得肯定有更效率的方法,直到load data infile

才讓我覺得人生處處有驚喜!!

用LOAD DATA INFILE將1.csv匯入資料庫表d201505的語句如下:

LOAD DATA INFILE 'E:/1.csv'
INTO TABLE d201505
FIELDS TERMINATED BY ','
ENCLOSED BY '\"'
ESCAPED BY '\\'
LINES TERMINATED BY '\n'
(stockID,date,price,buysell,volume);
就這麼一條語句,一個含有十幾萬資料的匯入表中基本是秒級,於是我決定先將我的不規則的csv檔案讀寫成合乎資料表字段的csv,然後LOAD DATA INFILE匯入到資料表,並且我開了一百個多執行緒對檔案進行寫,開了三個執行緒對資料匯入,效果很樂觀,完整一個流程下來,10多億資料花了八個小時,平均十分鐘左右就能匯入1500萬資料,所以,在此記錄下,我的程式如下:
import java.io.File;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
//用了javacsv這個外掛包
import com.csvreader.CsvReader;
import com.csvreader.CsvWriter;

public class CSVReadAndWrite {
    // 寫cvs檔案
    public static void writeToCsv(String dirPath, String ym, String d,
            String[] cvsfile) {
        ExecutorService pool = Executors.newFixedThreadPool(100);
        for (int j = 0; j < cvsfile.length; j++) {
            Thread thread = new MDownThread(dirPath, cvsfile[j], ym, d);
            pool.execute(thread);

        }
        pool.shutdown();
        try {
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    // 我匯入的是201504這一個月的資料,所以是1-30,按天匯入
    public static void main(String[] args) throws Exception {
        String ym = "201504";
        String d;

        for (int i = 1; i <= 30; i++) {
            Date t1 = new Date();
            String dirPath = "E:\\" + ym + "/" + ym;
            d = String.format("%02d", i);
            dirPath += d + "/";
            File file = new File(dirPath);
            String cvsfile[];
            cvsfile = file.list();
            if (cvsfile == null || cvsfile.length <= 0) {
                continue;
            }
            System.out.println(dirPath + "開始匯入!");
            // 寫入檔案
            writeToCsv(dirPath, ym, d, cvsfile);
            ExecutorService pool2 = Executors.newFixedThreadPool(3);
            for (int j = 0; j < cvsfile.length; j++) {
                if (!new File("E:/" + ym + "/" + ym + d + "/"
                        + cvsfile[j]).exists()) {
                    writeToCsv(dirPath, ym, d, cvsfile);
                }
                Thread thread2 = new LoadMysqlThread(dirPath, cvsfile[j], ym, d);
                pool2.execute(thread2);
            }
            pool2.shutdown();
            try {
                pool2.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            Date t2 = new Date();
            System.out.println(dirPath + "匯入成功,用時:"
                    + (t2.getTime() - t1.getTime()) / 1000 + "s");

        }

    }

}
//執行緒寫csv檔案
class MDownThread extends Thread {
    SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
    public String dirPath;
    public String cvsfile;
    public String ym;
    public String d;
    public int dj;

    public MDownThread(String dirPath, String cvsfile, String ym, String d) {
        this.dirPath = dirPath;
        this.cvsfile = cvsfile;
        this.ym = ym;
        this.d = d;
        this.dj = dj;
    }

    @Override
    public void run() {
        try {
            // 生成CsvReader物件,以,為分隔符,utf-8編碼方式
            CsvReader r = new CsvReader(dirPath + cvsfile, ',',
                    Charset.forName("utf-8"));
            CsvWriter wr = new CsvWriter("E:\\" + ym + "\\"
                    + ym + d + "\\" + cvsfile, ',', Charset.forName("utf-8"));
            // System.out.println(dirPath+cvsfile[j]);
            // 逐條讀取記錄,直至讀完
            while (r.readRecord()) {
                String regex = "(\\d+)";
                Pattern pattern = Pattern.compile(regex);
                Matcher matcher = pattern.matcher(cvsfile);
                String stockID = "";
                while (matcher.find()) {
                    stockID = matcher.group(1);
                }
                String date = sdf1.format(sdf.parse(ym + d + r.get(0)));
                File f = new File("E:\\shamrock\\project\\" + ym + "\\" + ym
                        + d + "\\" + cvsfile);
                if (!f.getParentFile().exists()) {
                    if (!f.getParentFile().mkdirs()) {
                        return;
                    }
                }
                String[] contents = { stockID, date, r.get(1), r.get(2),
                        r.get(3) };
                wr.writeRecord(contents);

            }

            r.close();
            wr.close();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

class LoadMysqlThread extends Thread {
    SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
    public String dirPath;
    public String cvsfile;
    public String ym;
    public String d;
    public int dj;

    public LoadMysqlThread(String dirPath, String cvsfile, String ym, String d) {
        this.dirPath = dirPath;
        this.cvsfile = cvsfile;
        this.ym = ym;
        this.d = d;
        this.dj = dj;
    }
    //執行緒匯入資料庫
    @Override
    public void run() {
        try {
            
            Connection conn = null;
            String driver = "com.mysql.jdbc.Driver";
            String url = "jdbc:mysql://localhost:3306/tickdata";
            Class.forName(driver);
            conn = DriverManager.getConnection(url, "root", "123456");
            PreparedStatement pstmt = conn
                    .prepareStatement(" LOAD DATA INFILE 'E:/project/"
                            + ym
                            + "/"
                            + ym
                            + d
                            + "/"
                            + cvsfile
                            + "'   INTO TABLE d201505 FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (stockID,date,price,buysell,volume);");
            pstmt.execute();
            conn.close();
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}


在此再說一個問題,比如上面的程式在自己電腦,匯入本地mysql資料庫沒問題,但問題來了,因為資料量較大,想讓別的電腦也來幫忙,然後存在我的電腦上,換句話說就是電腦A上的資料想匯入到電腦B上的mysql中,如果你用上面的命令,肯定會報錯找不到檔案,因為那語句指定的檔案是以mysql的安裝路徑為相對路徑的,所以如果不在B電腦的資料自然會報錯,於是此時你要LOCAL來指定是將本地A的資料匯入到B mysql中,語法如下:
LOAD DATA LOCAL INFILE 'E:/1.csv'   
INTO TABLE d201505 
FIELDS TERMINATED BY ',' 
ENCLOSED BY '\"' 
ESCAPED BY '\\' 
LINES TERMINATED BY '\n'
(stockID,date,price,buysell,volume);

也就是LOAD DATA INFILE中加入LOCAL.....

好了,如果有遇到我這樣問題的,希望能幫助到你哦!!!

相關推薦

資料庫load data infile海量資料匯入mysql那些

因為做股票金融的,每天產生的資料量是很大的,一個月幾十億的交易記錄,也常有出現,特別是今年大跌之前大漲那會。 作為程式設計師,問題來了,有時需要將一些並不是特別符合規範的csv檔案匯入資料庫中,而且每個檔案有十幾萬行,而這樣的檔案幾萬個,於是幾十億的記錄如何匯入資料庫呢?

利用LOAD DATA將csv檔案中的資料匯入MySQL

先貼程式碼: #coding=utf-8 import csv, os, re import MySQLdb _PATH = 'D:/bike_sharing_data/trip_data/trip_history/' _TABLE_NAME = 't

MySQL實踐經驗LOAD DATA INFILE 報錯 ERROR 1148 (42000) 或 ERROR 1045 (28000)的解決辦法

3. 非root使用者從client機器load data local infile至remote mysql server時,報錯:        ERROR 1148 (42000): The used command is not allowed with this MySQL version    

我們如何用Cassandra每天儲存線上資料

譯者注:Discord 是一款國外的類似 YY 的語音聊天軟體。 Discord 語音聊天軟體及我們的 UGC 內容的增長速度比想象中要快得多。隨著越來越多使用者的加入,帶來了更多聊天訊息。 2016 年 7 月,每天大約有 4 千萬條訊息; 2016 年 12 月,每天超過億條。 當寫這篇文章

Discord 公司如何使用 Cassandra 儲存線上資料

Discord 是一款國外的類似 YY 的語音聊天軟體。Discord 語音聊天軟體及我們的 UGC 內容的增長速度比想象中要快得

轉載Mysql load data infile用法(萬級數據導入,在幾秒之內)

輸入 工具 分割 field 服務 分隔 load data tab lose https://blog.csdn.net/u014082714/article/details/53173975 http://blog.itpub.net/26506993/viewspac

資料庫-MySQL資料庫8.0.x安裝的時候,與Navicat連結不-Navicat 1251錯誤

嗯……安裝一個數據庫,過程不是很順利啊; 其實主要還是因為MySQL版本的更新帶來的問題; 比較矛盾的是,cmd安裝最新版的MySQL需要用最新的更改口令的方法, 而Navicat(或者因為我下的版本比較老的原因),支援的是老方法,也就是我們SET的方法;

php文件的傳與下載

下載 類型 臨時文件 沒有 ipa pic 文件數量 上傳與下載 文件夾 一、 生活中常見的地方:  a) 例如郵箱、空間、文庫、百度雲、微愛等地方,都可以看到文件的上傳和下載的應用,因此,上傳和下載的功能非常重要!二、 PHP當中的文件上傳和下載  a) 我們需要進行一些

springmvcspringmvc中如何傳文件

doc 問題 ffffff color 遍歷 -c tip int imu 使用到的組件:可自行根據情況選擇版本 <!--上傳依賴包--> <dependency> <groupId

文件各種傳,離不開的表單

dal 不變 沒有 each post str 線程 order bmi 閱讀目錄 利用表單實現文件上傳 表單異步上傳(jquery.form插件) 模擬表單數據上傳(FormData) 分片上傳 使用HTML5 拖拽、粘貼上傳 上傳插件(WebUploader

圖片移動端圖片傳旋轉、壓縮的解決方案

dex adding .com pad 移動 side www shu add 移動端圖片上傳旋轉、壓縮的解決方案 來源 知乎 作者 林鑫 工作上有手機上傳準考證等圖片的功能,這個是非常必要的,作者寫的很全面,就直接記錄這個地址了 還有一篇 文件的上傳、下載

load data infile

mina lds 文件的 斜杠 enc closed line txt ignore LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE ‘file_name‘ [REPLACE | IGNORE] IN

ArcGISArcGIS Data Store配置

cnblogs min art 技術 with account portal sta com 一、錯誤提示 Unable to configure the ArcGIS Data Store with the GIS Server. Please make sure th

如何使用Git傳本地項目到github?(mac版)

重復 ack you 生成 是什麽 exp git init fin git add 原文鏈接:http://www.cnblogs.com/lijiayi/p/pushtogithub.html 在此假設你已經在 github 上創建好了一個項目,像這樣: 並且你已經完

mysql load data infile

txt closed prior nes lac user mysq data optional LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE ‘file_name.txt‘ [REPLACE | IGN

ZT在微信有哪些高情商的說話方式 | M周刊(聽語音需要60秒,看文字只需10秒)

con 復雜 直接 對不起 評論 關系 專註 應該 知識 https://baijiahao.baidu.com/s?id=1590547145580792217&wfr=spider&for=pc https://yuedu.baidu.com/hybri

004-spring-data-elasticsearch 3.0.0.0使用-spring-data之定義方法、創建repository實例、從聚合根發布事件

-- ble sch current 4.3 ... reference tex manager 續上文 1.4、定義方法   存儲庫代理有兩種方法可以從方法名稱派生特定於存儲的查詢。它可以直接從方法名稱派生查詢,或者使用手動定義的查詢。可用選項取決於實際store。但

感悟或許這世界分為兩種人吧

class bsp OS 而是 希望 blog 而不是 定律 nbsp 或許世界上分為兩種人吧 一種是能發現新東西的人 (我認為所有的東西都是已存在的,只是等待人去發現,無論是各種定律,亦或是各種新的材料等, 創新並不是創造了什麽新東西,而是發現了以前沒發現的東西,真

翻譯:load data infile(已提交到MariaDB官方手冊)

linu AD -s 右下角 get family 支持 data log 本文為mariadb官方手冊:LOAD DATA INFILE的譯文。 原文:https://mariadb.com/kb/en/load-data-infile/ 我提交到MariaDB官方

LOAD DATA INFILE & mysqlimport

closed cape char reconnect thread script ESS 保持 nor +++++++++++++++++++++++++++++++++++++++++++++mysqlimport+++++++++++++++++++++++++++++