1. 程式人生 > >Java執行緒監控及中斷

Java執行緒監控及中斷

我們系統中經常有耗費時間長的任務,但客戶端往往需要馬上得到迴應。這時我們就可以如下步驟實現:

1、客戶端發起請求執行任務(選定條件,下載報表);

2、首先將任務ID及開始時間,起始狀態記錄到資料庫表中;

3、另起一個後臺執行緒去執行這個耗時任務(比如生成報表);

4、執行緒執行成功或失敗狀態記錄到資料庫;

5、客戶通過非同步查詢資料(下載報表或其他操作)。

好了,大致步驟我們清楚了。假如這個耗時任務一直執行,而且和消耗系統資源。我們往往想放棄這個任務的執行,再縮小範圍執行更小的任務執行。那我們如何實現吶!

話不多說,直接上程式碼:

1.首先我們實現一個執行緒管理工具:

import java.sql.DriverManager
import java.util.concurrent.ConcurrentHashMap

import org.slf4j.LoggerFactory

import scala.util.{Failure, Success, Try}

/**
* 類功能描述:報表執行緒管理器
*
* @author WangXueXing create at 18-11-2 上午11:35
* @version 1.0.0
*/
object ReportThreadManager {
private val logger = LoggerFactory.getLogger(getClass)
/**
* 報表ID與對應執行緒map
*/
val REPORT_THREAD_MAP: ConcurrentHashMap[Long, Thread] = new ConcurrentHashMap()

/**
* 將對應報表子執行緒放入執行緒池
*
* @param reportId 報表ID
* @param thread 對應子執行緒
*/
def put(reportId: Long, thread: Thread): Unit = {
REPORT_THREAD_MAP.put(reportId, thread)
}

/**
* 獲取對應報表執行緒
* @param reportId 報表ID
* @return
*/
def get(reportId: Long): Thread ={
REPORT_THREAD_MAP.get(reportId)
}

/**
* 將對應報表子執行緒移除執行緒池
* @param reportId 報表ID
*/
def remove(reportId: Long): Unit ={
REPORT_THREAD_MAP.remove(reportId)
}

/**
* 銷燬指定報表子執行緒
* @param reportId 報表ID
*/
def deploy(reportId: Long)={
val thread = REPORT_THREAD_MAP.get(reportId)
if(thread != null){
Try{
if(!thread.isInterrupted){
logger.info(s"執行緒:${reportId} 開始被結束")

logger.info("before interrupt")
thread.getStackTrace.foreach(println)

thread.interrupt()

Thread.sleep(10)
logger.info("after interrupt")
thread.getStackTrace.foreach(println)
}
} match {
case Success(x) => logger.info(s"執行緒:${reportId} 被成功殺死")
case Failure(e) => logger.error(s"執行緒:${reportId} interrupt 失敗", e)
}
REPORT_THREAD_MAP.remove(reportId)
}
}

val thread1 = new Thread(new Runnable {
override def run(): Unit = {
ReportThreadManager.deploy(1)
println(s"thread 1 killed")
}
})

def main(args: Array[String]): Unit = {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val con = DriverManager.getConnection("jdbc:hive2://192.168.71.127:10000/finance", "goods", null)
val stmt = con.createStatement
var res = stmt.executeQuery("SELECT company_name,store_code,store_name,source_date,trade_type,trade_no,third_party_payment_no,business_type,cast(business_amount as decimal(20, 2)) business_amount,cast(service_charge as decimal(20, 4)) service_charge,trade_time,customer_account,updated_at,created_at FROM t_pay_source_data")
while(res.next()){
println(res.getString(1))
}
}
}
此工具可以實現根據任務ID添加當前任務執行執行緒,也可以從執行緒池移除此執行緒,根據任務ID中斷執行緒。


2.如下任務執行過程及如何呼叫執行緒管理及中斷執行緒:
import java.io.{File, FileInputStream}

import com.today.api.financereport.scala.request.ReportInfo
import com.today.api.financereport.scala.response.ServiceResponse
import com.today.service.financereport.action.{ExportReportRecordFailureAction, ExportReportRecordSuccessAction, StartExportReportAction}
import com.today.service.financereport.common.ReportThreadManager
import com.today.service.financereport.dto.{ExportReportFailureInput, ExportReportSuccessInput, ReportOutput}
import com.today.service.financereport.util.{Debug, OssUtil}
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

/**
* 報表匯出流程限定類
*
* @author BarryWang create at 2018/5/17 9:15
* @version 0.0.1
*/
trait ReportAction {
protected val logger = LoggerFactory.getLogger(getClass)
/**
* 報表匯出流程
* @return
*/
def execute: ServiceResponse = {
var result:ServiceResponse = null
var count = Counter.count
logger.info(s"---1生成報表前執行個數:${count}----------")
if(count >= Counter.MAX_COUNT.intValue()){
result = ServiceResponse("405", s"目前有多個報表正在匯出,請5分鐘後再操作,謝謝!")
} else {
Counter.increment
count = Counter.count
logger.info(s"---2啟動生成報表執行個數:${count}----------")
var reportId: Long = -1
try {
//1. 前置檢查
preCheck

//2. 開始生成報表紀錄
val reportInfo = setReportInfo
reportId = startGenerate

//3. 生成報表處理
val output: Future[ReportOutput] = Future {
Debug.reset()
//添加當前子執行緒到執行緒管理池

ReportThreadManager.put(reportId, Thread.currentThread())
//1) 載入模板
val templateInfo = Debug.trace(s"${reportInfo.reportType}-loadTemplate:")(loadTemplate(setTemplatePath))
//2) 生成報表
Debug.trace(s"${reportInfo.reportType}-generateReport:")(generateReport(reportId, templateInfo))
//3) 儲存報表
val output = Debug.trace(s"${reportInfo.reportType}-saveReport:")(saveReport(templateInfo.localFile))

//將此子執行緒從執行緒管理池移除

ReportThreadManager.remove(reportId)

Debug.info()
output
}
output.onComplete {
case Success(v) => {
successGenerate(ExportReportSuccessInput(reportId, v))
Counter.decrement
count = Counter.count
logger.info(s"---3結束報表生成執行個數:${count}----------")
}
case Failure(e) => {
failureGenerate(ExportReportFailureInput(reportId, e))
Counter.decrement
count = Counter.count
logger.info(s"---3結束報表生成執行個數:${count}----------")
}
}

//4. 後置檢查
postCheck

result = ServiceResponse("200", "請到匯出管理檢視或下載報表")
} catch {
case NonFatal(e) =>
Counter.decrement
count = Counter.count
logger.info(s"---3結束報表生成執行個數:${count}----------")
failureGenerate(ExportReportFailureInput(reportId, e))
throw e
} finally {}
}
result
}

/**
* 前置條件檢查:動作、狀態等業務邏輯
*/
def preCheck

/**
* 設定報表資訊
* @return
*/
def setReportInfo: ReportInfo

/**
* 設定模板路徑
* @return
*/
def setTemplatePath: String

/**
* 開始生成報表紀錄
*/
def startGenerate(): Long = {
new StartExportReportAction(setReportInfo).execute
}

/**
* 載入模板
* @param templatPath
*/
def loadTemplate(templatPath: String): ExcelTemaplateInfo = {
val suffix = isZip match {
case true => ".zip"
case false => ".xlsx"
}
//生成本地檔案
val localFile = File.createTempFile(downloadFileName+"_", suffix)
ExcelTemaplateInfo(templatPath, localFile)
}

/**
* 下載檔名
* @return
*/
def downloadFileName: String = setReportInfo.reportType.name

/**
* 根據資料生成報表
* @return
*/
def generateReport(reportId: Long, templateInfo: ExcelTemaplateInfo)

/**
* 將生成在本地的報表上傳到阿里SSO
* @param localFile
* @return
*/
def saveReport(localFile: File): ReportOutput = {
val fileUrl = OssUtil.uploadFileStream(new FileInputStream(localFile), localFile.getName)
localFile.deleteOnExit()
val suffix = isZip match {
case true => ".zip"
case false => ".xlsx"
}
// OssUtil.downloadFile(fileUrl, "/home/barry/data/1122322"+suffix)
ReportOutput(fileUrl)
}

/**
* 最終生成報表是否為Zip
* @return
*/
def isZip: Boolean

/**
* 成功生成報表紀錄
* @param result: ExportReportSuccessInput
*/
def successGenerate(result: ExportReportSuccessInput): Unit = {
new ExportReportRecordSuccessAction(result).execute
}

/**
* 失敗生成報表紀錄
* @param result: ExportReportFailureInput
*/
def failureGenerate(result: ExportReportFailureInput): Unit = {
new ExportReportRecordFailureAction(result).execute
}

/**
* 後置檢查
*/
def postCheck = {}
}

3.客戶端觸發中斷當前任務執行緒:
import com.today.api.financereport.scala.response.ServiceResponse
import com.today.service.commons.Action
import com.today.service.financereport.action.sql.ExportReportRecordActionSql
import com.today.service.financereport.common.ReportThreadManager

/**
* 類功能描述:報表刪除Action
*
* @author WangXueXing create at 18-11-2 下午2:17
* @version 1.0.0
*/
class DeleteExportReportAction(id: Long) extends Action[ServiceResponse]{
override def preCheck: Unit = {}

override def action: ServiceResponse = {
val result = ExportReportRecordActionSql.deleteExportReportById(id)
result match {
case 0 => ServiceResponse("501","刪除報表失敗")
case _ => {
//刪除報表的同時結束生成報表執行緒
new Thread(new Runnable {
/**
* 延遲退出系統
*/
override def run(): Unit = {
Thread.sleep(50)
ReportThreadManager.deploy(id)
}
}).start()
ServiceResponse("200","success")
}
}
}
}