Java 多執行緒:ExecutorService 多執行緒例項(六)
阿新 • • 發佈:2019-01-30
瞭解了ExecutorService,現在就來看下具體業務的具體應用。
本例以多執行緒方式,將大量資料分批後並行插入資料庫,以解決單一執行緒逐筆插入時的效能問題:
1、執行緒池
package com.akk.thread; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * * Description: 執行緒池 */ public class ThreadUtil { private ExecutorService pool; private int max; private final static int DEFAULT_TIMEOUT = 520; public ThreadUtil(int max) { this.max = max; pool = Executors.newFixedThreadPool(max); } /** * * <p>Descrption: 執行單執行緒</p> * @return void * @param cmd */ public void execute(Runnable cmd) throws Exception { pool.execute(cmd); } /** * * <p>Descrption: 執行批量無返回</p> * @return void * @param cmd */ public void executeNotWait(Collection<? extends Callable<?>> tasks) throws Exception { //pool.invokeAll(tasks); } /** * * <p>Descrption: 批量執行返回</p> * @return List<T> * @param tasks * @return * @throws InterruptedException * @throws ExecutionException * @throws TimeoutException */ public <T> List<T> execute(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException, TimeoutException { return execute(tasks, DEFAULT_TIMEOUT); } /** * 設定超時時間 * @param tasks * @param timeout * @return * @throws InterruptedException * @throws ExecutionException * @throws TimeoutException */ public <T> List<T> execute(Collection<? extends Callable<T>> tasks, int timeout) throws InterruptedException, ExecutionException, TimeoutException { List<T> results = new ArrayList<T>(tasks.size()); List<Future<T>> futures = pool.invokeAll(tasks); for (Future<T> future : futures) { results.add(future.get(timeout, TimeUnit.SECONDS)); } return results; } public int getMax() { return max; } }
2、批量列表分批迭代器
package com.akk.thread; import java.util.Iterator; import java.util.List; /** * 批量列表分批迭代器 */ public class BatchIterator<T> implements Iterator<List<T>> { private int batchSize; private List<T> list; private int start; private int end; private int pageCount; /** * 新建分批迭代器 * * @param list 要分批的原始列 * @param size 要分批的每批最大數量 */ public BatchIterator(List<T> list, int size){ this.batchSize = size; this.list = list; this.start = 0; this.end += this.batchSize; if (this.end > list.size()) { this.end = list.size(); } this.pageCount = 0; } @Override public boolean hasNext() { return start < list.size(); } @Override public List<T> next() { List<T> resultList = list.subList(start, end); start = end; end += batchSize; if (end > list.size()) { end = list.size(); } ++ pageCount; return resultList; } /** * Gets start. * * @return the start */ public int getStart() { return start; } /** * Gets end. * * @return the end */ public int getEnd() { return end; } /** * Gets batch size. * * @return the batch size */ public int getBatchSize() { return batchSize; } /** * Gets page count. * * @return the page count */ public int getPageCount() { return pageCount; } @Override public void remove() { // TODO Auto-generated method stub } }
3、資料庫批量操作時的分批工具
package com.akk.thread; import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** * 資料庫批量操作時的分批工具 */ public class BatchUtil { /** * 獲取分批的List * @param list 原始待分批的列 * @param size 每批最大數量 * @return the batch list */ public static <T> List<List<T>> getBatchList(List<T> list, int size) { List<List<T>> resultList = new ArrayList<List<T>>(); int resultSize = 0; int tmp = 0; while (resultSize < list.size()) { tmp += size; if (tmp > list.size()) { tmp = list.size(); } resultList.add(list.subList(resultSize, tmp)); resultSize = tmp; } return resultList; } /** * 分批批量執行函式式 * 不對返回值進行處理 * 例如: * <pre>{@code * BatchUtil.batchVoidListExecute( * 3, * list -> System.out.print(list.size() + " "), * Lists.newArrayList(1,2,3,4,5,6,7)); * }</pre> * 結果是3 3 1 * @param size 每批最大數量 * @param voidListExt 可以批量執行的函式式,引數是T的List * @param list 原始列 * @param <T> 批量的單位實體型別 */ public static <T> void batchVoidListExecute(int size, VoidListExt<T> voidListExt, List<T> list) { if (list.isEmpty()) { return; } Iterator<List<T>> iterator = new BatchIterator<>(list, size); while (iterator.hasNext()) { voidListExt.execute(iterator.next()); } } /** * 與batchVoidListExecute相同 * 可以用返回值為List的函式式,執行後將返回List拼接為返回值 * @param size * @param listListExt * @param list * @param <T> * @param <S> * @return */ public static <T, S> List<S> batchListListExecute(int size, ListListExt<T, S> listListExt, List<T> list) { Iterator<List<T>> iterator = new BatchIterator<>(list, size); List<S> resultList = new ArrayList<>(); while (iterator.hasNext()) { resultList.addAll(listListExt.execute(iterator.next())); } return resultList; } /** * 與batchVoidListExecute相同 * 可以用返回值為List的函式式,執行後將返回List拼接為返回值 * @param size * @param intListExt * @param list * @param <T> * @return */ public static <T> int batchIntListExecute(int size, IntListExt<T> intListExt, List<T> list) { Iterator<List<T>> iterator = new BatchIterator<>(list, size); int result = 0; while (iterator.hasNext()){ result += intListExt.execute(iterator.next()); } 
return result; } /** * <p>Description: * 與batchVoidListExecute相同 * 可以自定義返回值型別,然後通過自定義收集函式式來收集結果 * </p> * @param <T> the type parameter * @param <S> the type parameter * @param <R> the type parameter * @param size the size * @param commonListExt the common list ext * @param resultCollector the result collector * @param list the list * @param result the result 用collector將結果收集到其中 * @return the r */ public static <T,S,R> R batchCommonListExecute(int size, CommonListExt<T, S> commonListExt, ResultCollector<S,R> resultCollector, List<T> list, R result) { Iterator<List<T>> iterator = new BatchIterator<>(list, size); while (iterator.hasNext()) { S s = commonListExt.execute(iterator.next()); resultCollector.collect(s, result); } return result; } /** * 引數是List返回值為空的函式式 * @param <T> */ // @FunctionalInterface public interface VoidListExt<T> { void execute(List<T> list); } /** * 引數是List返回值也是List的函式式 * @param <T> * @param <S> */ // @FunctionalInterface public interface ListListExt<T, S> { List<S> execute(List<T> list); } /** * 引數是List返回值是int的函式式 * @param <T> */ // @FunctionalInterface public interface IntListExt<T> { int execute(List<T> list); } /** * 引數是List自定義返回值的函式式 * @param <T> * @param <S> */ // @FunctionalInterface public interface CommonListExt<T, S> { S execute(List<T> list); } /** * 能將第一個引數的結果收集到第二個引數中的函式式 * @param <SRC> * @param <COLLECT_TARGET> */ // @FunctionalInterface public interface ResultCollector<SRC, COLLECT_TARGET> { void collect(SRC src, COLLECT_TARGET collectTarget); } }
4、執行緒執行的具體任務
package com.akk.thread;
import java.util.List;
import java.util.concurrent.Callable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.akk.mvc.entity.UserTestEntity;
import com.akk.mvc.service.UserService;
/**
 * Task handed to the thread pool: passes one batch of users to
 * {@code UserService.batchInsert} and returns that call's result.
 */
@Component
public class BatchListTask implements Callable<List<UserTestEntity>> {

    @Autowired
    private UserService userService;

    /** Batch processed by {@link #call()}; {@code null} until a caller sets it. */
    private List<UserTestEntity> batchDebitList;

    /**
     * Delegates the current batch to the user service.
     *
     * @return whatever {@code userService.batchInsert} returns for the batch
     * @throws Exception if the service call fails
     */
    @Override
    public List<UserTestEntity> call() throws Exception {
        return this.userService.batchInsert(this.batchDebitList);
    }

    /** @return the service used to process the batch */
    public UserService getUserService() {
        return this.userService;
    }

    /** @param userService the service that {@link #call()} delegates to */
    public void setUserService(UserService userService) {
        this.userService = userService;
    }

    /** @return the batch this task will process */
    public List<UserTestEntity> getBatchDebitList() {
        return this.batchDebitList;
    }

    /** @param batchDebitList the batch this task should process */
    public void setBatchDebitList(List<UserTestEntity> batchDebitList) {
        this.batchDebitList = batchDebitList;
    }
}
5、JUnit 測試入口
package com.akk.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.akk.mvc.entity.UserTestEntity;
import com.akk.mvc.service.UserService;
import com.akk.thread.BatchListTask;
import com.akk.thread.BatchUtil;
import com.akk.thread.ThreadUtil;
/**
 * Integration test that splits a list of users into batches and runs one
 * {@link BatchListTask} per batch on the injected {@link ThreadUtil} pool.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:conf/applicationContext.xml")
public class ThreadTest {

    /** Number of user entities generated for the run. */
    private static final int USER_COUNT = 20;
    /** Maximum number of users handed to a single task. */
    private static final int BATCH_SIZE = 5;

    @Autowired
    private UserService userService;

    @Autowired
    private ThreadUtil pool;

    /**
     * Builds {@code USER_COUNT} users, runs them through the pool in batches of
     * {@code BATCH_SIZE}, prints the id of every returned entity and checks
     * that each task produced a result.
     *
     * <p>Exceptions propagate so that a failed or timed-out batch fails the
     * test instead of only being printed.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if a batch task threw an exception
     * @throws TimeoutException     if the batch did not finish in time
     */
    @Test
    public void thread() throws InterruptedException, ExecutionException, TimeoutException {
        List<UserTestEntity> userTestEntityList = new ArrayList<UserTestEntity>();
        for (int i = 0; i < USER_COUNT; i++) {
            UserTestEntity userTestEntity = new UserTestEntity();
            userTestEntity.setUserId(i);
            userTestEntityList.add(userTestEntity);
        }

        List<List<UserTestEntity>> updateBatchList =
                BatchUtil.getBatchList(userTestEntityList, BATCH_SIZE);

        List<BatchListTask> tasks = new ArrayList<BatchListTask>();
        for (List<UserTestEntity> list : updateBatchList) {
            // Created with new rather than injected, so the service is wired by hand.
            BatchListTask batchListTask = new BatchListTask();
            batchListTask.setBatchDebitList(list);
            batchListTask.setUserService(userService);
            tasks.add(batchListTask);
        }

        List<List<UserTestEntity>> result = pool.execute(tasks);
        for (List<UserTestEntity> thisList : result) {
            for (UserTestEntity thisEntity : thisList) {
                System.out.println("id:" + thisEntity.getId());
            }
        }
        System.out.println(result.size());

        // One result list is expected per submitted task.
        if (result.size() != tasks.size()) {
            throw new AssertionError(
                    "expected " + tasks.size() + " batch results but got " + result.size());
        }
    }
}