1. 程式人生 > >Callable+ThreadPoolExecutor實現多執行緒併發並獲得返回值

Callable+ThreadPoolExecutor實現多執行緒併發並獲得返回值

前言
經常會遇到一些效能問題,比如呼叫某個介面,可能要迴圈呼叫100次,並且需要拿到每一次呼叫的返回結果,通常我們都是放在for迴圈中一次次的序列呼叫,這種方式可想而知道有多慢,那怎麼解決這個問題呢?

多執行緒
為了解決以上問題,我使用的方式是多執行緒。多執行緒常規的有兩種實現方式,即繼承Tread類,實現Runnable介面,但是這兩種實現方式,有一個共同的問題,就是沒有返回值,對於我們來說,獲得每個執行緒的返回值,是個很困難的問題,因此不能用Tread類或Runnable介面,我用的是Callable和ThreadPoolExecutor,Callable的process方法可以允許有返回值,ThreadPoolExecutor的invokeAll或submit方法可以拿到執行緒的執行結果

案例
假設需要給100個使用者傳送郵件,並需要每個使用者的返回結果,先看下程式碼結構
這裡寫圖片描述

CallableTemplate.java

package com.mairuan.base.concurrent;

import java.util.concurrent.Callable;

/**
 * 多執行緒模板類
 * 
 * @author Administrator
 *
 * @param <V>
 */
public abstract class CallableTemplate<V> implements Callable<V> {
    /**
     * 前置處理,子類可以Override該方法
     */
public void beforeProcess() { System.out.println("before process...."); } /** * 處理業務邏輯的方法,需要子類去Override * @return */ public abstract V process(); /** * 後置處理,子類可以Override該方法 */ public void afterProcess() { System.out.println("after process...."
); } @Override public V call() throws Exception { beforeProcess(); V result = process(); afterProcess(); return result; } }

CallableTemplate類實現了Callable介面,並實現了process方法,該類是一個抽象類,接收任意返回值的型別,beforeProcess方法為前置處理,afterProcess的後置處理,process為具體的業務邏輯抽象方法,該方法在子類中實現

IConcurrentThreadPool.java

package com.mairuan.base.concurrent;

import java.util.List;
import java.util.concurrent.ExecutionException;

public interface IConcurrentThreadPool {
    /**
     * 初始化執行緒池
     */
    void initConcurrentThreadPool();

    /**
     * 提交單個任務
     * 
     * @param task
     * @return
     */
    <V> V submit(CallableTemplate<V> task) throws InterruptedException,
            ExecutionException;

    /**
     * 提交多個任務
     * 
     * @param tasks
     * @return
     */
    <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks)
            throws InterruptedException, ExecutionException;
}

IConcurrentThreadPool是多執行緒介面類,聲名了三個方法,initConcurrentThreadPool:初始化執行緒池,submit:提交單個任務的執行緒,並有返回值,invokeAll:提交多個任務的執行緒,並有返回值

ConcurrentThreadPool.java

package com.mairuan.base.concurrent.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.mairuan.base.concurrent.CallableTemplate;
import com.mairuan.base.concurrent.IConcurrentThreadPool;

public class ConcurrentThreadPool implements IConcurrentThreadPool {
    private ThreadPoolExecutor threadPoolExecutor;
    // 核心執行緒數
    private int corePoolSize = 10;
    // 最大執行緒數
    private int maximumPoolSize = 20;
    // 超時時間30秒
    private long keepAliveTime = 30;

    @Override
    public void initConcurrentThreadPool() {
        threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>());
    }

    @Override
    public <V> V submit(CallableTemplate<V> task) throws InterruptedException,
            ExecutionException {
        Future<V> result = threadPoolExecutor.submit(task);

        return result.get();
    }

    @Override
    public <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks)
            throws InterruptedException, ExecutionException {

        List<Future<V>> tasksResult = threadPoolExecutor.invokeAll(tasks);
        List<V> resultList = new ArrayList<V>();

        for (Future<V> future : tasksResult) {
            resultList.add(future.get());
        }

        return resultList;
    }

}

ConcurrentThreadPool是建立執行緒池的實現類,用到了ThreadPoolExecutor執行緒池類及這個類的invokeAll方法和submit方法,這兩個方法的返回值,都可以通過Future類的get方法獲得

ICallableTaskFrameWork.java

public interface ICallableTaskFrameWork {
    <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
            throws InterruptedException, ExecutionException;
}

ICallableTaskFrameWork是定義的執行緒任務框架介面,所有的多執行緒呼叫,都通過該介面發起

CallableTaskFrameWork.java

package com.mairuan.base.concurrent.impl;

import java.util.List;
import java.util.concurrent.ExecutionException;

import com.mairuan.base.concurrent.CallableTemplate;
import com.mairuan.base.concurrent.ICallableTaskFrameWork;
import com.mairuan.base.concurrent.IConcurrentThreadPool;

public class CallableTaskFrameWork implements ICallableTaskFrameWork {

    private IConcurrentThreadPool concurrentThreadPool = new ConcurrentThreadPool();

    @Override
    public <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
            throws InterruptedException, ExecutionException {

        concurrentThreadPool.initConcurrentThreadPool();

        return concurrentThreadPool.invokeAll(tasks);
    }

}

CallableTaskFrameWork是ICallableTaskFrameWork 的實現類,在submitsAll實現方法中,通過呼叫執行緒池物件IConcurrentThreadPool介面的invokeAll方法來發起多執行緒的呼叫,這裡注意一個,在submitAll實現方法中,我手動的呼叫了初始化執行緒池的方法concurrentThreadPool.initConcurrentThreadPool(),在真實的專案上,應該在應用啟動的時候就呼叫該方法來初始化執行緒池

測試類程式碼
SendMessageService.java,假設這是一個傳送郵件資訊的服務類

package com.mairuan.base.concurrent.test;

public class SendMessageService {
    public void sendMessage(String email,String content){
        System.out.println("傳送郵件。。。");
    }
}

SendMessageHander.java,多執行緒傳送郵件的處理類

package com.mairuan.base.concurrent.test;

import java.util.HashMap;
import java.util.Map;

import com.mairuan.base.concurrent.CallableTemplate;

public class SendMessageHander extends CallableTemplate<Map<String, String>> {

    private String email;
    private String content;
    public SendMessageHander(String email,String content) {
        this.email = email;
        this.content = content;
    }

    @Override
    public Map<String, String> process() {
        SendMessageService sendMessageService = new SendMessageService();
        sendMessageService.sendMessage(email, content);
        Map<String, String> map = new HashMap<String, String>();
        map.put(email, content);
        return map;
    }

}

這個類繼承了上面的CallableTemplate,我們要的返回值是Map,因此泛型型別是Map,在類中還重寫了process方法,在方法中呼叫傳送郵件的業務邏輯介面SendMessageService.sendMessage,並將返回結果組裝成Map返回,這裡我就簡單處理了,將郵件地址及內容放在Map中直接返回了;另外還要注意這個類有個有參構造器,通過構建器可以接收需要傳遞進來的引數

SendMessageTest.java,測試類

package com.mairuan.base.concurrent.test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;

import com.mairuan.base.concurrent.CallableTemplate;
import com.mairuan.base.concurrent.ICallableTaskFrameWork;
import com.mairuan.base.concurrent.impl.CallableTaskFrameWork;

public class SendMessageTest {
    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        ICallableTaskFrameWork callableTaskFrameWork = new CallableTaskFrameWork();
        List<CallableTemplate<Map<String, String>>> tasks = new ArrayList<CallableTemplate<Map<String, String>>>();
        SendMessageHander sendMessageHander = null;
        // 將需要傳送郵件的郵件地址及內容組裝好,放在一個集合中
        for (int i = 0; i < 1000; i++) {
            sendMessageHander = new SendMessageHander("email" + i, "content"
                    + i);

            tasks.add(sendMessageHander);
        }
        //通過多執行緒一次性發起郵件,並拿到返回結果集
        List<Map<String, String>> results = callableTaskFrameWork
                .submitsAll(tasks);
        // 解析返回結果集
        for (Map<String, String> map : results) {
            for (Entry<String, String> entry : map.entrySet()) {
                System.out.println(entry.getKey() + "\t" + entry.getValue());
            }
        }

    }
}

執行結果
這裡寫圖片描述