1. 程式人生 > >檔案上傳-佇列多執行緒讀取檔案內容

檔案上傳-佇列多執行緒讀取檔案內容

佇列多執行緒讀取檔案內容

為什麼採用多執行緒佇列的方式呢?主要是如果檔案一行一行讀取處理的話,頁面上傳檔案後,需要等待伺服器響應。如果檔案的內容資料量很大的話,頁面就一直等待伺服器響應,畢竟伺服器處理這些資料是花費很多時間的,我們都知道,頁面是有響應時間的,一旦響應超時了,程式就終止了。所以,當我們需要處理大批的檔案時,可以採用佇列多執行緒的方式非同步去處理,頁面的響應時間只是上傳檔案的時間,一般在幾秒之內吧。檔案的資料一次性放到佇列中,多個執行緒非同步同時處理佇列中的資料。

1.      場景:檔案大小不是很大(十幾兆以內),讀取檔案的內容。

方案:檔案直接放到記憶體中,一行一行讀取處理

在這裡頁面上傳的我採用的是ajaxFileUpload外掛,非同步請求後臺。當然,也可以用form表單的形式。

頁面:

<input type="text"name="name"/>  
<input id="fileToUpload"type="file" name="file"onchange="selectFile(this);" style="margin-left: 50px;"/>  
<button onclick="ajaxFileUpload()">上傳</button>
 
<script type="text/javascript">
 
              functionajaxFileUpload() {
                           
                            varfile = $("input[name='file']").val();
                           
                            if(typeof(file)=="undefined"||$.trim(file).length==0) {
                                          alert("請上傳檔案!");
                                          return;
                            }
                  $.ajaxFileUpload({
                      url : '/upload',
                      type: 'post',
                      secureuri : false,
                      fileElementId : 'fileToUpload',//input 的id
                      dataType : 'json',
                      data : {
                                  "name": name//這裡隨便傳遞了name,畢竟我們上傳檔案,還需要其他的一些資料
                      },
                      success : function(data) {
                                  //這裡根據後臺的返回資料進行處理
                      },
                      error : function(data, status, e) {
                          alert("上傳發生異常");
                      }
                  })
              }            
              // 判斷上傳檔案型別,該處設定的只允許上傳txt的檔案
              functionselectFile(file) {
                            varfileName = file.value;
                            varmime = fileName.toLowerCase().substr(fileName.lastIndexOf("."));
                            if(mime!=".txt"){
                                          alert("該檔案不是txt檔案,請重新選擇!");
                                          file.outerHTML= file.outerHTML;
                            }
              }
</script>


後臺:Controller

@SuppressWarnings("unchecked")
@ResponseBody
   @RequestMapping(value = "/upload", method =RequestMethod.POST) 
    public voidfileUpload(HttpServletRequest request, HttpServletResponse response) throwsException{ 
          Map<String,Object> result = newHashMap<String,Object>();
          result.put("success", false);
          //使用Apache檔案上傳元件處理檔案上傳步驟:
              //1、建立一個DiskFileItemFactory工廠
              DiskFileItemFactoryfactory = new DiskFileItemFactory();
              //2、建立一個檔案上傳解析器
                            ServletFileUploadupload = new ServletFileUpload(factory);
                            upload.setSizeMax(50*1024*1024);//設定該次上傳最大值為50M
                           
                            List<FileItem>list = upload.parseRequest(request);
                              //注意:由於輸入流只能讀取一次,這裡聲明瞭兩個檔案流,inputStream用作檔案讀取處理,inputStreamCount用作計算檔案行數,動態定義佇列大小
                            InputStreaminputStream = null;
                            InputStreaminputStreamCount = null;
                            Stringname = null;
                            for(FileItemfileItem : list){
                                          //判斷某項是否是普通的表單型別,否則該表單項是file 型別的
                                          if(fileItem.isFormField()){
                                                        if(fileItem.getFieldName().equals("name")){
                                                                      name= fileItem.getString("UTF-8");
                                                        }
                                          }else{
                                                        //用作-統計檔案行數
                                                        inputStreamCount= fileItem.getInputStream();
                                                        inputStream= fileItem.getInputStream();
                                                        break;
                                          }
                            }
                            if(inputStream== null){
                                          log.error("檔案為空!");
                        result.put("errorMsg","檔案為空!");
                        ResponseUtil.writeGson(response,result);
                        return;
                            }
       BufferedReader readerLineCount =new BufferedReader(newInputStreamReader(inputStreamCount));
        //宣告一個記錄檔案行數的變數
          int lineCount = 0;
          while(readerLineCount.readLine() !=null)//獲取檔案行數
          {
             lineCount++;
           }
          readerLineCount.close();
          System.out.println("檔案行數:"+lineCount);
         
          //動態建立佇列
          int DEFAULT_IO_THREADS = 10;
          BlockingQueue<Runnable> queue = newArrayBlockingQueue<Runnable>(lineCount);
          ThreadPoolExecutor executor = newThreadPoolExecutor(DEFAULT_IO_THREADS,
                                      DEFAULT_IO_THREADS*2,1, TimeUnit.HOURS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
          String line = null;
          int i =0;
          BufferedReader reader =newBufferedReader(new InputStreamReader(inputStream));
                            while(StringUtils.isNotEmpty(line = reader.readLine())) {
                                          //處理每一行的資料
                                          send(executor,line);
                                          i++;
                            }
                            result.put("count",i);
                  result.put("success", true);
                  reader.close();
                  ResponseUtil.writeGson(response, result);
    }
   
   @SuppressWarnings("unused")
              privatevoid send(ThreadPoolExecutor executor, String line) {
         
          final String final_ String line=line;
              executor.submit(newRunnable() {
                                          publicvoid run() {
                                                        //在這裡寫讀取到的每一行的處理邏輯
 
 
                                                       
                                          }
                            });
}

2.      場景:檔案大(幾十兆甚至更大),讀取檔案的內容。

方案:檔案不能直接放到記憶體中,會消耗大量的記憶體。採用上傳至伺服器臨時資料夾中,從檔案中一行一行讀取處理

頁面跟例子1是一樣的,主要是後臺的處理方式不一樣

後臺:Controller

 使用MultipartFilem需要配置

<!-- 處理檔案上傳 -->
    <bean id="multipartResolver"
        class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
        <property name="defaultEncoding" value="UTF-8" /> <!-- 編碼 -->
        <property name="maxInMemorySize" value="10240" /> <!-- 上傳時佔用最大記憶體大小 (10240) -->
        <property name="uploadTempDir" value="/uploadTemp/" /> <!-- 上傳臨時儲存目錄名 ,帶檔案上傳完成之後會自動刪除儲存的檔案-->
        <property name="maxUploadSize" value="-1" /> <!-- 最大檔案大小,-1為無限止(-1) -->
    </bean>


@ResponseBody
   @RequestMapping(value = "/upload", method =RequestMethod.POST) 
   public void fileUpload(@RequestParam("file") MultipartFilemultipartFile ,HttpServletRequest request, HttpServletResponse response) throwsException{ 
                 Date startTime = new Date();
                 Map<String,Object> result =new HashMap<String,Object>();
                 result.put("success",false);
                 if(multipartFile.isEmpty()){
                               log.error("檔案為空!");
                               result.put("errorMsg","檔案為空!");
                               ResponseUtil.writeGson(response,result);
                               return;
       }
                 String name =request.getParameter("name");//這種方式下,引數可以直接這麼取
                 // 獲取上傳的檔案儲存的路徑  
                 String path = request.getSession().getServletContext().getRealPath("uploadTemp");  
                 //如果目錄不存在,建立資料夾 
                 if (!new File(path).exists()){   
                               newFile(path).mkdirs();
                 }
                 // 獲取上傳的檔案的名稱  
                 String filename =multipartFile.getOriginalFilename();  
                
                
                 //檔案上傳至指定目錄下
                 File targetFile = new File(path,filename);
                 multipartFile.transferTo(targetFile);//上傳
                   
                 //計算檔案行數(字元流 速度最快)
                 //宣告一個BufferReader物件
                 BufferedReader bReader = null;
                 //用字元流物件例項化宣告的BufferReader物件
                 bReader = new BufferedReader(newFileReader(targetFile));
                 //宣告一個記錄檔案行數的變數
                 int lineCount = 0;
                 //宣告一個儲存檔案每行資料的String變數
                 while(bReader.readLine() != null)//獲取檔案行數
                 {
                    lineCount++;
                  }
                 System.out.println("檔案行數:"+lineCount);
                 bReader.close();   //進行字串與浮點型別資料轉換時,先關閉剛操作的檔案
                
                 int DEFAULT_IO_THREADS = 10;
                 BlockingQueue<Runnable> queue= new ArrayBlockingQueue<Runnable>(lineCount);
                 ThreadPoolExecutor executor = newThreadPoolExecutor(DEFAULT_IO_THREADS,
                                             DEFAULT_IO_THREADS*2,1, TimeUnit.HOURS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
                 //通過行迭代,遍歷檔案的每一行,而不是把所有行都放在記憶體中 ,處理完之後把它扔掉。
                 LineIterator it =FileUtils.lineIterator(targetFile, "UTF-8");
                 int i=0;
                 try {
                     while (it.hasNext()) {
                         String line = it.nextLine();
                         long userId = Long.valueOf(line);
                                                 send(executor,line);
                                                 i++;
                     }
                     result.put("count", i);
                     result.put("success", true);
                     Date endTime = new Date();
                     System.out.println("耗時:"+(endTime.getTime()-startTime.getTime())+"毫秒");
                     //刪除臨時檔案
                     LineIterator.closeQuietly(it);
                     targetFile.delete();
                     ResponseUtil.writeGson(response, result);
                 } catch (Exception e){
                               result.put("errorMsg",e.getMessage());
                               ResponseUtil.writeGson(response,result);
                 }
                 finally {
                     LineIterator.closeQuietly(it);
                 }
    }
   
   @SuppressWarnings("unused")
       privatevoid send(ThreadPoolExecutor executor,String line) {
                 final String line = line;
                     executor.submit(newRunnable() {
                                   publicvoid run() {
                                                 //在這裡寫處理邏輯
                                   }
                     });
  }

補充:像這種檔案上傳的,一定要防止重複提交。所以,防重複提交必須防止。

方案: 1.前端,js點選提交後,立刻移除點選事件。

             2.後端,加鎖。private Map<String, String> objLock = new ConcurrentHashMap<String, String>();