如何用java執行緒池做分批次查詢處理 java執行緒池ThreadPoolExecutor的使用
需求是在一個大資料量的表中按條件查詢出資料後做相應的業務。我是使用的java執行緒池ThreadPoolExecutor,實現分批次去查詢,查詢到資料後,又分多個執行緒去做業務。
執行緒池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize: 執行緒池維護執行緒的最少數量
maximumPoolSize:執行緒池維護執行緒的最大數量
keepAliveTime: 執行緒池維護執行緒所允許的空閒時間
unit: 執行緒池維護執行緒所允許的空閒時間的單位
workQueue: 執行緒池所使用的緩衝佇列
handler: 執行緒池對拒絕任務的處理策略
一個任務通過 execute(Runnable)方法被新增到執行緒池,任務就是一個 Runnable型別的物件,任務的執行方法就是Runnable型別物件的run()方法。
執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行它們。當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:
a. 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;
b. 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列。
c. 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立執行緒執行這個任務;
d. 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常,告訴呼叫者“我不能再接受任務了”。
當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。
當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果當前執行的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
這樣的過程說明,並不是先加入任務就一定會先執行。假設佇列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那麼當加入 20 個任務時,執行的順序就是這樣的:首先執行任務 1、2、3,然後任務 4~13 被放入佇列。這時候佇列滿了,任務 14、15、16 會被馬上執行,而任務 17~20 則會丟擲異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。
下面來看具體的程式碼(程式碼中會有部分程式碼以+++表示不方便各位檢視的):
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
int pageSize = 1000; // 每次查詢交易條數
int handleSize = 200; // 執行緒一次性處理的交易條數
int handleCount = 0;
int transCount = +++mapper.getTransCount(batchDate, +);//根據條件去查詢需要做業務的資料條數,查詢條數的sql語句快
logger.info(MessageFormat.format("[+++日期:[{0}], 待+++記錄條數為:[{1}]", batchDate, transCount));//MessageFormat.format是日誌的一個方法,推薦大家這麼使用
List<+++> tranList = null;
while (handleCount < transCount)
{
tranList = +++mapper.getTransList(batchDate, null, handleCount, pageSize);//int offset, int limit);
if (tranList == null || tranList.size() == 0)
{
logger.info(MessageFormat.format(++++
handleCount += pageSize;
continue;
}
int splitCount = (tranList.size() / handleSize) + (tranList.size() % handleSize == 0 ? 0 : 1);
CountDownLatch latch = new CountDownLatch(splitCount);
for (int i = 0; i < splitCount; i++)
{
int toIndex = (i + 1) * handleSize;
if (i == splitCount - 1)
{
toIndex = tranList.size();
}
List<NpcTransaction> subList = tranList.subList(i * handleSize, toIndex);
threadPool.execute(new +++Thread(+++Manager, subList, batchDate, latch));//塞入到執行緒池,執行的方法是+++Thread類中的run方法
}
handleCount += pageSize;
try
{
latch.await(5, TimeUnit.MINUTES);
}
catch (InterruptedException e)
{
logger.error(getClass().getName() + " doTask fail.", e);
}
}
下面是threadPool.execute(new +++Thread...中的thread類
public class +++Thread implements Runnable
{
private Logger logger =
private List<NpcTransaction> trans;
private CountDownLatch latch;
private String batchDate;
private +++Manager
public +++Thread(+++Manager +++Manager, List<+++> trans, String batchDate,
CountDownLatch latch)
{
this.+++Manager = +++Manager;
this.trans = trans;
this.batchDate = batchDate;
this.latch = latch;
}
/**
* 過載方法
*/
@Override
public void run()
{
int saveCount = 0;
try
{
saveCount = +++Manager.save+++Record(trans);//
}
catch (Exception e)
{
logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常:[{1}]", batchDate, e.getMessage()));
e.printStackTrace();
}
if (saveCount != trans.size())
{
logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常,++成功條數:[{1}],預期條數:[{2}]", batchDate,
saveCount, trans.size()));
}
latch.countDown();
}
public List<++Transaction> getTrans()
{
return trans;
}
public void setTrans(List<++Transaction> trans)
{
this.trans = trans;
}
/**
* 獲取 latch
*
* @return 返回 latch
*/
public CountDownLatch getLatch()
{
return latch;
}
/**
* 設定 latch
*
* @param 對latch進行賦值
*/
public void setLatch(CountDownLatch latch)
{
this.latch = latch;
}
/**
* 獲取 batchDate
*
* @return 返回 batchDate
*/
public String getBatchDate()
{
return batchDate;
}
/**
* 設定 batchDate
*
* @param 對batchDate進行賦值
*/
public void setBatchDate(String batchDate)
{
this.batchDate = batchDate;
}
}
都要寫get,set方法,latch.countDown();這個最好寫在finally中
關於CountDownLatch這個,我下面簡單的說一下:
CountDownLatch,一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。
主要方法
public CountDownLatch(int count);
public void countDown();
public void await() throws InterruptedException
構造方法引數指定了計數的次數
countDown方法,當前執行緒呼叫此方法,則計數減一
awaint方法,呼叫此方法會一直阻塞當前執行緒,直到計時器的值為0
此處用計數器,因為一組1000條資料通過5個執行緒去執行,這組執行完再進行第二組以及其後組的1000條資料操作。