1. 程式人生 > >ThreadPoolExecutor執行緒池的使用

ThreadPoolExecutor執行緒池的使用

實現例子:根據上傳的檔案(.txt),按行讀取文字資料並匯入 MongoDB(僅供參考,實際專案中未必會這樣做)。


package net.youqu.manager.controller;


import com.google.common.collect.Maps;
import net.youqu.mongo.dao.RedisKeyDAO;
import net.youqu.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Controller for managing Redis keys.
 *
 * <p>Exposes {@code POST /redis/upload}, which reads an uploaded text file line by line and
 * submits every non-empty line to a shared thread pool as an {@link InsertRunnalble} task.
 *
 * @author zs
 */
@Controller
@RequestMapping("/redis")
public class RedisController {

    private static final Logger logger = LoggerFactory.getLogger(RedisController.class);

    /** Capacity of the bounded work queue backing {@link #pool}. */
    private static final int QUEUE_CAPACITY = 3000;

    /** Queue depth at or above which the uploading thread pauses before submitting more work. */
    private static final int QUEUE_HIGH_WATERMARK = 2500;

    /** Length of that pause, in seconds. */
    private static final long BACKPRESSURE_PAUSE_SECONDS = 3;

    private static final int CORE_POOL_SIZE = 5000;
    private static final int MAX_POOL_SIZE = 8000;
    private static final long KEEP_ALIVE_MINUTES = 1;

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

    private final ThreadPoolExecutor pool = newImportPool(queue);

    @Autowired
    private RedisKeyDAO redisKeyDAO;

    /**
     * Builds the executor that runs the per-line import tasks.
     *
     * @param workQueue bounded queue that holds tasks once all core threads are busy
     * @return the configured executor
     */
    private static ThreadPoolExecutor newImportPool(BlockingQueue<Runnable> workQueue) {
        // CallerRunsPolicy: when both the queue and the pool are saturated, the submitting
        // thread runs the task itself instead of execute() throwing RejectedExecutionException
        // in the middle of an import.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_MINUTES, TimeUnit.MINUTES, workQueue,
                new ThreadPoolExecutor.CallerRunsPolicy());
        // Let idle core threads terminate after the keep-alive time so that a single large
        // upload does not leave thousands of idle threads alive indefinitely.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    /**
     * Reads the uploaded file as UTF-8 text and submits one {@link InsertRunnalble} per
     * non-empty line, tagged with its 1-based line number (empty lines are counted too).
     *
     * <p>The {@code "success"} result only means that every line was handed to the pool;
     * the tasks run asynchronously and may still be executing when this method returns.
     *
     * @param file the uploaded text file
     * @return a map whose {@code "result"} entry is {@code "success"}, or a failure message
     *         when the file is missing or cannot be read
     * @throws RuntimeException if the request thread is interrupted while waiting for the
     *         work queue to drain
     */
    @PostMapping(value = "/upload")
    @ResponseBody
    public Map<String, String> rankList(MultipartFile file) {
        Map<String, String> map = Maps.newHashMap();
        if (file == null) {
            // No file part in the request: report failure instead of failing with an NPE.
            logger.warn("Upload request did not contain a file part");
            map.put("result", "資料匯入失敗");
            return map;
        }
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8))) {
            String content;
            int line = 1;
            while ((content = reader.readLine()) != null) {
                if (!StringUtils.isEmpty(content)) {
                    pauseIfQueueNearlyFull();
                    pool.execute(new InsertRunnalble(content, line));
                }
                line++;
            }
            map.put("result", "success");
        } catch (IOException e) {
            map.put("result", "資料匯入失敗");
            logger.error("Failed to read uploaded file", e);
        }
        logger.info("主執行緒:執行完畢*********************************************************************************");
        return map;
    }

    /**
     * Pauses the calling thread once when the work queue has reached the high watermark,
     * giving the workers time to drain it before more tasks are submitted.
     *
     * @throws RuntimeException if the thread is interrupted while pausing; the interrupt
     *         flag is restored before the exception is thrown
     */
    private void pauseIfQueueNearlyFull() {
        // ">=" rather than "==": with concurrent uploads and workers draining the queue,
        // the size can jump past the exact threshold value between two checks.
        if (queue.size() < QUEUE_HIGH_WATERMARK) {
            return;
        }
        logger.info("佇列快滿了,開始休眠{}秒", BACKPRESSURE_PAUSE_SECONDS);
        try {
            TimeUnit.SECONDS.sleep(BACKPRESSURE_PAUSE_SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("執行緒休眠異常", e);
        }
    }

    /**
     * Task that handles a single line of the uploaded file. At present it only logs the
     * line number and the key; the database write below is commented out.
     */
    class InsertRunnalble implements Runnable {
        private final String redisKey;
        private final int lineNum;

        /**
         * @param key  the text of the line, treated as a Redis key
         * @param line 1-based line number of {@code key} in the uploaded file
         */
        public InsertRunnalble(String key, int line) {
            redisKey = key;
            lineNum = line;
        }

        @Override
        public void run() {
            // Disabled upsert through redisKeyDAO, kept for reference.
            // NOTE(review): it refers to tempString and file, which are not in scope here;
            // the key and source file name would have to be passed in before re-enabling it.
//                    DBObject query = new BasicDBObject();
//                    query.put("redis_key", tempString);
//
//                    DBObject update = new BasicDBObject();
//                    update.put("redis_key", tempString);
//                    update.put("add_date_time", DatetimeUtil.formatDate(new Date()));
//                    update.put("key_source", file.getOriginalFilename().split("\\.")[0]);
//                    redisKeyDAO.getCollection().update(query, update, true, false);
            logger.info("----------------------->line {}:{}" , lineNum, redisKey);
        }
    }

}