1. 程式人生 > java多執行緒:ExecutorService多執行緒例項(六)

java多執行緒:ExecutorService多執行緒例項(六)

瞭解了ExecutorService,現在就來看下具體業務的具體應用。

下面以「大量資料同時插入資料庫」為例,用多執行緒分批處理來解決其效能問題:

1、執行緒池

package com.akk.thread;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


/**
 * Thin wrapper around a fixed-size {@link ExecutorService} used to run batches of tasks.
 *
 * <p>The pool is created in the constructor with {@code max} worker threads. Callers should
 * invoke {@link #shutdown()} once the instance is no longer needed, because the worker
 * threads of a fixed thread pool are non-daemon threads and keep the JVM alive.
 */
public class ThreadUtil {

    /** Overall timeout, in seconds, applied by {@link #execute(Collection)}. */
    private final static int DEFAULT_TIMEOUT = 520;

    private final ExecutorService pool;
    private final int max;

    /**
     * Creates the wrapper together with its backing fixed thread pool.
     *
     * @param max number of worker threads in the pool
     * @throws IllegalArgumentException if {@code max <= 0} (raised by the executor factory)
     */
    public ThreadUtil(int max) {
        this.max = max;
        this.pool = Executors.newFixedThreadPool(max);
    }

    /**
     * Hands a single command to the pool and returns immediately.
     *
     * @param cmd the command to run
     */
    public void execute(Runnable cmd) throws Exception {
        pool.execute(cmd);
    }

    /**
     * Submits every task to the pool and returns without waiting for any of them.
     *
     * <p>The resulting futures are discarded, so a value returned or an exception thrown
     * by a task is not observable through this method.
     *
     * @param tasks the tasks to start
     */
    public void executeNotWait(Collection<? extends Callable<?>> tasks) throws Exception {
        // The original body was commented out, which silently dropped every task.
        for (Callable<?> task : tasks) {
            pool.submit(task);
        }
    }

    /**
     * Runs all tasks and returns their results, waiting at most {@value #DEFAULT_TIMEOUT}
     * seconds for the whole batch.
     *
     * @param tasks the tasks to run
     * @return the task results, in the iteration order of {@code tasks}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if a task threw an exception
     * @throws TimeoutException if the batch did not finish in time
     */
    public <T> List<T> execute(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException, TimeoutException {
        return execute(tasks, DEFAULT_TIMEOUT);
    }

    /**
     * Runs all tasks and returns their results, waiting at most {@code timeout} seconds
     * for the whole batch.
     *
     * <p>Tasks that have not completed when the timeout expires are cancelled by the
     * executor, and this method then reports the condition as a {@link TimeoutException}.
     *
     * @param tasks the tasks to run
     * @param timeout maximum time to wait for the batch, in seconds
     * @return the task results, in the iteration order of {@code tasks}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if a task threw an exception
     * @throws TimeoutException if at least one task did not finish within {@code timeout} seconds
     */
    public <T> List<T> execute(Collection<? extends Callable<T>> tasks, int timeout) throws InterruptedException, ExecutionException, TimeoutException {
        // The timed invokeAll is what actually enforces the limit; the untimed variant
        // blocks until every task is done, which made the timeout unreachable.
        List<Future<T>> futures = pool.invokeAll(tasks, timeout, TimeUnit.SECONDS);
        List<T> results = new ArrayList<T>(futures.size());
        for (Future<T> future : futures) {
            if (future.isCancelled()) {
                throw new TimeoutException("Batch did not complete within " + timeout + " seconds");
            }
            // Every remaining future is already done, so get() does not block.
            results.add(future.get());
        }
        return results;
    }

    /**
     * Returns the number of worker threads this instance was created with.
     *
     * @return the pool size passed to the constructor
     */
    public int getMax() {
        return max;
    }

    /**
     * Starts an orderly shutdown of the backing pool: already submitted tasks still run,
     * new tasks are rejected.
     */
    public void shutdown() {
        pool.shutdown();
    }

}


2、批量列表分批迭代器

package com.akk.thread;

import java.util.Iterator;
import java.util.List;


/**
 * Iterator that walks a list in consecutive batches of at most {@code size} elements.
 *
 * <p>Each batch is a {@link List#subList(int, int)} view of the source list, not a copy.
 * Removal through the iterator is not supported.
 */
public class BatchIterator<T> implements Iterator<List<T>> {
    private final int batchSize;
    private final List<T> list;
    /** Inclusive start index of the next batch. */
    private int start;
    /** Exclusive end index of the next batch. */
    private int end;
    /** Number of batches handed out so far. */
    private int pageCount;

    /**
     * Creates a batch iterator over {@code list}.
     *
     * @param list the source list to split into batches
     * @param size maximum number of elements per batch
     * @throws NullPointerException if {@code list} is {@code null}
     * @throws IllegalArgumentException if {@code size <= 0} while {@code list} is not empty;
     *         such a size could never make progress through the list
     */
    public BatchIterator(List<T> list, int size){
        if (list == null) {
            throw new NullPointerException("list must not be null");
        }
        // An empty list yields no batch at all, so the size is only checked when it matters.
        if (size <= 0 && !list.isEmpty()) {
            throw new IllegalArgumentException("batch size must be positive, got " + size);
        }
        this.batchSize = size;
        this.list = list;
        this.start = 0;
        this.end = nextEnd(0);
        this.pageCount = 0;
    }

    /** Computes the exclusive end of the batch starting at {@code from}, capped at the list size. */
    private int nextEnd(int from) {
        // Widen to long so from + batchSize cannot overflow int.
        return (int) Math.min((long) from + batchSize, (long) list.size());
    }

    @Override
    public boolean hasNext() {
        return start < list.size();
    }

    /**
     * Returns the next batch and advances the iterator.
     *
     * @return a sub-list view covering the next batch
     * @throws java.util.NoSuchElementException if all elements have already been handed out
     */
    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException("no more batches");
        }
        List<T> resultList = list.subList(start, end);
        start = end;
        end = nextEnd(start);
        ++ pageCount;
        return resultList;
    }

    /**
     * Gets the start index (inclusive) of the next batch.
     *
     * @return the start
     */
    public int getStart() {
        return start;
    }

    /**
     * Gets the end index (exclusive) of the next batch.
     *
     * @return the end
     */
    public int getEnd() {
        return end;
    }

    /**
     * Gets the maximum batch size passed to the constructor.
     *
     * @return the batch size
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Gets the number of batches returned by {@link #next()} so far.
     *
     * @return the page count
     */
    public int getPageCount() {
        return pageCount;
    }

    /**
     * Not supported: batches are views over the source list and cannot be removed.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException("remove");
    }
}


3、資料庫批量操作時的分批工具

package com.akk.thread;


import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Helpers for splitting a list into batches and running a callback once per batch,
 * intended for bulk database operations.
 *
 * <p>Every batch handed to a callback is a {@link List#subList(int, int)} view of the
 * source list, not a copy.
 */
public class BatchUtil {

    /**
     * Splits {@code list} into consecutive batches of at most {@code size} elements.
     *
     * @param list the source list to split
     * @param size maximum number of elements per batch
     * @return the batches in source order; empty when {@code list} is empty
     * @throws IllegalArgumentException if {@code size <= 0} while {@code list} is not empty
     */
    public static <T> List<List<T>> getBatchList(List<T> list, int size) {
        List<List<T>> resultList = new ArrayList<List<T>>();
        if (list.isEmpty()) {
            // Nothing to split; the size is irrelevant in this case.
            return resultList;
        }
        // A zero size would otherwise append empty sub-lists forever.
        requirePositive(size);
        final int total = list.size();
        int from = 0;
        while (from < total) {
            // Widen to long so from + size cannot overflow int.
            int to = (int) Math.min((long) from + size, (long) total);
            resultList.add(list.subList(from, to));
            from = to;
        }
        return resultList;
    }

    /** Rejects batch sizes that could never make progress through a non-empty list. */
    private static void requirePositive(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("batch size must be positive, got " + size);
        }
    }

    /**
     * Runs {@code voidListExt} once per batch and ignores any result.
     * For example:
     * <pre>{@code
     *          BatchUtil.batchVoidListExecute(
     *                  3,
     *                  list -> System.out.print(list.size() + " "),
     *                  Arrays.asList(1,2,3,4,5,6,7));
     * }</pre>
     * prints {@code 3 3 1}.
     *
     * @param size maximum number of elements per batch
     * @param voidListExt callback invoked with each batch
     * @param list the source list
     * @param <T> element type of the list
     * @throws IllegalArgumentException if {@code size <= 0} while {@code list} is not empty
     */
    public static <T> void batchVoidListExecute(int size, VoidListExt<T> voidListExt, List<T> list) {
        for (List<T> batch : getBatchList(list, size)) {
            voidListExt.execute(batch);
        }
    }


    /**
     * Like {@link #batchVoidListExecute}, but the callback returns a list per batch and
     * all returned lists are concatenated, in batch order, into the result.
     *
     * @param size maximum number of elements per batch
     * @param listListExt callback invoked with each batch
     * @param list the source list
     * @param <T> element type of the source list
     * @param <S> element type of the result list
     * @return a new list holding the concatenated callback results
     * @throws IllegalArgumentException if {@code size <= 0} while {@code list} is not empty
     */
    public static <T, S> List<S> batchListListExecute(int size,
                                                      ListListExt<T, S> listListExt,
                                                      List<T> list) {
        List<S> resultList = new ArrayList<>();
        for (List<T> batch : getBatchList(list, size)) {
            resultList.addAll(listListExt.execute(batch));
        }
        return resultList;
    }

    /**
     * Like {@link #batchVoidListExecute}, but the callback returns an {@code int} per
     * batch and the values are summed.
     *
     * @param size maximum number of elements per batch
     * @param intListExt callback invoked with each batch
     * @param list the source list
     * @param <T> element type of the list
     * @return the sum of all callback results; {@code 0} when {@code list} is empty
     * @throws IllegalArgumentException if {@code size <= 0} while {@code list} is not empty
     */
    public static <T> int batchIntListExecute(int size, IntListExt<T> intListExt, List<T> list) {
        int result = 0;
        for (List<T> batch : getBatchList(list, size)) {
            result += intListExt.execute(batch);
        }
        return result;
    }

    /**
     * Like {@link #batchVoidListExecute}, but with a caller-defined per-batch result type
     * that is folded into {@code result} by {@code resultCollector}.
     *
     * @param <T> element type of the source list
     * @param <S> per-batch result type produced by {@code commonListExt}
     * @param <R> type of the accumulated result
     * @param size maximum number of elements per batch
     * @param commonListExt callback invoked with each batch
     * @param resultCollector folds each per-batch result into {@code result}
     * @param list the source list
     * @param result the accumulator the collector writes into
     * @return the same {@code result} instance that was passed in
     * @throws IllegalArgumentException if {@code size <= 0} while {@code list} is not empty
     */
    public static <T,S,R> R batchCommonListExecute(int size,
                                                   CommonListExt<T, S> commonListExt,
                                                   ResultCollector<S,R> resultCollector,
                                                   List<T> list,
                                                   R result) {
        for (List<T> batch : getBatchList(list, size)) {
            S s = commonListExt.execute(batch);
            resultCollector.collect(s, result);
        }
        return result;
    }

    /**
     * Callback that takes a batch and returns nothing.
     * Declares a single method, so it can be written as a lambda on Java 8+.
     * @param <T> element type of the batch
     */
    public interface VoidListExt<T> {
        void execute(List<T> list);
    }

    /**
     * Callback that takes a batch and returns a list.
     * Declares a single method, so it can be written as a lambda on Java 8+.
     * @param <T> element type of the batch
     * @param <S> element type of the returned list
     */
    public interface ListListExt<T, S> {
        List<S> execute(List<T> list);
    }

    /**
     * Callback that takes a batch and returns an {@code int}.
     * Declares a single method, so it can be written as a lambda on Java 8+.
     * @param <T> element type of the batch
     */
    public interface IntListExt<T> {
        int execute(List<T> list);
    }

    /**
     * Callback that takes a batch and returns a caller-defined value.
     * Declares a single method, so it can be written as a lambda on Java 8+.
     * @param <T> element type of the batch
     * @param <S> type of the returned value
     */
    public interface CommonListExt<T, S> {
        S execute(List<T> list);
    }

    /**
     * Callback that folds a per-batch result (first argument) into an accumulator
     * (second argument).
     * @param <SRC> type of the per-batch result
     * @param <COLLECT_TARGET> type of the accumulator
     */
    public interface ResultCollector<SRC, COLLECT_TARGET> {
        void collect(SRC src, COLLECT_TARGET collectTarget);
    }
}


4、執行緒執行的具體任務

package com.akk.thread;

import java.util.List;
import java.util.concurrent.Callable;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.akk.mvc.entity.UserTestEntity;
import com.akk.mvc.service.UserService;

/**
 * Unit of work executed on a pool thread: saves one batch of user entities by
 * delegating to {@link UserService#batchInsert}.
 *
 * <p>The batch and the service are supplied through the setters before the task is
 * submitted. NOTE(review): the class is also registered as a Spring {@code @Component}
 * while holding a per-task batch; confirm whether the container-managed instance is
 * actually used anywhere.
 */
@Component
public class BatchListTask implements Callable<List<UserTestEntity>> {

	/** Service that performs the insert; field-injected by Spring or set manually. */
	@Autowired
	private UserService userService;

	/** The batch this task will insert; unset ({@code null}) until the setter is called. */
	private List<UserTestEntity> batchDebitList;

	/**
	 * Inserts the configured batch through the user service.
	 *
	 * @return whatever {@code batchInsert} returns for this batch
	 * @throws Exception if the service call fails
	 */
	@Override
	public List<UserTestEntity> call() throws Exception {
		final List<UserTestEntity> inserted = this.userService.batchInsert(this.batchDebitList);
		return inserted;
	}

	/** @return the service used for the insert */
	public UserService getUserService() {
		return this.userService;
	}

	/** @param userService the service to use for the insert */
	public void setUserService(UserService userService) {
		this.userService = userService;
	}

	/** @return the batch assigned to this task */
	public List<UserTestEntity> getBatchDebitList() {
		return this.batchDebitList;
	}

	/** @param batchDebitList the batch this task should insert */
	public void setBatchDebitList(List<UserTestEntity> batchDebitList) {
		this.batchDebitList = batchDebitList;
	}

}


5、Junit入口

package com.akk.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.akk.mvc.entity.UserTestEntity;
import com.akk.mvc.service.UserService;
import com.akk.thread.BatchListTask;
import com.akk.thread.BatchUtil;
import com.akk.thread.ThreadUtil;

/**
 * Spring-backed JUnit entry point that inserts 20 user entities in batches of 5,
 * one {@link BatchListTask} per batch, through the shared {@link ThreadUtil} pool.
 *
 * <p>Needs the application context at {@code classpath:conf/applicationContext.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:conf/applicationContext.xml")
public class ThreadTest {

	@Autowired
	private UserService userService;
	@Autowired
	private ThreadUtil pool;

	/**
	 * Builds the entities, splits them into batches, runs one task per batch and prints
	 * the id of every returned entity.
	 *
	 * <p>Any failure of the batch run (task exception, timeout, interruption) fails the
	 * test instead of only being printed.
	 */
	@Test
	public void thread() {
		List<BatchListTask> tasks = new ArrayList<BatchListTask>();
		List<UserTestEntity> userTestEntityList = new ArrayList<UserTestEntity>();
		for (int i = 0; i < 20; i++) {
			UserTestEntity userTestEntity = new UserTestEntity();
			userTestEntity.setUserId(i);
			userTestEntityList.add(userTestEntity);
		}

		List<List<UserTestEntity>> updateBatchList = BatchUtil.getBatchList(
				userTestEntityList, 5);

		// Tasks are created by hand, so the service has to be passed in explicitly.
		for (List<UserTestEntity> list : updateBatchList) {
			BatchListTask batchListTask = new BatchListTask();
			batchListTask.setBatchDebitList(list);
			batchListTask.setUserService(userService);
			tasks.add(batchListTask);
		}

		try {
			List<List<UserTestEntity>> result = pool.execute(tasks);
			for(List<UserTestEntity> thisList : result){
				for(UserTestEntity thisEntity : thisList){
					System.out.println("id:"+thisEntity.getId());
				}
			}
			System.out.println(result.size());
			// One result list is expected per submitted task.
			if (result.size() != tasks.size()) {
				throw new AssertionError("expected " + tasks.size()
						+ " batch results but got " + result.size());
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag before failing so the runner can still see it.
			Thread.currentThread().interrupt();
			throw new AssertionError("interrupted while waiting for the batch insert", e);
		} catch (ExecutionException | TimeoutException e) {
			throw new AssertionError("batch insert did not complete successfully", e);
		}
	}

}