1. 程式人生 > >java 多執行緒管理執行緒任務,根據優先順序執行 任務超時管理 執行緒同步執行管理

java 多執行緒管理執行緒任務,根據優先順序執行 任務超時管理 執行緒同步執行管理

需求

需要根據優先順序執行任務,有任務不是特別重要,可以稍後執行;需要對正在執行的執行緒做超時監控;有的API依賴任務返回結果,執行緒池執行的時候任務也支援同步任務;

簡單測試

建立一個使用支援優先順序佇列(new PriorityBlockingQueue<>() )的執行緒,然後每個任務實現 java.lang.Comparable 介面
new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD * 2, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue());

簡單測試一下基本可行,但是發現使用優先順序列隊執行緒池發現使用FutureTask 會拋異常 java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable 所以這個方法不能再用了

Future<?> future = executor.submit(new Task());

異常資訊

設計思路

  1. 定義一個任務介面,繼承 Runnable, Comparable,並且獲取超時時間方法、優先順序方法、執行時間方法、超時中止任務方法;
  2. 有一個管理執行緒池的類,用於執行任務
  3. 有一個超時檢測的類
  4. 有一個基本執行類,用於每次任務執行時把任務加入監控列表,有一個同步任務類,管理執行同步任務

程式碼實現

任務介面

/**
 * Created by zengrenyuan on 18/6/11.
 */
public interface ExecuteTask
extends Runnable, Comparable {
/** * @return 任務超時時間 */ long getTimeout(); /** * @return 任務的優先順序 */ int getPriority(); /** * 任務超時時呼叫 * 用於結束任務執行 */ void destroy(); /** * @return 任務當前的執行時間 */ long elapsed(); }

執行緒池管理

import
org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.PreDestroy; import java.util.concurrent.*; /** * Created by zengrenyuan on 18/06/11. */ public class ExecuteThreadManager { private static final Logger LOG = LoggerFactory.getLogger(ExecuteThreadManager.class); /** * 執行緒池執行緒名稱 */ public static final String SCRIPT_TASK_THREAD_POOL = "TaskThreadPool"; /** * 最大執行緒數 */ private final int MAX_THREAD = 10; /** * 一個按優先順序執行的執行緒池 */ private final ThreadPoolExecutor executor = ThreadPoolUtils.newThreadPool(MAX_THREAD, SCRIPT_TASK_THREAD_POOL, new PriorityBlockingQueue<>()); private static class LazyHolder { private static final ExecuteThreadManager INSTANCE = new ExecuteThreadManager(); } private ExecuteThreadManager() { } /** * 單例 */ public static final ExecuteThreadManager getInstance() { return LazyHolder.INSTANCE; } /** * 把任務提交到執行緒池執行 */ public void execute(ExecuteTask task) { executor.execute(new BaseExecuteTask(task)); } /** * 同步執行任務,等任務執行完才會往下走 */ public void syncExecute(ExecuteTask task) { SynchronizedExecuteTask syncTask = new SynchronizedExecuteTask(task); executor.execute(syncTask); syncTask.await(); } @PreDestroy private void destroy() { try { executor.shutdownNow(); } catch (Exception e) { LOG.error(null, e); } } }

執行緒池生成工具類

import java.util.concurrent.*;

/**
 * Created by zengrenyuan on 18/06/11.
 */
public class ThreadPoolUtils {

    /**
     * 生成一個執行緒池
     *
     * @param maxThread      執行緒池最大數
     * @param threadPoolName 執行緒池名稱
     * @return
     */
    public static ThreadPoolExecutor newThreadPool(int maxThread, final String threadPoolName) {
        return newThreadPool(maxThread, threadPoolName, new LinkedBlockingDeque<>());
    }

    /**
     * 生成一個執行緒池
     *
     * @param maxThread      執行緒池最大數
     * @param threadPoolName 執行緒池名稱
     * @param workQueue      佇列
     * @return 執行緒池
     */
    public static ThreadPoolExecutor newThreadPool(int maxThread, final String threadPoolName, BlockingQueue workQueue) {
        return new ThreadPoolExecutor(
                maxThread,
                maxThread,
                0L,
                TimeUnit.MILLISECONDS,
                workQueue,
                getThreadFactory(threadPoolName)
        );
    }

    /**
     * @param threadPoolName 執行緒名稱
     * @return 執行緒生成器
     */
    public static ThreadFactory getThreadFactory(final String threadPoolName) {
        return new ThreadFactory() {

            private int threadNumber = 1;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, threadPoolName + "-" + threadNumber);
            }
        };
    }
}

基礎任務類


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 簡單包裝一下
 * 在任務執行前把任務加入到執行任務監控池
 * Created by zengrenyuan on 18/06/11.
 */
public class BaseExecuteTask implements ExecuteTask {

    private static final Logger LOG = LoggerFactory.getLogger(BaseExecuteTask.class);
    protected ExecuteTask task;

    public BaseExecuteTask(ExecuteTask task) {
        this.task = task;
    }

    @Override
    public long getTimeout() {
        return task.getTimeout();
    }

    @Override
    public void destroy() {
        task.destroy();
    }

    @Override
    public int getPriority() {
        return task.getPriority();
    }

    @Override
    public long elapsed() {
        return task.elapsed();
    }

    @Override
    public int compareTo(Object o) {
        return task.compareTo(o);
    }

    @Override
    public void run() {
        try {
            //任務開始前把任務加入到正在執行的任務池
            TaskPoolListener.addTask(this);
            task.run();
        } finally {
            LOG.info("執行結束");
            TaskPoolListener.removeTask(this);
            //任務執行後,把任務從正在執行的任務池中移除
        }
    }


}

同步執行緒工具類

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 同步執行任務
 * Created by zengrenyuan on 18/06/11.
 */
public class SynchronizedExecuteTask extends BaseExecuteTask {

    private static final Logger LOG = LoggerFactory.getLogger(SynchronizedExecuteTask.class);
    //用於記錄任務是否執行完成
    private CountDownLatch latch = new CountDownLatch(1);

    public SynchronizedExecuteTask(ExecuteTask task) {
        super(task);
    }

    @Override
    public void run() {
        try {
            super.run();
        } finally {
            LOG.info("執行結束");
            latch.countDown();
        }
    }

    public void await() {
        //任務是否在超時時間之前執行完成
        boolean finished = false;
        try {
            finished = latch.await(getTimeout(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.error(null, e);
        } finally {
            if (!finished) {
                destroy();
                throw new RuntimeException("任務執行超時");
            }
        }
    }


}

任務超時監控

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by zengrenyuan on 18/06/11.
 */
@Service
public class TaskPoolListener {

    private static final Logger LOG = LoggerFactory.getLogger(TaskPoolListener.class);
    private final static List<ExecuteTask> scriptTasks = new ArrayList<>();
    public static final int SLEEP_INTERVAL = 10_000;
    private Thread thread;

    @PostConstruct
    public void init() {
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        doTask();
                    } catch (Exception e) {
                        LOG.info(null, e);
                    } finally {
                        this.sleep(SLEEP_INTERVAL);
                    }
                }
            }

            private void sleep(long interval) {
                try {
                    Thread.sleep(interval);
                } catch (InterruptedException var4) {
                    LOG.info(var4.getMessage());
                }

            }
        });
        thread.setName("任務執行超時檢測執行緒");
        thread.start();
    }

    /**
     * 用於監測超時任務
     * 如果任務超時呼叫destroy方法
     */
    public void doTask() {
        List<ExecuteTask> scriptTasks = new ArrayList<>(this.scriptTasks);
        for (ExecuteTask scriptTask : scriptTasks) {
            if (scriptTask.elapsed() > scriptTask.getTimeout()) {
                scriptTask.destroy();
            }
        }
    }

    public static synchronized void addTask(ExecuteTask scriptTask) {
        scriptTasks.add(scriptTask);
    }

    public static synchronized void removeTask(ExecuteTask scriptTask) {
        scriptTasks.remove(scriptTask);
    }

    public final int size() {
        return scriptTasks.size();
    }

}