1. 程式人生 > >javaWeb 使用線程池+隊列解決"訂單並發"問題

javaWeb 使用線程池+隊列解決"訂單並發"問題

單例 obj ngs cto rate rtl hashmap ioc bject

解決方式:使用線程池+隊列

項目基於Spring,如果不用spring需要自己把

ThreadPoolManager.java

改成單例模式

1.寫一個Controller(Spring mvc)

/**
 * @author HeyS1
 * @date 2016/12/1
 * @description
 */
@Controller
public class ThreadPoolController {
    @Autowired
    ThreadPoolManager tpm;

    @RequestMapping("/pool")
    public
    @ResponseBody
    Object test() {
        for (int i = 0; i < 500; i++) {
            //模擬並發500條記錄
            tpm.processOrders(Integer.toString(i));
        }

        return "ok";
    }
}

2.線程池管理

/**
 * @author HeyS1
 * @date 2016/12/1
 * @description threadPool訂單線程池, 處理訂單
 * scheduler 調度線程池 用於處理訂單線程池由於超出線程範圍和隊列容量而不能處理的訂單
 */
@Component
public class ThreadPoolManager implements BeanFactoryAware {
    private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class);
    private BeanFactory factory;//用於從IOC裏取對象
    // 線程池維護線程的最少數量
    private final static int CORE_POOL_SIZE = 2;
    // 線程池維護線程的最大數量
    private final static int MAX_POOL_SIZE = 10;
    // 線程池維護線程所允許的空閑時間
    private final static int KEEP_ALIVE_TIME = 0;
    // 線程池所使用的緩沖隊列大小
    private final static int WORK_QUEUE_SIZE = 50;
    // 消息緩沖隊列
    Queue<Object> msgQueue = new LinkedList<Object>();

    //用於儲存在隊列中的訂單,防止重復提交
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();

    //由於超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //System.out.println("太忙了,把該訂單交給調度線程池逐一處理" + ((DBThread) r).getMsg());
            msgQueue.offer(((DBThread) r).getMsg());
        }
    };

    // 訂單線程池
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
            TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);

    // 調度線程池。此線程池支持定時以及周期性執行任務的需求。
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    // 訪問消息緩存的調度線程,每秒執行一次
    // 查看是否有待定請求,如果有,則創建一個新的AccessDBThread,並添加到線程池中
    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (!msgQueue.isEmpty()) {
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    System.out.print("調度:");
                    String orderId = (String) msgQueue.poll();
                    DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
                    accessDBThread.setMsg(orderId);
                    threadPool.execute(accessDBThread);
                }
                // while (msgQueue.peek() != null) {
                // }
            }
        }
    }, 0, 1, TimeUnit.SECONDS);

    //終止訂單線程池+調度線程池
    public void shutdown() {
        //true表示如果定時任務在執行,立即中止,false則等待任務結束後再停止
        System.out.println(taskHandler.cancel(false));
        scheduler.shutdown();
        threadPool.shutdown();
    }

    public Queue<Object> getMsgQueue() {
        return msgQueue;
    }


    //將任務加入訂單線程池
    public void processOrders(String orderId) {
        if (cacheMap.get(orderId) == null) {
            cacheMap.put(orderId,new Object());
            DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
            accessDBThread.setMsg(orderId);
            threadPool.execute(accessDBThread);
        }
    }


    //BeanFactoryAware
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }
}

3.線程池中工作的線程

//線程池中工作的線程
@Component
@Scope("prototype")//spring 多例
public class DBThread implements Runnable {
    private String msg;
    private Logger log = LoggerFactory.getLogger(DBThread.class);

    @Autowired
    SystemLogService systemLogService;



    @Override
    public void run() {
        //模擬在數據庫插入數據
        Systemlog systemlog = new Systemlog();
        systemlog.setTime(new Date());
        systemlog.setLogdescribe(msg);
        //systemLogService.insert(systemlog);
        log.info("insert->" + msg);
    }


    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

瀏覽器輸入地址127.0.0.1/pool

技術分享圖片

幾秒後關閉tomcat。

模擬500條數據,訂單線程池處理了117條。調度線程池處理5條

技術分享圖片

關閉tomcat,後還有378條未處理(這裏的實現需要用到spring監聽器)。加起來一共500

OK。完畢

spring監聽器,監聽tomcat關閉事件:

public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {

    @Autowired
    ThreadPoolManager threadPoolManager;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {

        if (event instanceof ContextClosedEvent) {
            XmlWebApplicationContext x = (XmlWebApplicationContext) event.getSource();
            //防止執行兩次。root application context 沒有parent,他就是老大
            if (x.getDisplayName().equals("Root WebApplicationContext")) {
                threadPoolManager.shutdown();
                Queue q = threadPoolManager.getMsgQueue();
                System.out.println("關閉了服務器,還有未處理的信息條數:" + q.size());
            }


        } else if (event instanceof ContextRefreshedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else if (event instanceof ContextStartedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else if (event instanceof ContextStoppedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else {
//            System.out.println("有其它事件發生:"+event.getClass().getName());
        }
    }
}

spring配置一下

<bean id="springStartListener" class="com.temp.MyApplicationListener"></bean>

javaWeb 使用線程池+隊列解決"訂單並發"問題