1. 程式人生 > >1.1 Spring 執行緒池 --- ThreadPoolTaskExecutor

1.1 Spring 執行緒池 --- ThreadPoolTaskExecutor

Spring 擅長對元件的封裝和整合, Spring-context對JDK的併發包做了功能增強。 

step 1 :Spring-context.xml 中增加如下程式碼

<bean id="poolTaskExecutor"  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
   <!-- 核心執行緒數,預設為1 -->
   <property name="corePoolSize" value="5" />
   <!-- 最大執行緒數,預設為Integer.MAX_VALUE -->
   <property name="maxPoolSize" value="50" />
   <!-- 佇列最大長度,一般需要設定值>=notifyScheduledMainExecutor.maxNum;預設為Integer.MAX_VALUE -->
   <property name="queueCapacity" value="2000" />
   <!-- 執行緒池維護執行緒所允許的空閒時間,預設為60s -->
   <property name="keepAliveSeconds" value="100" />
   <!-- 執行緒池對拒絕任務(無執行緒可用)的處理策略,目前只支援AbortPolicy、CallerRunsPolicy;預設為後者 -->
   <property name="rejectedExecutionHandler">
       <!-- AbortPolicy:直接丟擲java.util.concurrent.RejectedExecutionException異常 -->
       <!-- CallerRunsPolicy:主執行緒直接執行該任務,執行完之後嘗試新增下一個任務到執行緒池中,可以有效降低向執行緒池內新增任務的速度 -->
       <!-- DiscardOldestPolicy:拋棄舊的任務、暫不支援;會導致被丟棄的任務無法再次被執行 -->
       <!-- DiscardPolicy:拋棄當前任務、暫不支援;會導致被丟棄的任務無法再次被執行 -->
       <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
   </property>
</bean>


step 2 :執行緒池使用

申明和使用

程式碼

	@Autowired
	@Qualifier(value = "poolTaskExecutor")
	private ThreadPoolTaskExecutor threadPoolTaskExecutor;

	@RequestMapping(value = "Info", method = RequestMethod.POST)
	public String passerbyInfo(@RequestBody String json) {

		threadPoolTaskExecutor.execute(new Runnable() {
			@Override
			public void run() {
				log.info("-----開啟執行緒-----");
				// Todo 
			}
		});


		return json;
	}

以上asyncTaskExecutor,你可以注入到任何一個bean去執行,底層使用JDK的ThreadPoolTaskExecutor來管理執行緒,預設使用的是JDK的執行緒池. 
以上只是簡單的應用,非常方便的開發,我們都不用去處理執行緒池的初始化,以及執行緒的管理。 


功能 1 :Spring 對執行緒的監聽--成功 / 失敗

ListenableFutureTask<String> f1 = (ListenableFutureTask<String>) asyncTaskExecutor.submitListenable(c1);  
f1.addCallback(new ListenableFutureCallback<String>() {  
  
            @Override  
            public void onSuccess(String result) {  
                //成功時 TODO  
            }  
  
            @Override  
            public void onFailure(Throwable t) {  
                // 失敗時 TODO Auto-generated method stub  
                t.printStackTrace();  
            }  
              
           }); 



功能 2 :擴充套件FutureTask的protected方法 

package org.springframework.util.concurrent;  
  
import java.util.concurrent.Callable;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.FutureTask;  
  
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {  
  
    private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();  
  
  
    /** 
     * Create a new {@code ListenableFutureTask} that will, upon running, 
     * execute the given {@link Callable}. 
     * @param callable the callable task 
     */  
    public ListenableFutureTask(Callable<T> callable) {  
        super(callable);  
    }  
  
    /** 
     * Create a {@code ListenableFutureTask} that will, upon running, 
     * execute the given {@link Runnable}, and arrange that {@link #get()} 
     * will return the given result on successful completion. 
     * @param runnable the runnable task 
     * @param result the result to return on successful completion 
     */  
    public ListenableFutureTask(Runnable runnable, T result) {  
        super(runnable, result);  
    }  
  
  
    @Override  
    public void addCallback(ListenableFutureCallback<? super T> callback) {  
        this.callbacks.addCallback(callback);  
    }  
  
    @Override  
    protected final void done() {  
        Throwable cause;  
        try {  
            T result = get();  
            this.callbacks.success(result);  
            return;  
        }  
        catch (InterruptedException ex) {  
            Thread.currentThread().interrupt();  
            return;  
        }  
        catch (ExecutionException ex) {  
            cause = ex.getCause();  
            if (cause == null) {  
                cause = ex;  
            }  
        }  
        catch (Throwable ex) {  
            cause = ex;  
        }  
        this.callbacks.failure(cause);  
    }  
  
}