檔案上傳-佇列多執行緒讀取檔案內容
阿新 • • 發佈:2019-01-03
佇列多執行緒讀取檔案內容
為什麼採用多執行緒佇列的方式呢?主要是如果檔案一行一行讀取處理的話,頁面上傳檔案後,需要等待伺服器響應。如果檔案的內容資料量很大的話,頁面就一直等待伺服器響應,畢竟伺服器處理這些資料是花費很多時間的,我們都知道,頁面是有響應時間的,一旦響應超時了,程式就終止了。所以,當我們需要處理大批的檔案時,可以採用佇列多執行緒的方式非同步去處理,頁面的響應時間只是上傳檔案的時間,一般在幾秒之內吧。檔案的資料一次性放到佇列中,多個執行緒非同步同時處理佇列中的資料。
1. 場景:檔案大小不是很大(十幾兆以內),讀取檔案的內容。
方案:檔案直接放到記憶體中,一行一行讀取處理
在這裡頁面上傳的我採用的是ajaxFileUpload外掛,非同步請求後臺。當然,也可以用form表單的形式。
頁面:
<input type="text"name="name"/> <input id="fileToUpload"type="file" name="file"onchange="selectFile(this);" style="margin-left: 50px;"/> <button onclick="ajaxFileUpload()">上傳</button> <script type="text/javascript"> functionajaxFileUpload() { varfile = $("input[name='file']").val(); if(typeof(file)=="undefined"||$.trim(file).length==0) { alert("請上傳檔案!"); return; } $.ajaxFileUpload({ url : '/upload', type: 'post', secureuri : false, fileElementId : 'fileToUpload',//input 的id dataType : 'json', data : { "name": name//這裡隨便傳遞了name,畢竟我們上傳檔案,還需要其他的一些資料 }, success : function(data) { //這裡根據後臺的返回資料進行處理 }, error : function(data, status, e) { alert("上傳發生異常"); } }) } // 判斷上傳檔案型別,該處設定的只允許上傳txt的檔案 functionselectFile(file) { varfileName = file.value; varmime = fileName.toLowerCase().substr(fileName.lastIndexOf(".")); if(mime!=".txt"){ alert("該檔案不是txt檔案,請重新選擇!"); file.outerHTML= file.outerHTML; } } </script>
後臺:Controller
@SuppressWarnings("unchecked") @ResponseBody @RequestMapping(value = "/upload", method =RequestMethod.POST) public voidfileUpload(HttpServletRequest request, HttpServletResponse response) throwsException{ Map<String,Object> result = newHashMap<String,Object>(); result.put("success", false); //使用Apache檔案上傳元件處理檔案上傳步驟: //1、建立一個DiskFileItemFactory工廠 DiskFileItemFactoryfactory = new DiskFileItemFactory(); //2、建立一個檔案上傳解析器 ServletFileUploadupload = new ServletFileUpload(factory); upload.setSizeMax(50*1024*1024);//設定該次上傳最大值為50M List<FileItem>list = upload.parseRequest(request); //注意:由於輸入流只能讀取一次,這裡聲明瞭兩個檔案流,inputStream用作檔案讀取處理,inputStreamCount用作計算檔案行數,動態定義佇列大小 InputStreaminputStream = null; InputStreaminputStreamCount = null; Stringname = null; for(FileItemfileItem : list){ //判斷某項是否是普通的表單型別,否則該表單項是file 型別的 if(fileItem.isFormField()){ if(fileItem.getFieldName().equals("name")){ name= fileItem.getString("UTF-8"); } }else{ //用作-統計檔案行數 inputStreamCount= fileItem.getInputStream(); inputStream= fileItem.getInputStream(); break; } } if(inputStream== null){ log.error("檔案為空!"); result.put("errorMsg","檔案為空!"); ResponseUtil.writeGson(response,result); return; } BufferedReader readerLineCount =new BufferedReader(newInputStreamReader(inputStreamCount)); //宣告一個記錄檔案行數的變數 int lineCount = 0; while(readerLineCount.readLine() !=null)//獲取檔案行數 { lineCount++; } readerLineCount.close(); System.out.println("檔案行數:"+lineCount); //動態建立佇列 int DEFAULT_IO_THREADS = 10; BlockingQueue<Runnable> queue = newArrayBlockingQueue<Runnable>(lineCount); ThreadPoolExecutor executor = newThreadPoolExecutor(DEFAULT_IO_THREADS, DEFAULT_IO_THREADS*2,1, TimeUnit.HOURS, queue, new ThreadPoolExecutor.CallerRunsPolicy()); String line = null; int i =0; BufferedReader reader =newBufferedReader(new InputStreamReader(inputStream)); while(StringUtils.isNotEmpty(line = reader.readLine())) { //處理每一行的資料 send(executor,line); i++; } result.put("count",i); result.put("success", true); reader.close(); ResponseUtil.writeGson(response, result); } @SuppressWarnings("unused") privatevoid send(ThreadPoolExecutor executor, String line) { final String final_ String line=line; executor.submit(newRunnable() { publicvoid run() { //在這裡寫讀取到的每一行的處理邏輯 } }); }
2. 場景:檔案大(幾十兆甚至更大),讀取檔案的內容。
方案:檔案不能直接放到記憶體中,會消耗大量的記憶體。採用上傳至伺服器臨時資料夾中,從檔案中一行一行讀取處理
頁面跟例子1是一樣的,主要是後臺的處理方式不一樣
後臺:Controller
使用MultipartFilem需要配置
<!-- 處理檔案上傳 -->
<bean id="multipartResolver"
class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
<property name="defaultEncoding" value="UTF-8" /> <!-- 編碼 -->
<property name="maxInMemorySize" value="10240" /> <!-- 上傳時佔用最大記憶體大小 (10240) -->
<property name="uploadTempDir" value="/uploadTemp/" /> <!-- 上傳臨時儲存目錄名 ,帶檔案上傳完成之後會自動刪除儲存的檔案-->
<property name="maxUploadSize" value="-1" /> <!-- 最大檔案大小,-1為無限止(-1) -->
</bean>
@ResponseBody
@RequestMapping(value = "/upload", method =RequestMethod.POST)
public void fileUpload(@RequestParam("file") MultipartFilemultipartFile ,HttpServletRequest request, HttpServletResponse response) throwsException{
Date startTime = new Date();
Map<String,Object> result =new HashMap<String,Object>();
result.put("success",false);
if(multipartFile.isEmpty()){
log.error("檔案為空!");
result.put("errorMsg","檔案為空!");
ResponseUtil.writeGson(response,result);
return;
}
String name =request.getParameter("name");//這種方式下,引數可以直接這麼取
// 獲取上傳的檔案儲存的路徑
String path = request.getSession().getServletContext().getRealPath("uploadTemp");
//如果目錄不存在,建立資料夾
if (!new File(path).exists()){
newFile(path).mkdirs();
}
// 獲取上傳的檔案的名稱
String filename =multipartFile.getOriginalFilename();
//檔案上傳至指定目錄下
File targetFile = new File(path,filename);
multipartFile.transferTo(targetFile);//上傳
//計算檔案行數(字元流 速度最快)
//宣告一個BufferReader物件
BufferedReader bReader = null;
//用字元流物件例項化宣告的BufferReader物件
bReader = new BufferedReader(newFileReader(targetFile));
//宣告一個記錄檔案行數的變數
int lineCount = 0;
//宣告一個儲存檔案每行資料的String變數
while(bReader.readLine() != null)//獲取檔案行數
{
lineCount++;
}
System.out.println("檔案行數:"+lineCount);
bReader.close(); //進行字串與浮點型別資料轉換時,先關閉剛操作的檔案
int DEFAULT_IO_THREADS = 10;
BlockingQueue<Runnable> queue= new ArrayBlockingQueue<Runnable>(lineCount);
ThreadPoolExecutor executor = newThreadPoolExecutor(DEFAULT_IO_THREADS,
DEFAULT_IO_THREADS*2,1, TimeUnit.HOURS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
//通過行迭代,遍歷檔案的每一行,而不是把所有行都放在記憶體中 ,處理完之後把它扔掉。
LineIterator it =FileUtils.lineIterator(targetFile, "UTF-8");
int i=0;
try {
while (it.hasNext()) {
String line = it.nextLine();
long userId = Long.valueOf(line);
send(executor,line);
i++;
}
result.put("count", i);
result.put("success", true);
Date endTime = new Date();
System.out.println("耗時:"+(endTime.getTime()-startTime.getTime())+"毫秒");
//刪除臨時檔案
LineIterator.closeQuietly(it);
targetFile.delete();
ResponseUtil.writeGson(response, result);
} catch (Exception e){
result.put("errorMsg",e.getMessage());
ResponseUtil.writeGson(response,result);
}
finally {
LineIterator.closeQuietly(it);
}
}
@SuppressWarnings("unused")
privatevoid send(ThreadPoolExecutor executor,String line) {
final String line = line;
executor.submit(newRunnable() {
publicvoid run() {
//在這裡寫處理邏輯
}
});
}
補充:像這種檔案上傳的,一定要防止重複提交。所以,防重複提交必須防止。
方案: 1.前端,js點選提交後,立刻移除點選事件。
2.後端,加鎖。private Map<String, String> objLock = new ConcurrentHashMap<String, String>();