1. 程式人生 > >mybatis使用load data local infile實現大批量匯入資料到mysql資料庫

mybatis使用load data local infile實現大批量匯入資料到mysql資料庫

背景:

專案框架為:dubbo+zookeeper+ssm    資料庫為mysql

最近有個新的需求,要在程式碼裡實現往資料庫插入大批量資料,每次插入的資料量從10萬~50萬條不等,而且每條資料有80多個欄位,大概估算了一下,一條資料大小差不多是1kb,那麼每次插入的資料量大小應該在100M~500M之間。這個資料量還是很大的。

想來想去,我就先按照從開發到除錯所碰到問題的順序來寫好了,到後面我再貼出程式碼,供同行們參考。

1、碰到的第一個問題是:

 Packet for query is too large (1139736> 1048576). You can change this value on the server by setting

 the 'max_allowed_packet' variable. 

剛接到需求時,根本沒有考慮到資料量的問題,就按照平時開發那樣,直接往資料庫裡執行insert,就報了上面這個錯,原來因為mysql有一個max_allowed_packet變數,可以控制其通訊緩衝區的最大長度,所以當緩衝區的大小太小的時候,導致某些查詢和插入操作報錯。

解決辦法:

資料庫執行命令 show VARIABLES like '%max_allowed_packet%'; 檢視資料庫max_allowed_packet變數配置時多少,顯示的結果為

+--------------------+---------+ | Variable_name      | Value   | +---------

        -----------+---------+ | max_allowed_packet | 1048576 | +--------------------+---------+

這說明當前的配置時1M,我們需要將其設定大一些。

資料庫執行命令  set global max_allowed_packet = 4*1024*1024*10,將值設定為40M,,執行完後,關掉資料庫視覺化介面,重新開啟,要是命令列進行的就重啟mysql(不重啟的話是不行的,切記重啟mysql),然後接著執行命令show VARIABLES like '%max_allowed_packet%';看看有沒有設定成功。一般都是可行的!至此,第一個問題解決。

2、碰到的第二個問題是:

com.alibaba.dubbo.remoting.transport.AbstractCodec.checkPayload() ERROR  Data length too large: 11557050, max payload: 8388608 java.io.IOException: Data length too large: 11557050, max payload: 838860

這個錯是dubbo相關的,為什麼會報這個錯呢?想來想去,可能是service服務端讀取到的資料量太大,服務端提供給web客戶端的資料量就過大,超過了dubbo的預設值8M,錯誤資訊如上所示,天哪,第一次碰到這樣的報錯,各種查資料,最終還是解決了。

解決辦法:

        方法1、 修改提供方的dubbo配置,

            在dubbo.properties 中增加如下

dubbo.protocol.dubbo.payload=41943040(預設為8M,即8388608)

        方法2、

             在dubbo-provider.xml檔案配置如下

            <dubbo:provider id="payload" payload="41943040"/>

    如上兩種方法都是將值修改為40M。

3、碰到的第三個問題:

使用mysql的load data local infile往資料庫導資料時,英文和數字都正常匯入,但是,中文要麼不顯示,要麼就是亂碼,真的是搞不懂了,怎麼會這樣呢?以下是匯入部分程式碼:

   public void batchInsert(List<BqLoan> bqLoanList) throws ClassNotFoundException, SQLException {
        //1000條一提交
        int COMMIT_SIZE=1000;
        //一共多少條
        int COUNT=bqLoanList.size();
        Connection conn= null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            String url = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.url","CONF_HOME");
            String user = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.username","CONF_HOME");
            String password = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.password","CONF_HOME");
            conn= DriverManager.getConnection(url,user,password);
            conn.setAutoCommit(false);
            String exectuteSql = "load data local infile ''into table bq_loan fields terminated by ','";
            PreparedStatement pstmt = conn.prepareStatement(exectuteSql);
           StringBuilder sb = new StringBuilder();
            for (int i = 0; i < COUNT; i++) {
                sb.append(getTestDataInputStream(bqLoanList.get(i)));
                if (i % COMMIT_SIZE == 0) {
                    InputStream is = null;
                    try {
                        is = new ByteArrayInputStream(sb.toString().getBytes());
                        ((com.mysql.jdbc.Statement) pstmt).setLocalInfileInputStream(is);
                        pstmt.execute();
                        conn.commit();
                        sb.setLength(0);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            }
            InputStream is = null;
            try {
                is = new ByteArrayInputStream(sb.toString().getBytes());
                ((com.mysql.jdbc.Statement) pstmt).setLocalInfileInputStream(is);
                pstmt.execute();
                conn.commit();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally{
            conn.close();
        }
    }
}
上面程式碼就是匯入部分的一個方法,怎麼導中文都不顯示,還有些欄位中文為亂碼,我想肯定是字符集的問題,首先查了下資料庫字符集,(查詢命令為:show variables like '%char%';),然後看看程式碼,查詢結果顯示資料庫字符集為utf8,然後百度發現這個匯入程式碼得加上編碼格式:
"load data local infile ''into table bq_loan fields terminated by ','"; 

這個加上紅色部分編碼格式設定後如下,

 "load data local infile ''into table bq_loan character set utf8 fields terminated by ','";

 修改完後再次匯入,還是一樣,中文不顯示,有些欄位中文亂碼,這就頭疼了啊,仔細檢查,加上各種百度,才發現程式碼裡自己還挖了個坑,

is = new ByteArrayInputStream(sb.toString().getBytes());

這個將位元組陣列轉換為輸入流時,括號裡將字串轉換成位元組陣列時,並沒有給定轉換後的位元組陣列的編碼格式,所以採用的就是預設的編碼格式,我們知道不同編碼格式,單箇中英文多對應的位元組數是不一樣的。所以我猜測是這個地方沒有設定,導致生成的位元組陣列編碼格式和資料庫編碼格式不一致,最終導致導資料時中文不顯示以及亂碼。然後給getBytes()方法加上編碼格式,程式碼如下。

is = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));

加上後再進行匯入資料,一切順利,資料一點兒不差的導到庫裡。

到這裡導數就順利進行了,但是想到以後業務發展壯大時,設定的dubbo的服務端給消費端提供資料量最大值還會不夠用,所以就就決定改一下程式碼,最後和同事討論,建議採取分批插入,就是呼叫service服務端時進行分頁處理,每頁資料量設定為dubbo允許服務端給消費端提供資料量最大值的範圍內,然後每次插入資料時,就會進行分批插入,只不過和資料庫互動次數相對多幾次而已,影響不大。

還有一個,使用"load data local infile"導資料時,我是直接將查詢出來的結果(list集合)進行資料的組合,即每條資料的每個欄位間使用“,”隔開,每條資料之間使用“/n”換行隔開,最終將每條資料拼接成一個字串,然後將字串轉換成位元組陣列並轉換成輸入流,然後再執行匯入操作,再往後就比較簡單了。由於我不是通過檔案進行導數操作,所以 load data local infile '' into table bq_loan character set utf8 fields terminated by ','"  中紅色部分的檔名地址我就不寫。

最終的結果是:

        匯入1萬條資料,用時5.5秒左右

        匯入2.6萬資料, 用時17.8秒左右

        ......

        匯入35萬條資料,用時210秒左右

        匯入50萬條資料,用時305秒左右 

    我這個每條的資料量比較大,一條大概是1kb,所以,感覺速度還行吧,能實現我的需求。     

好了,下面貼出部分程式碼,供大家對照參考。

public void insertLoanInfo (Map<String,Object> msg) {
        try {
            long startTime = DateOperation.currentTimeMills();
            List<AssetPkgRel> loanList = (List<AssetPkgRel>)msg.get("loanList");
            String pkgName = (String) msg.get("pkgName");
            String pkgCde = (String) msg.get("pkgCde");
            // 備份時間
            String bkTime = DateOperation.convertToDateStr1(DateOperation.currentTimeMills());
            msg.put("bkTime",bkTime);
            if (IS_ONE_KEY_ASSOCIATED.getCode().equals(msg.get("isOneKeyAssociated"))) {
                BqLoanService.deleteByPkgCde(pkgCde);
            }
            List<String> bkList = BqLoanService.selectNumByLoanNo(loanList);
            // 總共的頁數
            double totalPage = Math.ceil(bkList.size()/25000.0);
            Map<String,Object> map = new HashMap<String,Object>();
            map.put("loanList",loanList);
            List<BqLoan> list = null;
            for (int i = 1;i <= totalPage;i++) {
                List<BqLoan> bqLoanList = new ArrayList<>();
                map.put("page",i);
                PageInfo<BqLoan> pageInfo = BqLoanService.selectByLoanNo(map);
                list = pageInfo.getList();
                for (int j= 0;j < list.size();j++) {
                    BqLoan BqLoan = list.get(j);
                    BqLoan.setPkgCde(pkgCde);
                    BqLoan.setPkgName(pkgName);
                    BqLoan.setArchTm(bkTime);
                    bqLoanList.add(BqLoan);
                }
                // 將當前頁資料插入資料庫
                batchInsert(bqLoanList);
                // 當前頁插入完之後清空list
                bqLoanList.clear();
            }
            long endTime = DateOperation.currentTimeMills();
            System.out.println("===============插入總時間:"+(endTime-startTime));
        } catch (BusinessException e) {
            logger.error("插入資料異常 "+e.getMessage());
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
public void batchInsert(List<BqLoan> bqLoanList) throws ClassNotFoundException, SQLException {
        //1000條一提交
        int COMMIT_SIZE=1000;
        //一共多少條
        int COUNT=bqLoanList.size();
        Connection conn= null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            String url = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.url","CONF_HOME");
            String user = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.username","CONF_HOME");
            String password = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.password","CONF_HOME");
            conn= DriverManager.getConnection(url,user,password);
            conn.setAutoCommit(false);
            String exectuteSql = "load data local infile ''into table bq_loan character set utf8 fields terminated by ','";
            PreparedStatement pstmt = conn.prepareStatement(exectuteSql);
           StringBuilder sb = new StringBuilder();
            for (int i = 0; i < COUNT; i++) {
                sb.append(getTestDataInputStream(bqLoanList.get(i)));
                if (i % COMMIT_SIZE == 0) {
                    InputStream is = null;
                    try {
                        is = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
                        ((com.mysql.jdbc.Statement) pstmt).setLocalInfileInputStream(is);
                        pstmt.execute();
                        conn.commit();
                        sb.setLength(0);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            }
            InputStream is = null;
            try {
                is = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
                ((com.mysql.jdbc.Statement) pstmt).setLocalInfileInputStream(is);
                pstmt.execute();
                conn.commit();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally{
            conn.close();
        }
    }
}

   /**
     *  組裝需要插入的資料,欄位間以","隔開,每條資料間以"/n"隔開
     */
    public static StringBuilder getTestDataInputStream(BqLoan BqLoan) {
            StringBuilder builder = new StringBuilder();
            builder.append(BqLoan.getSeq());
            builder.append(",");
            builder.append(BqLoan.getLoanNumber());
            builder.append(",");
            builder.append(BqLoan.gettPkgCde());
            builder.append(",");
            builder.append(BqLoan.getPkgName());
            builder.append(",");
            builder.append(BqLoan.getCustemerSeq());
            builder.append(",");
            builder.append(BqLoan.getCustemerName());
            builder.append(",");
            builder.append(BqLoan.getIdType());
            builder.append(",");
            builder.append(BqLoan.getIdNo());
            builder.append(",");
            builder.append(BqLoan.getPhoneNo());
            builder.append(",");
            builder.append("\n");
        return builder;
    }

到此結束,如果有遇到這些問題,然後這篇文章還不能夠幫助到你,可以一起再探討,歡迎騷擾。

剛畢業的小白,歡迎各位同行大佬指導!

謝謝!