ThreadPoolExecutor執行緒池的使用
阿新 • • 發佈:2018-12-14
實現例子:根據上傳檔案(.txt)按行讀取文字資料匯入mongodb (只做參考,實際也許不這麼幹)

package net.youqu.manager.controller;

import com.google.common.collect.Maps;
import net.youqu.mongo.dao.RedisKeyDAO;
import net.youqu.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Redis key management controller.
 *
 * <p>Exposes {@code POST /redis/upload}, which reads an uploaded text file line by line and
 * hands every non-empty line to a shared {@link ThreadPoolExecutor} for processing.
 *
 * @author zs
 */
@Controller
@RequestMapping("/redis")
public class RedisController {

    private static final Logger logger = LoggerFactory.getLogger(RedisController.class);

    /** Maximum number of tasks that may wait in the work queue. */
    private static final int QUEUE_CAPACITY = 3000;

    /**
     * Fixed number of worker threads. The previous sizing (5000 core / 8000 max) created
     * thousands of threads before the queue was ever used, because a ThreadPoolExecutor only
     * starts queuing once all core threads exist.
     */
    private static final int WORKER_THREADS =
            Math.max(2, Runtime.getRuntime().availableProcessors() * 2);

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

    private final ThreadPoolExecutor pool = createPool(queue);

    @Autowired
    private RedisKeyDAO redisKeyDAO;

    /**
     * Builds the worker pool shared by all upload requests.
     *
     * @param workQueue bounded queue that buffers tasks while all workers are busy
     * @return a pool that applies back-pressure instead of rejecting tasks
     */
    private static ThreadPoolExecutor createPool(BlockingQueue<Runnable> workQueue) {
        // CallerRunsPolicy: when the queue is full the submitting (request) thread runs the
        // task itself, which throttles the reader naturally. This replaces the former
        // "queue.size() == 2500 -> sleep" check, which could be skipped and still allowed a
        // RejectedExecutionException once the queue and thread limit were exhausted.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                WORKER_THREADS,
                WORKER_THREADS,
                1,
                TimeUnit.MINUTES,
                workQueue,
                new ThreadPoolExecutor.CallerRunsPolicy());
        // Let idle workers terminate after the keep-alive time so the pool does not pin
        // threads forever between uploads.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    /**
     * Reads the uploaded file as UTF-8 text and submits one task per non-empty line.
     *
     * <p>The method returns once all lines have been submitted; queued tasks may still be
     * running at that point.
     *
     * @param file the uploaded multipart file
     * @return a map whose {@code "result"} entry is {@code "success"} when every line was
     *         submitted, or a failure message when the file is missing or cannot be read
     */
    @PostMapping(value = "/upload")
    @ResponseBody
    public Map<String, String> rankList(MultipartFile file) {
        Map<String, String> map = Maps.newHashMap();

        if (file == null) {
            // No multipart part was bound; report failure instead of failing with an NPE.
            logger.warn("Upload request did not contain a file");
            map.put("result", "資料匯入失敗");
            return map;
        }

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8))) {
            String tempString;
            int line = 1;
            while ((tempString = reader.readLine()) != null) {
                if (!StringUtils.isEmpty(tempString)) {
                    pool.execute(new InsertRunnalble(tempString, line));
                }
                // Empty lines are skipped but still counted, so line numbers match the file.
                line++;
            }
            map.put("result", "success");
        } catch (IOException e) {
            map.put("result", "資料匯入失敗");
            logger.error("Failed to read uploaded file", e);
        }

        logger.info("主執行緒:執行完畢*********************************************************************************");
        return map;
    }

    /**
     * Task that processes a single line of the uploaded file.
     *
     * <p>Currently it only logs the line; the MongoDB upsert is kept below as a commented-out
     * sketch.
     */
    class InsertRunnalble implements Runnable {

        private final String redisKey;
        private final int lineNum;

        /**
         * @param key  the text of the line
         * @param line the 1-based line number within the uploaded file
         */
        public InsertRunnalble(String key, int line) {
            redisKey = key;
            lineNum = line;
        }

        @Override
        public void run() {
            // Sketch of the intended MongoDB upsert.
            // NOTE(review): it refers to `tempString` and `file`, which are not in scope in
            // this class; they would have to be passed in (e.g. via redisKey and a new
            // constructor argument) before this can be enabled.
            // DBObject query = new BasicDBObject();
            // query.put("redis_key", tempString);
            //
            // DBObject update = new BasicDBObject();
            // update.put("redis_key", tempString);
            // update.put("add_date_time", DatetimeUtil.formatDate(new Date()));
            // update.put("key_source", file.getOriginalFilename().split("\\.")[0]);
            // redisKeyDAO.getCollection().update(query, update, true, false);
            logger.info("----------------------->line {}:{}", lineNum, redisKey);
        }
    }
}