1. 程式人生 > >30G 上億資料的超大檔案,如何快速匯入生產環境?

30G 上億資料的超大檔案,如何快速匯入生產環境?

Hello,大家好,我是樓下小黑哥~ 如果給你一個包含一億行資料的超大檔案,讓你在一週之內將資料轉化匯入生產資料庫,你會如何操作? 上面的問題其實是小黑哥前段時間接到一個真實的業務需求,將一個老系統歷史資料通過線下檔案的方式遷移到新的生產系統。 ~~由於老闆們已經敲定了新系統上線時間,所以只留給小黑哥一週的時間將歷史資料匯入生產系統。~~ 由於時間緊,而資料量又超大,所以小黑哥設計的過程想到一下解決辦法: - 拆分檔案 - 多執行緒匯入 > 歡迎關注我的公眾號:小黑十一點半,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:[studyidea.cn](https://studyidea.cn) ## 拆分檔案 首先我們可以寫個小程式,或者使用拆分命令 `split ` 將這個超大檔案拆分一個個小檔案。 ```shell -- 將一個大檔案拆分成若干個小檔案,每個檔案 100000 行 split -l 100000 largeFile.txt -d -a 4 smallFile_ ``` 這裡之所以選擇先將大檔案拆分,主要考慮到兩個原因: 第一如果程式直接讀取這個大檔案,假設讀取一半的時候,程式突然宕機,這樣就會直接丟失檔案讀取的進度,又需要重新開頭讀取。 而檔案拆分之後,一旦小檔案讀取結束,我們可以將小檔案移動一個指定資料夾。 這樣即使應用程式宕機重啟,我們重新讀取時,只需要讀取剩餘的檔案。 第二,一個檔案,只能被一個應用程式讀取,這樣就限制了匯入的速度。 而檔案拆分之後,我們可以採用多節點部署的方式,水平擴充套件。每個節點讀取一部分檔案,這樣就可以成倍的加快匯入速度。 ![](https://img2020.cnblogs.com/other/1419561/202012/1419561-20201224083138027-1258893766.jpg) ## 多執行緒匯入 當我們拆分完檔案,接著我們就需要讀取檔案內容,進行匯入。 之前拆分的時候,設定每個小檔案包含 10w 行的資料。由於擔心一下子將 10w 資料讀取應用中,導致堆記憶體佔用過高,引起頻繁的 **Full GC**,所以下面採用流式讀取的方式,一行一行的讀取資料。 > 當然了,如果拆分之後檔案很小,或者說應用的堆記憶體設定很大,我們可以直接將檔案載入到應用記憶體中處理。這樣相對來說簡單一點。 逐行讀取的程式碼如下: ```java File file = ... try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line=iterator.nextLine(); convertToDB(line); } } ``` 上面程式碼使用 `commons-io` 中的 `LineIterator`類,這個類底層使用了 `BufferedReader` 讀取檔案內容。它將其封裝成迭代器模式,這樣我們可以很方便的迭代讀取。 如果當前使用 JDK1.8 ,那麼上述操作更加簡單,我們可以直接使用 JDK 原生的類 `Files`將檔案轉成 `Stream` 方式讀取,程式碼如下: ```java Files.lines(Paths.get("檔案路徑"), Charset.defaultCharset()).forEach(line -> { convertToDB(line); }); ``` 其實仔細看下 `Files#lines`底層原始碼,其實原理跟上面的 `LineIterator`類似,同樣也是封裝成迭代器模式。 ![](https://img2020.cnblogs.com/other/1419561/202012/1419561-20201224083138660-588710224.jpg) ### 多執行緒的引入存在的問題 上述讀取的程式碼寫起來不難,但是存在效率問題,主要是因為只有單執行緒在匯入,上一行資料匯入完成之後,才能繼續操作下一行。 為了加快匯入速度,那我們就多來幾個執行緒,併發匯入。 多執行緒我們自然將會使用執行緒池的方式,相關程式碼改造如下: ```java File file = ...; ExecutorService executorService = new ThreadPoolExecutor( 5, 10, 60, TimeUnit.MINUTES, // 檔案數量,假設檔案包含 10W 行 new ArrayBlockingQueue<>(10*10000), // guava 提供 new ThreadFactoryBuilder().setNameFormat("test-%d").build()); try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line = iterator.nextLine(); executorService.submit(() -> { convertToDB(line); }); } } ``` 上述程式碼中,每讀取到一行內容,就會直接交給執行緒池來執行。 我們知道執行緒池原理如下: 1. 如果核心執行緒數未滿,將會直接建立執行緒執行任務。 2. 如果核心執行緒數已滿,將會把任務放入到佇列中。 3. 如果佇列已滿,將會再建立執行緒執行任務。 4. 如果最大執行緒數已滿,佇列也已滿,那麼將會執行拒絕策略。 ![執行緒池執行流程圖](https://img2020.cnblogs.com/other/1419561/202012/1419561-20201224083139040-992119937.jpg) 由於我們上述執行緒池設定的核心執行緒數為 5,很快就到達了最大核心執行緒數,後續任務只能被加入佇列。 為了後續任務不被執行緒池拒絕,我們可以採用如下方案: - 將佇列容量設定成很大,包含整個檔案所有行數 - 將最大執行緒數設定成很大,數量大於件所有行數 以上兩種方案都存在同樣的問題,第一種是相當於將檔案所有內容載入到記憶體,將會佔用過多記憶體。 而第二種建立過多的執行緒,同樣也會佔用過多記憶體。 一旦記憶體佔用過多,GC 無法清理,就可能會引起頻繁的 **Full GC**,甚至導致 **OOM**,導致程式匯入速度過慢。 解決這個問題,我們可以如下兩種解決方案: - `CountDownLatch` 批量執行 - 擴充套件執行緒池 ### `CountDownLatch` 批量執行 JDK 提供的 `CountDownLatch`,可以讓主執行緒等待子執行緒都執行完成之後,再繼續往下執行。 利用這個特性,我們可以改造多執行緒匯入的程式碼,主體邏輯如下: ```java try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { // 儲存每個任務執行的行數 List lines = Lists.newArrayList(); // 儲存非同步任務 List tasks = Lists.newArrayList(); while (iterator.hasNext()) { String line = iterator.nextLine(); lines.add(line); // 設定每個執行緒執行的行數 if (lines.size() == 1000) { // 新建非同步任務,注意這裡需要建立一個 List tasks.add(new ConvertTask(Lists.newArrayList(lines))); lines.clear(); } if (tasks.size() == 10) { asyncBatchExecuteTask(tasks); } } // 檔案讀取結束,但是可能還存在未被內容 tasks.add(new ConvertTask(Lists.newArrayList(lines))); // 最後再執行一次 asyncBatchExecuteTask(tasks); } ``` 這段程式碼中,每個非同步任務將會匯入 1000 行資料,等積累了 10 個非同步任務,然後將會呼叫 `asyncBatchExecuteTask` 使用執行緒池非同步執行。 ```java /** * 批量執行任務 * * @param tasks */ private static void asyncBatchExecuteTask(List tasks) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); for (ConvertTask task : tasks) { task.setCountDownLatch(countDownLatch); executorService.submit(task); } // 主執行緒等待非同步執行緒 countDownLatch 執行結束 countDownLatch.await(); // 清空,重新新增任務 tasks.clear(); } ``` `asyncBatchExecuteTask` 方法內將會建立 `CountDownLatch`,然後主執行緒內呼叫 `await`方法等待所有非同步執行緒執行結束。 `ConvertTask` 非同步任務邏輯如下: ```java /** * 非同步任務 * 等資料匯入完成之後,一定要呼叫 countDownLatch.countDown() * 不然,這個主執行緒將會被阻塞, */ private static class ConvertTask implements Runnable { private CountDownLatch countDownLatch; private List lines; public ConvertTask(List lines) { this.lines = lines; } public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { for (String line : lines) { convertToDB(line); } } finally { countDownLatch.countDown(); } } } ``` `ConvertTask`任務類邏輯就非常簡單,遍歷所有行,將其匯入到資料庫中。所有資料匯入結束,呼叫 `countDownLatch#countDown`。 一旦所有非同步執行緒執行結束,呼叫 `countDownLatch#countDown`,主執行緒將會被喚醒,繼續執行檔案讀取。 雖然這種方式解決上述問題,但是這種方式,每次都需要積累一定任務數才能開始非同步執行所有任務。 另外每次都需要等待所有任務執行結束之後,才能開始下一批任務,批量執行消耗的時間等於最慢的非同步任務消耗的時間。 這種方式執行緒池中執行緒存在一定的閒置時間,那有沒有辦法一直壓榨執行緒池,讓它一直在幹活呢? ### 擴充套件執行緒池 回到最開始的問題,檔案讀取匯入,其實就是一個**生產者-消費者**消費模型。 主執行緒作為生產者不斷讀取檔案,然後將其放置到佇列中。 非同步執行緒作為消費者不斷從佇列中讀取內容,匯入到資料庫中。 **一旦佇列滿載,生產者應該阻塞,直到消費者消費任務。** 其實我們使用執行緒池的也是一個**生產者-消費者**消費模型,其也使用阻塞佇列。 那為什麼執行緒池在佇列滿載的時候,不發生阻塞? 這是因為執行緒池內部使用 `offer` 方法,這個方法在佇列滿載的時候**不會發生阻塞**,而是直接返回 。 ![](https://img2020.cnblogs.com/other/1419561/202012/1419561-20201224083139377-18183902.jpg) 那我們有沒有辦法線上程池佇列滿載的時候,阻塞主執行緒新增任務? 其實是可以的,我們自定義執行緒池拒絕策略,當佇列滿時改為呼叫 `BlockingQueue.put` 來實現生產者的阻塞。 ```java RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } } }; ``` 這樣一旦執行緒池滿載,主執行緒將會被阻塞。 使用這種方式之後,我們可以直接使用上面提到的多執行緒匯入的程式碼。 ```java ExecutorService executorService = new ThreadPoolExecutor( 5, 10, 60, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("test-%d").build(), (r, executor) -> { if (!executor.isShutdown()) { try { // 主執行緒將會被阻塞 executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } }); File file = new File("檔案路徑"); try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line = iterator.nextLine(); executorService.submit(() -> convertToDB(line)); } } ``` ## 小結 一個超大的檔案,我們可以採用拆分檔案的方式,將其拆分成多份檔案,然後部署多個應用程式提高讀取速度。 另外讀取過程我們還可以使用多執行緒的方式併發匯入,不過我們需要注意執行緒池滿載之後,將會拒絕後續任務。 我們可以通過擴充套件執行緒池,自定義拒絕策略,使讀取主執行緒阻塞。 好了,今天文章內容就到這裡,不知道各位有沒有其他更好的解決辦法,歡迎留言討論。 > 歡迎關注我的公眾號:小黑十一點半,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:[studyidea.cn](https://studyidea.cn)