1. 程式人生 > >如何用java執行緒池做分批次查詢處理 java執行緒池ThreadPoolExecutor的使用

如何用java執行緒池做分批次查詢處理 java執行緒池ThreadPoolExecutor的使用

需求是在一個大資料量的表中按條件查詢出資料後做相應的業務。我是使用的java執行緒池ThreadPoolExecutor,實現分批次去查詢,查詢到資料後,又分多個執行緒去做業務。

執行緒池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為:ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,   long keepAliveTime,   TimeUnit unit,   BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

corePoolSize: 執行緒池維護執行緒的最少數量

maximumPoolSize:執行緒池維護執行緒的最大數量

keepAliveTime: 執行緒池維護執行緒所允許的空閒時間

unit: 執行緒池維護執行緒所允許的空閒時間的單位

workQueue: 執行緒池所使用的緩衝佇列

handler: 執行緒池對拒絕任務的處理策略

一個任務通過 execute(Runnable)方法被新增到執行緒池,任務就是一個 Runnable型別的物件,任務的執行方法就是Runnable型別物件的run()方法。

執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行它們。當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:
         a. 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;
   b. 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列。
   c. 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立執行緒執行這個任務;
   d. 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常,告訴呼叫者“我不能再接受任務了”。

當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。
當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果當前執行的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
   這樣的過程說明,並不是先加入任務就一定會先執行。假設佇列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那麼當加入 20 個任務時,執行的順序就是這樣的:首先執行任務 1、2、3,然後任務 4~13 被放入佇列。這時候佇列滿了,任務 14、15、16 會被馬上執行,而任務 17~20 則會丟擲異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。

下面來看具體的程式碼(程式碼中會有部分程式碼以+++表示不方便各位檢視的):

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());

        int pageSize = 1000; // 每次查詢交易條數
        int handleSize = 200; // 執行緒一次性處理的交易條數
        int handleCount = 0;
        int transCount = +++mapper.getTransCount(batchDate, +);//根據條件去查詢需要做業務的資料條數,查詢條數的sql語句快
        logger.info(MessageFormat.format("[+++日期:[{0}], 待+++記錄條數為:[{1}]", batchDate, transCount));//MessageFormat.format是日誌的一個方法,推薦大家這麼使用
        List<+++> tranList = null;
        while (handleCount < transCount)
        {
            tranList = +++mapper.getTransList(batchDate, null, handleCount, pageSize);//int offset, int limit);
            if (tranList == null || tranList.size() == 0)
            {
                logger.info(MessageFormat.format(++++
                handleCount += pageSize;
                continue;
            }

            int splitCount = (tranList.size() / handleSize) + (tranList.size() % handleSize == 0 ? 0 : 1);
            CountDownLatch latch = new CountDownLatch(splitCount);
            for (int i = 0; i < splitCount; i++)
            {
                int toIndex = (i + 1) * handleSize;
                if (i == splitCount - 1)
                {
                    toIndex = tranList.size();
                }
                List<NpcTransaction> subList = tranList.subList(i * handleSize, toIndex);
                threadPool.execute(new +++Thread(+++Manager, subList, batchDate, latch));//塞入到執行緒池,執行的方法是+++Thread類中的run方法
            }

            handleCount += pageSize;

            try
            {
                latch.await(5, TimeUnit.MINUTES);
            }
            catch (InterruptedException e)
            {
                logger.error(getClass().getName() + " doTask fail.", e);
            }
        }

下面是threadPool.execute(new +++Thread...中的thread類

public class +++Thread implements Runnable
{

    private Logger logger =

    private List<NpcTransaction> trans;

    private CountDownLatch latch;

    private String batchDate;

    private +++Manager

    public +++Thread(+++Manager +++Manager, List<+++> trans, String batchDate,
            CountDownLatch latch)
    {
        this.+++Manager = +++Manager;
        this.trans = trans;
        this.batchDate = batchDate;
        this.latch = latch;
    }

    /**
     * 過載方法
     */
    @Override
    public void run()
    {

        int saveCount = 0;
        try
        {
            saveCount = +++Manager.save+++Record(trans);//
        }
        catch (Exception e)
        {
            logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常:[{1}]", batchDate, e.getMessage()));
            e.printStackTrace();
        }
        if (saveCount != trans.size())
        {
            logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常,++成功條數:[{1}],預期條數:[{2}]", batchDate,
                    saveCount, trans.size()));
        }
        latch.countDown();

    }

    public List<++Transaction> getTrans()
    {
        return trans;
    }

    public void setTrans(List<++Transaction> trans)
    {
        this.trans = trans;
    }

    /**
     * 獲取 latch
     *
     * @return 返回 latch
     */
    public CountDownLatch getLatch()
    {

        return latch;
    }

    /**
     * 設定 latch
     *
     * @param 對latch進行賦值
     */
    public void setLatch(CountDownLatch latch)
    {

        this.latch = latch;
    }

    /**
     * 獲取 batchDate
     *
     * @return 返回 batchDate
     */
    public String getBatchDate()
    {

        return batchDate;
    }

    /**
     * 設定 batchDate
     *
     * @param 對batchDate進行賦值
     */
    public void setBatchDate(String batchDate)
    {

        this.batchDate = batchDate;
    }

}

都要寫get,set方法,latch.countDown();這個最好寫在finally中

關於CountDownLatch這個,我下面簡單的說一下:

CountDownLatch,一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。
主要方法
 public CountDownLatch(int count);
 public void countDown();
 public void await() throws InterruptedException
 構造方法引數指定了計數的次數
 countDown方法,當前執行緒呼叫此方法,則計數減一
 awaint方法,呼叫此方法會一直阻塞當前執行緒,直到計時器的值為0

此處用計數器,因為一組1000條資料通過5個執行緒去執行,這組執行完再進行第二組以及其後組的1000條資料操作。