1. 程式人生 > >多執行緒分派任務的MW元件初步設想?持續更新...

多執行緒分派任務的MW元件初步設想——持續更新...

public  class Master<E> {

    //存放E型別任務的容器,只對taskQuueu進行寫
    private volatile BlockingQueue<? super E> TaskQueue;
    //存放每一個worker任務結果的集合,只對result進行讀
    private volatile BlockingQueue<? super Object> resultQueue;

    //存放worker的合集
    private volatile Map<String,Callable> wokersMap;
    //執行任務的執行緒池
private ThreadPoolExecutor threadPool; //存放執行結果 //該Master執行完成次數 //TODO private AtomicInteger finishNum = new AtomicInteger(-1); //初始化capacity private static Integer DEFAULT_CAPACITY = 1200; //使用鎖 private volatile Lock lock = new ReentrantLock(); //任務的future佇列 private List<Future> futureList = new
ArrayList<>(); //整個master標誌 private AtomicBoolean flag = new AtomicBoolean(false); //日誌 private final static Logger logger = LoggerFactory.getLogger(Master.class); /** * 構造一個大小為30,任務容量為capacity的執行緒池,初始化workQueue和workerMap * @param worker * @param workThreads * @param capacity */
public Master(Worker worker, Integer workThreads, Integer capacity){ threadPool = new ThreadPoolExecutor( 10, 30, 1000L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(capacity), new MWRejection()); TaskQueue = new ArrayBlockingQueue<E>(capacity); wokersMap = new HashMap(); resultQueue = new LinkedBlockingQueue<Object> (); worker.setTaskQueue(this.TaskQueue); worker.setResultQueue(this.resultQueue); for (Integer i = 0; i < workThreads; i++) { wokersMap.put(Integer.toString(i),worker); } logger.info("{}個worker已經啟動",workThreads); } //構造 public Master(Worker worker, Integer workThreads){ this(worker,workThreads,DEFAULT_CAPACITY); } //執行任務 public void executeThread(){ wokersMap.forEach((k,v)->{ Future future = threadPool.submit(v); futureList.add(future); }); } //提交任務 public void submitTask(E task){ lock.lock(); try { this.TaskQueue.put(task); logger.info("task add {}",task.toString()); }catch (Exception e){ System.out.println("提交失敗"); logger.error("{}提交失敗",task); }finally { lock.unlock(); } } //過程中間判斷,非阻塞 public boolean isComplete(){ lock.lock(); try{ if(futureList != null) for (Future f : futureList) { if (!f.isDone()) { //任務尚未做完 return false; } } return true; }catch (Exception e){ return flag.get(); }finally { System.out.println("準備釋放lock"); lock.unlock(); } } //最終狀態判斷,阻塞 public Boolean get(){ lock.lock(); try { flag.set(true); for (Future f : futureList) { if(!(Boolean) f.get()){ flag.set(false); System.out.println("future failure"); } } } catch (Exception e) { flag.set(false); e.printStackTrace(); }finally { lock.unlock(); } threadPool.shutdownNow(); System.out.println("完成了"+flag.get()); return flag.get(); } //計算結果方法 /** * 將Worker型別傳入 * @param * @return */ public Integer getFinishedSum(){ lock.lock(); Integer sum = 0; try{ if (this.get()){ if( resultQueue != null/* && threadPool.isTerminated()*/){ this.finishNum.incrementAndGet(); for (Object v: resultQueue) { if(v != null){ // this.finishNum.incrementAndGet(); sum += (Integer) v; } } } } }catch (Exception e){ 
e.printStackTrace(); }finally { lock.unlock(); } return sum; } }

@ConcurrentSafe
public abstract class Worker<E> implements Callable {

    // Worker half of the Master-Worker pair: call() drains taskQueue, passes each task to
    // handle(), and puts the returned value on resultQueue. Both queues must be set through
    // the setters before call() runs.

    // Source of tasks; this class only reads from it.
    private BlockingQueue<? extends E> taskQueue;
    // Sink for results; this class only writes to it.
    private BlockingQueue<? super Object> resultQueue;

    // Count of tasks whose result was enqueued; static, so shared by all Worker instances.
    private static final AtomicLong TaskID = new AtomicLong(0L);


    /**
     * Sets the queue that receives the value returned by {@link #handle(Object)}.
     *
     * @param resultQueue queue to publish results on
     */
    public void setResultQueue(BlockingQueue<Object> resultQueue) {
        this.resultQueue = resultQueue;
    }

    /**
     * Sets the queue tasks are taken from.
     *
     * @param taskQueue queue to poll for tasks
     */
    public void setTaskQueue(BlockingQueue<E> taskQueue) {
        this.taskQueue = taskQueue;
    }


    /**
     * Takes the next task without waiting.
     *
     * @return the next task, or {@code null} if the queue is empty or polling failed
     */
    private E getOne() {
        try {
            // poll() is atomic and already returns null on an empty queue, so no size
            // pre-check or extra lock is needed.
            return this.taskQueue.poll();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Processes tasks until the task queue is empty.
     *
     * <p>A {@code null} result from {@link #handle(Object)} makes {@code put} throw on the
     * standard queue implementations, which ends the loop with {@code false}.
     *
     * @return {@code true} if the queue was drained without error; {@code false} if handling
     *         or publishing a result threw, or the thread was interrupted
     */
    @Override
    public Boolean call() {
        while (true) {
            try {
                E task = getOne();
                if (task == null) {
                    break;
                }
                Object result = handle(task);
                resultQueue.put(result);
                TaskID.incrementAndGet();
            } catch (InterruptedException e) {
                // Restore the flag so the executor thread and any caller can still observe
                // the interruption.
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
        return true;
    }

    /**
     * Handles a single task.
     *
     * @param task the task taken from the task queue
     * @return the value to publish on the result queue
     */
    public abstract Object handle(E task);

    /**
     * Returns the class of the tasks this worker handles.
     *
     * @return the task class
     */
    public abstract Class<E> getTaskClass();

    /**
     * Returns how many task results have been enqueued by all workers so far.
     *
     * @return the current value of the shared counter
     */
    public static long getTaskID() {
        return TaskID.get();
    }

}