java 多執行緒管理執行緒任務,根據優先順序執行 任務超時管理 執行緒同步執行管理
阿新 • • 發佈:2019-01-24
需求
需要根據優先順序執行任務,有任務不是特別重要,可以稍後執行;需要對正在執行的執行緒做超時監控;有的API依賴任務返回結果,執行緒池執行的時候任務也支援同步任務;
簡單測試
建立一個使用支援優先順序佇列(new PriorityBlockingQueue<>() )的執行緒,然後每個任務實現 java.lang.Comparable 介面
new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD * 2, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue());
簡單測試一下基本可行,但是發現使用優先順序列隊執行緒池發現使用FutureTask 會拋異常 java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable 所以這個方法不能再用了
Future<?> future = executor.submit(new Task());
設計思路
- 定義一個任務介面,繼承 Runnable, Comparable,並且獲取超時時間方法、優先順序方法、執行時間方法、超時中止任務方法;
- 有一個管理執行緒池的類,用於執行任務
- 有一個超時檢測的類
- 有一個基本執行類,用於每次任務執行時把任務加入監控列表,有一個同步任務類,管理執行同步任務
程式碼實現
任務介面
/**
* Created by zengrenyuan on 18/6/11.
*/
public interface ExecuteTask extends Runnable, Comparable {
/**
* @return 任務超時時間
*/
long getTimeout();
/**
* @return 任務的優先順序
*/
int getPriority();
/**
* 任務超時時呼叫
* 用於結束任務執行
*/
void destroy();
/**
* @return 任務當前的執行時間
*/
long elapsed();
}
執行緒池管理
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PreDestroy;
import java.util.concurrent.*;
/**
* Created by zengrenyuan on 18/06/11.
*/
public class ExecuteThreadManager {
private static final Logger LOG = LoggerFactory.getLogger(ExecuteThreadManager.class);
/**
* 執行緒池執行緒名稱
*/
public static final String SCRIPT_TASK_THREAD_POOL = "TaskThreadPool";
/**
* 最大執行緒數
*/
private final int MAX_THREAD = 10;
/**
* 一個按優先順序執行的執行緒池
*/
private final ThreadPoolExecutor executor = ThreadPoolUtils.newThreadPool(MAX_THREAD, SCRIPT_TASK_THREAD_POOL, new PriorityBlockingQueue<>());
private static class LazyHolder {
private static final ExecuteThreadManager INSTANCE = new ExecuteThreadManager();
}
private ExecuteThreadManager() {
}
/**
* 單例
*/
public static final ExecuteThreadManager getInstance() {
return LazyHolder.INSTANCE;
}
/**
* 把任務提交到執行緒池執行
*/
public void execute(ExecuteTask task) {
executor.execute(new BaseExecuteTask(task));
}
/**
* 同步執行任務,等任務執行完才會往下走
*/
public void syncExecute(ExecuteTask task) {
SynchronizedExecuteTask syncTask = new SynchronizedExecuteTask(task);
executor.execute(syncTask);
syncTask.await();
}
@PreDestroy
private void destroy() {
try {
executor.shutdownNow();
} catch (Exception e) {
LOG.error(null, e);
}
}
}
執行緒池生成工具類
import java.util.concurrent.*;
/**
* Created by zengrenyuan on 18/06/11.
*/
public class ThreadPoolUtils {
/**
* 生成一個執行緒池
*
* @param maxThread 執行緒池最大數
* @param threadPoolName 執行緒池名稱
* @return
*/
public static ThreadPoolExecutor newThreadPool(int maxThread, final String threadPoolName) {
return newThreadPool(maxThread, threadPoolName, new LinkedBlockingDeque<>());
}
/**
* 生成一個執行緒池
*
* @param maxThread 執行緒池最大數
* @param threadPoolName 執行緒池名稱
* @param workQueue 佇列
* @return 執行緒池
*/
public static ThreadPoolExecutor newThreadPool(int maxThread, final String threadPoolName, BlockingQueue workQueue) {
return new ThreadPoolExecutor(
maxThread,
maxThread,
0L,
TimeUnit.MILLISECONDS,
workQueue,
getThreadFactory(threadPoolName)
);
}
/**
* @param threadPoolName 執行緒名稱
* @return 執行緒生成器
*/
public static ThreadFactory getThreadFactory(final String threadPoolName) {
return new ThreadFactory() {
private int threadNumber = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, threadPoolName + "-" + threadNumber);
}
};
}
}
基礎任務類
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 簡單包裝一下
* 在任務執行前把任務加入到執行任務監控池
* Created by zengrenyuan on 18/06/11.
*/
public class BaseExecuteTask implements ExecuteTask {
private static final Logger LOG = LoggerFactory.getLogger(BaseExecuteTask.class);
protected ExecuteTask task;
public BaseExecuteTask(ExecuteTask task) {
this.task = task;
}
@Override
public long getTimeout() {
return task.getTimeout();
}
@Override
public void destroy() {
task.destroy();
}
@Override
public int getPriority() {
return task.getPriority();
}
@Override
public long elapsed() {
return task.elapsed();
}
@Override
public int compareTo(Object o) {
return task.compareTo(o);
}
@Override
public void run() {
try {
//任務開始前把任務加入到正在執行的任務池
TaskPoolListener.addTask(this);
task.run();
} finally {
LOG.info("執行結束");
TaskPoolListener.removeTask(this);
//任務執行後,把任務從正在執行的任務池中移除
}
}
}
同步執行緒工具類
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 同步執行任務
* Created by zengrenyuan on 18/06/11.
*/
public class SynchronizedExecuteTask extends BaseExecuteTask {
private static final Logger LOG = LoggerFactory.getLogger(SynchronizedExecuteTask.class);
//用於記錄任務是否執行完成
private CountDownLatch latch = new CountDownLatch(1);
public SynchronizedExecuteTask(ExecuteTask task) {
super(task);
}
@Override
public void run() {
try {
super.run();
} finally {
LOG.info("執行結束");
latch.countDown();
}
}
public void await() {
//任務是否在超時時間之前執行完成
boolean finished = false;
try {
finished = latch.await(getTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.error(null, e);
} finally {
if (!finished) {
destroy();
throw new RuntimeException("任務執行超時");
}
}
}
}
任務超時監控
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
/**
* Created by zengrenyuan on 18/06/11.
*/
@Service
public class TaskPoolListener {
private static final Logger LOG = LoggerFactory.getLogger(TaskPoolListener.class);
private final static List<ExecuteTask> scriptTasks = new ArrayList<>();
public static final int SLEEP_INTERVAL = 10_000;
private Thread thread;
@PostConstruct
public void init() {
thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
doTask();
} catch (Exception e) {
LOG.info(null, e);
} finally {
this.sleep(SLEEP_INTERVAL);
}
}
}
private void sleep(long interval) {
try {
Thread.sleep(interval);
} catch (InterruptedException var4) {
LOG.info(var4.getMessage());
}
}
});
thread.setName("任務執行超時檢測執行緒");
thread.start();
}
/**
* 用於監測超時任務
* 如果任務超時呼叫destroy方法
*/
public void doTask() {
List<ExecuteTask> scriptTasks = new ArrayList<>(this.scriptTasks);
for (ExecuteTask scriptTask : scriptTasks) {
if (scriptTask.elapsed() > scriptTask.getTimeout()) {
scriptTask.destroy();
}
}
}
public static synchronized void addTask(ExecuteTask scriptTask) {
scriptTasks.add(scriptTask);
}
public static synchronized void removeTask(ExecuteTask scriptTask) {
scriptTasks.remove(scriptTask);
}
public final int size() {
return scriptTasks.size();
}
}