1. 程式人生 > >Java多執行緒應用例項

Java多執行緒應用例項

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
	//最大執行緒數
	private static final int MaxWorkerNumber=10;
	//預設執行緒數
	private static final int DefaultWorkerNumber=5;
	//最小執行緒數
	private static final int MinWorkerNumber=1;
	//這是一個任務佇列,將會向裡面插入任務
	private final LinkedList<Job> jobs=new LinkedList<Job>();
	//工作者列表
	private final List<Worker> workers=
			Collections.synchronizedList(new ArrayList<Worker>());
	//工作者執行緒的數量
	private int workerNum=DefaultWorkerNumber;
	//執行緒編號生成
	private AtomicLong threadNum=new AtomicLong();
	public DefaultThreadPool(){
		initializeWorker(DefaultWorkerNumber);
	}
	public DefaultThreadPool(int num){
		workerNum=num>MaxWorkerNumber?MaxWorkerNumber:
			num<MinWorkerNumber?MinWorkerNumber:num;
		initializeWorker(workerNum);
	}
	//初始化工作者
	private void initializeWorker(int num){
		for(int i=0;i<num;i++){
			Worker worker=new Worker();
			workers.add(worker);
			Thread thread=new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet());
			System.out.println("新增工作者執行緒:ThreadPool-Worker-"+threadNum.get());
			thread.start();
		}
	}
	@Override
	public void execute(Job job) {
		if(job!=null){
			//新增一個任務,然後進行通知
			synchronized(jobs){
				jobs.addLast(job);
				jobs.notifyAll();
				System.out.println("提交任務:"+job.toString());
			}
		} 
	}

	@Override
	public void shutDown() {
		for(Worker worker:workers){
			worker.shutdown();
		}  
		System.err.println(" 關閉執行緒池! ");
		//注意這裡需要通知等待佇列中的工作者執行緒
		synchronized(jobs){
			jobs.notifyAll();
		} 
	}

	@Override
	public void addWorkers(int num) {
		//限定新增的Worder數量不能超過最大值
		if(num+this.workerNum>MaxWorkerNumber){
			num=MaxWorkerNumber-this.workerNum;
		}
		synchronized(workers){
			initializeWorker(num);
			this.workerNum+=num;
		} 
	}

	@Override
	public void removeWorker(int num) {
		if(num>=this.workerNum){
			throw new IllegalArgumentException("beyond workNum");
		}
		synchronized(workers){
			//按照給定的數量停止Worker
			int count=num;
			while(count>0){
				Worker worker=workers.get(0);
				if(workers.remove(worker)){
					worker.shutdown();
					count--;
				}
			}
			this.workerNum-=num;
		}
	}

	@Override
	public int getJobSize() { 
		return jobs.size();
	} 
	
	//工作者執行緒消費任務
	public class Worker implements Runnable{
		//是否工作
		private volatile boolean running=true;
		@Override
		public void run() {
			while(true){
				Job job=null;
				synchronized(jobs){
					//如果任務佇列是空的,那麼就wait
					while(jobs.isEmpty()){
						if(running==false){
							System.out.println(Thread.currentThread().getName()+"結束");
							return;
						}
						try {
							jobs.wait();
						} catch (InterruptedException e) { 
							//感知外部對WorkerThread的中斷操作,返回
							Thread.currentThread().interrupt();
							return; 
						}
					} 
					if(running==false){
						System.out.println(Thread.currentThread().getName()+"結束");
						return;
					}
					//取出一個Job
					job=jobs.removeFirst(); 
				}
				if(job!=null){
					try{
						System.out.println(Thread.currentThread().getName()+" 執行任務 "+job.toString());
						job.run();
					}catch(Exception e){
						//忽略Job執行中的Exception
					} 
				}
			} 
			
		} 
		//關閉當前工作者
		public void shutdown(){
			running=false;
			System.err.println("關閉一個工作者執行緒");
		}
	}
}
public class ThreadLocalTest {
    //建立一個Integer型的執行緒本地變數
	public static final ThreadLocal<Integer> local = new ThreadLocal<Integer>() {
		@Override
		protected Integer initialValue() {
			return 0;
		}
	};
	//計數
	static class Counter implements Runnable{
		@Override
		public void run() {
            //獲取當前執行緒的本地變數,然後累加5次
			int num = local.get();
			for (int i = 0; i < 100; i++) {
				num++;
			}
            //重新設定累加後的本地變數
			local.set(num);
			System.out.println(Thread.currentThread().getName() + " : "+ local.get());
		}
	}
	public static void main(String[] args) throws InterruptedException {
		Thread[] threads = new Thread[5];
		for (int i = 0; i < 5; i++) {		
            threads[i] = new Thread(new Counter() ,"CounterThread-[" + i+"]");
            threads[i].start();
		} 
	}
}