1. 程式人生 > >Java大資料量(多執行緒)分段分批處理

Java大資料量(多執行緒)分段分批處理

分段處理主類

package pers.zuo.component.piecewise;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import pers.zuo.component.piecewise.bean.PiecewiseKey;
import
pers.zuo.component.piecewise.bean.PiecewiseResult; import pers.zuo.component.piecewise.bean.PiecewiseTask; /** * @author zuojingang * * @param <T> * the type of part process return */ public abstract class PiecewiseHandler<V> { public void nThreads( final
Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> nThreadResult, final int totalNum) throws Exception { nThreads(nThreadResult, totalNum, D_THREAD_SIZE, D_PART_SIZE); } /** * @param totalNum * @param threadSize * @return
nThreads process result. */
public void nThreads( final Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> nThreadResult, final int totalNum, final int threadSize, final int partSize) throws Exception { if (null == nThreadResult || 0 >= totalNum || 0 >= threadSize) { return; } ExecutorService fixThreadPool = Executors.newFixedThreadPool(D_N_THREAD); List<PiecewiseTask> fTaskList = new ArrayList<>(); int fromIndex = 0; try { while (totalNum > fromIndex) { final int thisFromIndex = fromIndex; final int threadProcessNum = Math.min(totalNum - fromIndex, threadSize); final int thisToIndex = thisFromIndex + threadProcessNum; if (0 < threadProcessNum) { PiecewiseTask futureTask = PiecewiseBuilder.buildTask(new Callable<Boolean>() { @Override public Boolean call() throws Exception { final Map<PiecewiseKey, PiecewiseResult<V>> threadResult = PiecewiseBuilder .initializeThreadResult(); nThreadResult.put(PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex), PiecewiseBuilder.buildResult(threadResult)); singleThread(threadResult, thisFromIndex, threadProcessNum, partSize); return true; } }, PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex)); fixThreadPool.submit(futureTask); fTaskList.add(futureTask); } fromIndex += threadProcessNum; } boolean finished = true; for (PiecewiseTask futureTask : fTaskList) { try { finished = finished && futureTask.get(); } catch (InterruptedException | ExecutionException e) { nThreadResult.get(futureTask.getTaskKey()).setException(e); } } } catch (Exception e) { throw e; } finally { // the threadPool must manual-lock after use fixThreadPool.shutdown(); } } public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int totalNum) { singleThread(threadResult, 0, totalNum); } public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int offset, final int totalNum) { singleThread(threadResult, offset, totalNum, D_PART_SIZE); } /** * @param offset * @param toIndex * @param partSize * @return process subList values and include first index(offset) and exclude * latest index(offset + totalNum) */ public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int offset, final int totalNum, final int partSize) { if (0 >= totalNum || 0 >= partSize) { return; } final int toIndex = offset + totalNum; int fromIndex = offset; while (toIndex > fromIndex) { int thisToIndex = Math.min(fromIndex + partSize, toIndex); V partResult = null; Exception pe = null; try { partResult = partProcess(fromIndex, thisToIndex); } catch (Exception e) { pe = e; } threadResult.put(PiecewiseBuilder.buildKey(fromIndex, thisToIndex), PiecewiseBuilder.buildResult(partResult, pe)); fromIndex = thisToIndex; } } /** * @param offset * @param partSize * @return part process result */ protected abstract V partProcess(final int fromIndex, final int toIndex) throws Exception; public static final int D_N_THREAD = 10; public static final int D_THREAD_SIZE = 10000; public static final int D_PART_SIZE = 1000; }

分段任務定製類

package pers.zuo.component.piecewise.bean;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/**
 * @author zuojingang
 *
 * @param <K
 *            extends Number> the type of part process return
 */
public class PiecewiseTask extends FutureTask<Boolean> {

    private final PiecewiseKey taskKey;

    public PiecewiseTask(Callable<Boolean> callable, PiecewiseKey taskKey) {
        super(callable);
        this.taskKey = taskKey;
    }

    public PiecewiseKey getTaskKey() {
        return taskKey;
    }

}

分段任務Key值類

package pers.zuo.component.piecewise.bean;

public class PiecewiseKey {

    private final Integer from;
    private final Integer to;

    public PiecewiseKey(Integer from, Integer to) {
        super();
        this.from = from;
        this.to = to;
    }

    public Integer getFrom() {
        return from;
    }

    public Integer getTo() {
        return to;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((from == null) ? 0 : from.hashCode());
        result = prime * result + ((to == null) ? 0 : to.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PiecewiseKey other = (PiecewiseKey) obj;
        if (from == null) {
            if (other.from != null)
                return false;
        } else if (!from.equals(other.from))
            return false;
        if (to == null) {
            if (other.to != null)
                return false;
        } else if (!to.equals(other.to))
            return false;
        return true;
    }
}

分段任務返回值類

package pers.zuo.component.piecewise.bean;

public class PiecewiseResult<V> {

    private final V val;
    private Exception exception;

    public PiecewiseResult(V val) {
        super();
        this.val = val;
    }

    public PiecewiseResult(V val, Exception exception) {
        super();
        this.val = val;
        this.exception = exception;
    }

    public Exception getException() {
        return exception;
    }

    public void setException(Exception exception) {
        this.exception = exception;
    }

    public V getVal() {
        return val;
    }
}

獲取例項工具類

package pers.zuo.component.piecewise.manager;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

import pers.zuo.component.piecewise.bean.PiecewiseKey;
import pers.zuo.component.piecewise.bean.PiecewiseResult;
import pers.zuo.component.piecewise.bean.PiecewiseTask;

public class PiecewiseBuilder {

    public static PiecewiseKey buildKey(Integer from, Integer to) {
        return new PiecewiseKey(from, to);
    }

    public static <V> PiecewiseResult<V> buildResult(V val) {
        return new PiecewiseResult<V>(val);
    }

    public static <V> PiecewiseResult<V> buildResult(V val, Exception exception) {
        return new PiecewiseResult<V>(val, exception);
    }

    public static PiecewiseTask buildTask(Callable<Boolean> callable, PiecewiseKey taskKey) {
        return new PiecewiseTask(callable, taskKey);
    }

    /**
     * this method aimed for simple when define the nThreadResult
     * 
     * @return
     */
    public static <V> Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> initializeNThreadResult() {
        return new HashMap<>();
    }

    /**
     * this method aimed for simple when define the threadResult
     * 
     * @return
     */
    public static <V> Map<PiecewiseKey, PiecewiseResult<V>> initializeThreadResult() {
        return new HashMap<>();
    }
}