1. 程式人生 > >資料庫大資料量匯出多執行緒版本原始碼部分

資料庫大資料量匯出多執行緒版本原始碼部分

package com.alibaba.crm.finance.bo.export;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;

import com.ali.shy.dao.Queryable;
import com.ali.shy.util.JSON;
import com.ali.shy.util.json.ParseException;
import com.alibaba.crm.finance.biz.common.BizConstant;
import com.alibaba.crm.finance.biz.export.CompressedFilesUtil;
import com.alibaba.crm.finance.biz.export.NotepadRender;
import com.alibaba.crm.finance.common.log.LogModel;
import com.alibaba.crm.finance.common.log.LogUtil;
import com.alibaba.crm.finance.dataobject.base.sys.FinExportTask;

/**
 * @author longer 2012-4-16 \u4e0b\u53485:04:55
 */
@SuppressWarnings({ "unchecked" })
public class QueueGenerateExcel implements BeanFactoryAware, ExportStartNow {

    private FinExportTaskBo        finExportTaskBo;

    private NotepadRender          notepadRender;

    private static final Logger    LOG              = Logger.getLogger(QueueGenerateExcel.class);

    private static final Object    LOCK             = new Object();
    private static final int       THREAD_POOL_SIZE = 3;

    private static ExecutorService EXECUTOR_SERVICE = null;
    static {
        EXECUTOR_SERVICE = Executors.newFixedThreadPool(THREAD_POOL_SIZE, new ThreadFactory() {

            final AtomicInteger threadNumber = new AtomicInteger(1);

            public Thread newThread(Runnable r) {
                ThreadGroup group = Thread.currentThread().getThreadGroup();
                Thread t = new Thread(group, r, "export-pool-" + threadNumber.getAndIncrement(), 0);
                return t;
            }
        });
    }

    public void setNotepadRender(NotepadRender notepadRender) {
        this.notepadRender = notepadRender;
    }

    private static BeanFactory mFact;

    public void setBeanFactory(BeanFactory beanFactory) {
        mFact = beanFactory;
    }

    public void setFinExportTaskBo(FinExportTaskBo finExportTaskBo) {
        this.finExportTaskBo = finExportTaskBo;
    }

    /*
     * (non-Javadoc)
     * @see com.alibaba.crm.finance.bo.export.ExportStartNow#startExportNow()
     */
    public void startExportNow() {
        synchronized (LOCK) {
            LOCK.notifyAll();
        }
    }

    public void init() {
        try {
            for (int i = 0; i < THREAD_POOL_SIZE; i++) {
                EXECUTOR_SERVICE.execute(new Generate(i));
            }
            EXECUTOR_SERVICE.shutdown();
        } catch (Exception e) {
            LogModel model = new LogModel("QueueGenerateExcel", "\u5bfc\u51fa\u5b9a\u65f6\u949f", e);
            LogUtil.error(LOG, model);
        }
    }

    public void close() {
        EXECUTOR_SERVICE.shutdownNow();
    }

    private class Generate extends Thread {

        private static final int  COMPUTE_TIME_TIMES        = 10;
        private static final int  DEFFER_MIN_PAGE_SIZE      = 1000;
        private static final int  ONE_SECOND                = 1 * 1000;
        private static final int  PAGE_PRE_SIZE             = 4000;

        private static final int  WAIT_TIMEOUT_FOR_OUT_NULL = 1000;
        private static final long WAIT_TIMEOUT_FOR_NULL     = 2 * 60 * 1000L;

        public Generate(int i){
            super("Generate_export_" + i);
        }

        public void run() {
            LogModel model = new LogModel("QueueGenerateExcel Generate run");
            while (Boolean.TRUE) {
                FinExportTask finExportTask = null;
                try {
                    finExportTask = loadNextTask();
                    if (finExportTask != null) {
                        deleteBeforeFile(finExportTask.getFileName());
                        if (executeOneTask(finExportTask)) {
                            finExportTaskBo.finishTheTask(finExportTask.getId(), finExportTask.getFileName(),
                                                          finExportTask.getFileSize());
                            LogUtil.logInfo(model, LOG, finExportTask + " finish!!");
                        } else {
                            LogUtil.logInfo(model, LOG, finExportTask + " break!!");
                        }

                    }
                    sleep(finExportTask);
                } catch (Exception e) {
                    if (finExportTask != null) {
                        finExportTask.setServiceEndDate(new Date());
                        finExportTaskBo.failureTheTsak(finExportTask.getId());
                    }
                    LogUtil.logError(model, LOG, e.getMessage(), e);
                }
            }
        }

        private void deleteBeforeFile(String fileName) {
            LogModel model = new LogModel("QueueGenerateExcel Generate deleteBeforeFile");
            try {
                LogUtil.logInfo(model, LOG, "deleteBeforeFile START");
                File f = new File(fileName);
                if (f.exists()) {
                    f.renameTo(new File(fileName + "." + new Date().getTime() + ".back"));
                }
                File frar = new File(fileName + CompressedFilesUtil.FILE_STYLE);
                if (frar.exists()) {
                    frar.renameTo(new File(fileName + CompressedFilesUtil.FILE_STYLE + "." + new Date().getTime()
                                           + ".back"));
                }
                LogUtil.logInfo(model, LOG, "deleteBeforeFile END");
            } catch (Exception e) {
                LogUtil.logError(model, LOG, e.getMessage(), e);
            }
        }

        private void sleep(FinExportTask finExportTask) throws InterruptedException {
            if (finExportTask == null) {
                synchronized (LOCK) {
                    LOCK.wait(WAIT_TIMEOUT_FOR_NULL);// wait 2min
                }
            } else {
                Thread.sleep(WAIT_TIMEOUT_FOR_OUT_NULL);// sleep 1s
            }
        }

        private FinExportTask loadNextTask() throws InterruptedException {
            LogModel model = new LogModel("loadNextTask");
            while (Boolean.TRUE) {
                FinExportTask finExportTask = finExportTaskBo.loadNextTask();
                if (finExportTask == null) {
                    LogUtil.logInfo(model, LOG, "all finExportTask has  finish !");
                    return finExportTask;
                }
                if (finExportTaskBo.startTheTask(finExportTask)) {
                    finExportTask.setStatus(BizConstant.EXPORT_TASK_STATUS_SERVICE);
                    finExportTask.setServiceBeginDate(new Date());
                    LogUtil.logInfo(model, LOG, finExportTask + " start!");
                    return finExportTask;
                } else {
                    LogUtil.logInfo(model, LOG, finExportTask + " is in service!");
                }
                Thread.sleep(ONE_SECOND);
            }
            return null;
        }

        private Export genExport(FinExportTask finExportTask) throws ParseException {
            Export e = new Export();
            String json = finExportTask.getSearchCondition();
            Map<String, Object> data = (Map<String, Object>) JSON.parse(json);

            String fileName = finExportTask.getFileName();
            String action = String.valueOf(data.get("dao"));

            String query = String.valueOf(data.get("query"));
            if ("null".equals(query)) {
                query = null;
            }
            int max = Integer.valueOf(String.valueOf(data.get("max")));
            int skip = Integer.valueOf(String.valueOf(data.get("skip")));
            e.setSkip(skip);
            e.setAction(action);
            e.setFileName(fileName);
            e.setMax(max);
            e.setQuery(query);
            e.setData(data);
            return e;
        }

        private Boolean executeOneTask(FinExportTask finExportTask) throws ParseException, SQLException, IOException,
                                                                   InterruptedException {
            LogModel model = new LogModel("executeOneTask");
            Export e = genExport(finExportTask);
            reviseMax(e);
            int pageSize = e.getMax() / PAGE_PRE_SIZE + 1;// \u9875\u6570
            if ((e.getMax() % PAGE_PRE_SIZE) == 0) {
                pageSize = pageSize - 1;
            }

            Queue<Long> times = new LinkedList<Long>();
            for (int i = 0; i < pageSize; i++) {
                if (breakTask(finExportTask)) {
                    LogUtil.logInfo(model, LOG, finExportTask + " break!!");
                    return Boolean.FALSE;
                }
                int skipp = e.getSkip() + i * PAGE_PRE_SIZE;
                int pagePreSize = PAGE_PRE_SIZE;
                if (i == pageSize - 1) {
                    pagePreSize = e.getMax() - (pageSize - 1) * PAGE_PRE_SIZE;
                    // FOR DEEFER
                    if (pagePreSize <= DEFFER_MIN_PAGE_SIZE && e.getAction().equals("defferDaoModelAction")) {
                        pagePreSize = DEFFER_MIN_PAGE_SIZE + 1;
                    }
                }
                long startTime = new Date().getTime();
                List<Object> datas = getDao(e.getAction()).queryForList(e.getQuery(), e.getData(), skipp, pagePreSize);
                notepadRender.trans(datas, new File(e.getFileName()));

                computeRemainingTime(finExportTask, pageSize, times, i, startTime);
                // for compatibility
                if (datas == null || datas.size() == 0) {
                    break;
                }
                datas.clear();
            }
            // for compressed
            BigDecimal size = CompressedFilesUtil.compressedFiles(new File(e.getFileName()));
            finExportTask.setFileSize(size);
            finExportTask.setFileName(finExportTask.getFileName() + CompressedFilesUtil.FILE_STYLE);
            return Boolean.TRUE;
        }

        private boolean breakTask(FinExportTask finExportTask) {
            FinExportTask finExportTasknew = finExportTaskBo.getFinExportTask(finExportTask.getId());
            if (!BizConstant.EXPORT_TASK_STATUS_SERVICE.equals(finExportTasknew.getStatus())) {
                return Boolean.TRUE;
            } else {
                return Boolean.FALSE;
            }
        }

        private void computeRemainingTime(FinExportTask finExportTask, int pageSize, Queue<Long> times, int i,
                                          long start) {
            long end = new Date().getTime();
            long time = (pageSize - i) * (end - start);
            if (times.size() > COMPUTE_TIME_TIMES) {
                times.remove();
            }
            times.add(time);
            long timeall = 0;
            for (Long lo : times) {
                timeall += lo;
            }
            time = timeall / times.size();
            finExportTask.setServiceEndDate(new Date(new Date().getTime() + time));
            finExportTaskBo.computeRemainingTime(finExportTask);
        }

        private void reviseMax(Export e) throws SQLException {
            int skip = e.getSkip();
            String action = e.getAction();
            String query = e.getQuery();
            int max = e.getMax();
            int maxTo = getDao(action).queryCount(query, e.getData());
            if (maxTo < max + skip) {
                max = maxTo - skip + 1;
                if (max < 0) {
                    max = 0;
                }
            }
            e.setMax(max);
        }

        private Queryable getDao(String dao) throws SQLException {

            Object ret = mFact.getBean(dao);
            if (ret == null) {
                throw new SQLException("Dao bean [" + dao + "] not found.");
            }
            if (!(ret instanceof Queryable)) {
                throw new SQLException("Dao bean [" + dao + "] must implements com.ali.shy.dao.Queryable.");
            }
            return (Queryable) ret;
        }

        private final class Export {

            public String getFileName() {
                return fileName;
            }

            public void setFileName(String fileName) {
                this.fileName = fileName;
            }

            public String getAction() {
                return action;
            }

            public void setAction(String action) {
                this.action = action;
            }

            public String getQuery() {
                return query;
            }

            public void setQuery(String query) {
                this.query = query;
            }

            public int getSkip() {
                return skip;
            }

            public void setSkip(int skip) {
                this.skip = skip;
            }

            public int getMax() {
                return max;
            }

            public void setMax(int max) {
                this.max = max;
            }

            private String fileName;
            private String action;
            private String query;
            private int    skip;
            private int    max;

            public Map<String, Object> getData() {
                return data;
            }

            public void setData(Map<String, Object> data) {
                this.data = data;
            }

            private Map<String, Object> data;
        }
    }
}

相關推薦

資料庫料量匯出執行版本原始碼部分

package com.alibaba.crm.finance.bo.export; import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.sql

資料庫料量匯出執行版本

【不積跬步,無以至千里;不積小流,無以成江海。】 一、概述 一年多前,我做了一個小需求,匯出80w的資料。當時寫了一篇部落格簡單地講了一些原理,並貼出了部分的原始碼。原理用了一張圖來表述: 基本就是客戶在頁面申請匯出請求,把請求存在資料庫中,再由定時任務取出來執行

Java料量執行)分段分批處理

分段處理主類 package pers.zuo.component.piecewise; import java.util.ArrayList; import java.util.List; import java.util.concurrent.C

【OS作業】用執行統計txt檔案中字元個數(Java實現)

問題描述 給定一個txt檔案,利用不同個數的執行緒查詢檔案中某字元的個數,探究執行緒個數與查詢時間的關係。 本作業程式碼使用JAVA實現,版本為10.0.2,使用的IDE為Eclipse4.9.0. 結果測試所用的txt檔案內容為英文,編碼格式為UTF-8。 原始碼 第一版程式碼:(

Excel料量匯出

問題 在進行匯出Excel的時候,由於資料量十分大,導致流不能很快的寫入到Excel檔案,使得流一直在記憶體中,導致記憶體佔用4個多G,嚴重影響服務狀態。 方案 找到問題點是因為流或者Excel的資料結構(如XSSFWorkbook等)在記憶體停留時間太長。所

SQL Server 使用bcp進行料量匯出匯入

轉載:http://www.cnblogs.com/gaizai/archive/2010/04/17/1714389.html   SQL Server的匯出匯入方式有: 在SQL Server中提供了匯入匯出的介面操作。 在介面操作中又分【複製一個或多個表或檢視的資料】和【編寫查

sql優化(查詢料量時sql執行時間過長)

問題:Oracle資料庫 sql查詢的優化(成交額統計表的sql查詢時間過長進行的優化) 解決辦法:對sql語句中使用檢視的部分替換為子查詢,對查詢表條件欄位建立索引 引發的問題:在什麼情況下建立索引,及建立索引後引發的開銷有哪些 經查詢oracle的索引機制,摘錄如下:

料量匯出的設計總結

背景 目前做的一個系統中有多個業務功能存在資料匯出的功能,而且以對賬明細匯出為例,一般一個區縣級稅務機關下的某個屬期的所有入庫明細資料可能達到100多萬, 由於資料量過大,且現有實現方式不合理,容易出現由於匯出資料過多造成的“記憶體溢位”問題,並且由於現有匯出是同步方式,

spring Batch實現資料庫料量讀寫

1. data-source-context.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" x

執行版本TCP聊天程式服務端

   這是一個通過多執行緒來實現可以接受多個客戶端的TCP聊天程式。    //這是一個實現多執行緒TCP的聊天程式服務端 #include<stdio.h> #include<stdlib.h> #includ

[.net 執行]ConcurrentBag原始碼分析

ConcurrentBag根據操作執行緒,對不同執行緒分配不同的佇列進行資料操作。這樣,每個佇列只有一個執行緒在操作,不會發生併發問題。其內部實現運用了net4.0新加入的ThreadLocal執行緒本地儲存功能。各個佇列間通過連結串列維護。 其內部結構如下:   1、獲取執行緒本地佇列:

Python-爬取妹子圖(單執行執行版本)

一、參考文章     Python爬蟲之——爬取妹子圖片     上述文章中的程式碼講述的非常清楚,我的基本能思路也是這樣,本篇文章中的程式碼僅僅做了一些異常處理和一些日誌顯示優化工作,寫此文章主要是當做筆記,方便以後查閱,修改的地方如下: 1、異常處理

Python-爬取校花網視訊(單執行執行版本)

一、參考文章     python爬蟲爬取校花網視訊,單執行緒爬取     爬蟲----爬取校花網視訊,包含多執行緒版本     上述兩篇文章都是對校花網視訊的爬取,由於時間相隔很久了,校花網上的一些視訊已經不存在了,因此上

Java執行——FutureTask原始碼解析

一個很常見的多執行緒案例是,我們安排主執行緒作為分配任務和彙總的一方,然後將計算工作切分為多個子任務,安排多個執行緒去計算,最後所有的計算結果由主執行緒進行彙總。比如,歸併排序,字元頻率的統計等等。 我們知道Runnable是不返回計算結果的,如果想利用多執行緒的話,只能儲

Java執行——ReentrantLock原始碼閱讀

上一章《AQS原始碼閱讀》講了AQS框架,這次講講它的應用類(注意不是子類實現,待會細講)。 ReentrantLock,顧名思義重入鎖,但什麼是重入,這個鎖到底是怎樣的,我們來看看類的註解說明 ReentrantLock與隱式鎖synchronized功能相同,但ReentrantLock更具有擴充套件性

Java執行——ReentrantReadWriteLock原始碼閱讀

之前講了《AQS原始碼閱讀》和《ReentrantLock原始碼閱讀》,本次將延續閱讀下ReentrantReadWriteLock,建議沒看過之前兩篇文章的,先大概瞭解下,有些內容會基於之前的基礎上閱讀。 這個並不是ReentrantLock簡單的升級,而是落地場景的優化,我們來詳細瞭解下吧。 背景 JUC

檔案上傳案例及執行版本

檔案上傳案例 程式碼演示:   1 public class TCPServer { 2 public static void main(String[] args) throws IOException { 3 ServerSocket s

Caffe:CPU模式下使用openblas-openmp(執行版本)

從所周知,所有的深度學習框架使用GPU執行是最快的,但是在不具備Nvidia顯示卡的環境下只使用CPU來執行,慢就慢點吧,對於學習階段還是夠用的。 Caffe用到的Blas可以選擇Altas,OpenBlas,Intel MKL,Blas承擔了大量了數學工作,

平衡樹法求字首和--內容講解及執行實現原始碼

平衡樹求和法是一個適用於並行處理的演算法,它的定義如下:下面具體給出在SIMD並行機上的視線演算法,如下:由於沒有SIMD並行機,我採用Java多執行緒的Concurrent包中的工具實現了概演算法。最後再說一下如何獲得原始碼:有需要的朋友可以去那裡下載全部原始碼和工程檔案。

JAVA執行之基礎部分

public void transfer(int from, int to, double amount){synchronized(lockObj){ // if (energyBoxes[from] < amount) // return; //while迴圈,保證條件不滿足時任務都會被條件阻擋 /