1. 程式人生 > >第八章、線程池的使用

第八章、線程池的使用

else 增加 void actor package class 全局 link catch

線程工廠的使用:

  在創建線程時,應該要初始化它的線程名稱,以便以後更好的查找錯誤,下面的示例展示了線程工廠的使用,創建線程是並發的,因此count使用原子類。

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class MyThreadFactory implements ThreadFactory{
    private final String poolName;
    
private AtomicInteger count=new AtomicInteger(0); public MyThreadFactory(String poolName) { this.poolName=poolName; System.out.println("Thread name is "+poolName); } @Override public Thread newThread(Runnable r) { Thread t=new Thread(r); t.setName(poolName
+"-"+count.incrementAndGet()); return t; } }

  測試類:

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService exec=Executors.newCachedThreadPool(new
MyThreadFactory("thread")); exec.submit(new Thread1()); exec.submit(new Thread1()); exec.shutdown(); } } class Thread1 implements Runnable{ public void run(){ System.out.println("the thread is running!"); System.out.println("my name is "+Thread.currentThread().getName()); } }

擴展ThreadPoolExecutor

  下面演示使用給線程增加日誌和計時功能,startTime使用ThreadLocal是因為全局變量,可以發現,在這裏的全局變量要保證線程安全:

package com.company;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime=new ThreadLocal<Long>();
    private final AtomicLong numTasks=new AtomicLong();
    private final AtomicLong totalTime=new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

    }

    @Override
    protected void beforeExecute(Thread t, Runnable r){
        startTime.set(System.nanoTime());
        super.beforeExecute(t, r);
        System.out.println("在線程執行之前打印!");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long endTime=System.nanoTime();
        long taskTime=endTime-startTime.get();
        numTasks.incrementAndGet();
        totalTime.addAndGet(taskTime);
        System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread().getName(),endTime,taskTime));

        System.out.println("在線程執行之後打印!");
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("在線程池關閉後打印!");
    }
}

遞歸算法的並行化

  這裏要比較多線程和單線程遞歸地處理某個文件夾下全部文件的時間快慢。唯一不方便的是使用多線程處理時的時間統計,由於不知道提交了多少個處理線程,因此不能及時地關閉線程池(如果有大神,請指點一二),使用等待固定時間的方式,讓線程池最終關閉。這裏使用TimingThreadPool這個類來統計各個線程的執行時間,並每個線程執行完後將時間統計到totalTime中,在線程池關閉後再輸出總時間。

package com.company;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime=new ThreadLocal<Long>();
    private final AtomicLong numTasks=new AtomicLong();
    private final AtomicLong totalTime=new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

    }

    @Override
    protected void beforeExecute(Thread t, Runnable r){
        startTime.set(System.nanoTime());
        super.beforeExecute(t, r);
        //System.out.println("在線程執行之前打印!");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long endTime=System.nanoTime();
        long taskTime=endTime-startTime.get();
        numTasks.incrementAndGet();
        totalTime.addAndGet(taskTime);

        //System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread().getName(),endTime,taskTime));
        //System.out.println("在線程執行之後打印!");
        //System.out.println("線程總運行時間為:"+totalTime.get()/1000000000+"秒");
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("在線程池關閉後打印!總共用時:"+totalTime.get()/1000000000+"秒");
    }
}

  主方法,其中Thread1是使用多線程來執行,可以修改TimingThreadPool的初始狀態可以調整線程數量。Thread2是使用單線程處理。在代碼中,我假設處理文件的時間都是1毫秒(單單遍歷文件時,多線程要比單線程的慢,但加入一些處理之後,多線程就占優勢了)。可以看到我的測試數據有580259個文件,使用單線程時一共用時818秒,而使用3個線程時用時798秒,4線程416秒,2線程之類的846秒。因此最合適的是3線程也就快那麽20秒,但是如果處理文件不僅僅是1毫秒,而是更多,那麽這兩者的差距會更大。

package com.company;

import java.io.File;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    public final AtomicLong atom=new AtomicLong(0);
    public static void main(String[] args) {
        ExecutorService exec=Executors.newFixedThreadPool(1);
        //ThreadPoolExecutor pool=new TimingThreadPool(1,1,2,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(),new MyThreadFactory("findThread"));
        exec.submit(new Thread1());
        exec.shutdown();
    }

    //單線程遍歷某個文件夾
    public void getAllFiles(File file){
        if(file.isDirectory()) {
            for (File f:file.listFiles()){
                getAllFiles(f);
            }
        }else {
            try{
                Thread.sleep(1);
            }catch(Exception e){
                e.printStackTrace();
            }
            atom.incrementAndGet();
        }
    }

    //多線程遍歷某個文件夾
    public void parallelRecursive(Executor exec,File file){
        if(file.isDirectory()) {
            for (File f:file.listFiles()){
                exec.execute(new Thread(){
                    public void run(){
                        parallelRecursive(exec,f);
                    }
                });
            }
        }else {
            try{
                Thread.sleep(1);
            }catch(Exception e){
                e.printStackTrace();
            }
            atom.incrementAndGet();
        }
    }

}
class Thread1 implements Runnable{
    public void run(){
        Main m=new Main();
        File file=new File("/Users/chenkaiwei");

        ThreadPoolExecutor pool=new TimingThreadPool(2,2,5,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(),new MyThreadFactory("findThread"));


        m.parallelRecursive(pool,file);
        try{
            Thread.sleep(818000);
            pool.shutdown();
            System.out.println("共訪問【"+m.atom.get()+"】個文件");
            //System.out.println("線程池退出");
            //exec.awaitTermination(Integer.MAX_VALUE,TimeUnit.SECONDS);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //798秒 3線程
    //816秒 4線程
}

class Thread2 implements Runnable{
    public void run(){
        Main m=new Main();
        File file=new File("/Users/chenkaiwei");
        long startTime=System.currentTimeMillis();
        m.getAllFiles(file);
        long endTime=System.currentTimeMillis();
        System.out.println("總共用時:"+(endTime-startTime)/1000+"秒");
        System.out.println("共訪問【"+m.atom.get()+"】個文件");
    }
    //818秒
}
//580259

第八章、線程池的使用