1. 程式人生 > >一個取消多生產者單消費者的日誌執行緒池服務

一個取消多生產者單消費者的日誌執行緒池服務

package concurrent._ThreadPool.logService;

import net.jcip.annotations.GuardedBy;
import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.
*; //一個日誌的服務,多個生產者,一個消費者 public class Demo { public static void main(String[] args) throws Exception { MyLogService myLogService = new MyLogService(8,5); myLogService.start(); Thread.sleep(5000); System.out.println("請求停止執行緒池"); myLogService.stop(); } }
class MyLogService{ //執行緒池的數量 private int serviceCount; //阻塞佇列的數量 private int blockingqueueCount; //日誌服務執行緒池 public static ExecutorService executorService; //日誌服務需要的阻塞佇列 private BlockingQueue<String> bq = null; //使用者取消而設定的標誌位 //GuardedBy表示要取得括號裡面的這個鎖才能訪問這個屬性 @GuardedBy("
this") private volatile boolean iscancel; //用於記錄,put與take次數的差,如果取消標誌位設定為真,次數差到0說明,put的操作都已經被消費了,這時候說明沒有put操作被阻塞 // 可以將cum執行緒終止,所有執行緒都成功停下來了 @GuardedBy("this") private int reservations; //消費者; private FutureTask<Integer> cum = null; //執行緒池預設數量 private final static int DEFAULT_SERVICE_COUNT = 8; private final static int DEFAULT_QUEUE_COUNT = 4; public MyLogService(int serviceCount , int blockingqueueCount){ this.serviceCount = serviceCount; this.blockingqueueCount = serviceCount; this.iscancel = false; this.reservations = 0; executorService = Executors.newFixedThreadPool(serviceCount); bq = new ArrayBlockingQueue<String>(blockingqueueCount); } public MyLogService(){ this(DEFAULT_SERVICE_COUNT,DEFAULT_QUEUE_COUNT); } //執行緒池開始工作 public void start(){ this.cum = new FutureTask<Integer>(this.CumLogfactory()); //啟動cum executorService.submit(cum); List<FutureTask<String>> list = new ArrayList<FutureTask<String>>(); for(int i = 0 ;i < this.serviceCount ; i ++){ FutureTask<String> futureTask = new FutureTask<String>(this.ProLogFactory()); list.add(futureTask); } for(FutureTask<String> f:list){ executorService.submit(f); } System.out.println("執行緒池啟動工作"); } //執行緒池停止工作 public void stop() throws ExecutionException, InterruptedException { synchronized (this){ iscancel = true; } Integer result = cum.get(); if(result == 1){ System.out.println("所有執行緒已經停止"); executorService.shutdown(); } } private ProLog ProLogFactory(){ return new ProLog(); } private CumLog CumLogfactory(){ return new CumLog(); } private class ProLog implements Callable<String>{ private final String name ; //預設名字 private final static String DEFAULT_NAME = "proLog"; public ProLog(){ this(DEFAULT_NAME); } public ProLog(String name){ this.name = name; } @Override public String call() throws Exception { String thradname = Thread.currentThread().getName(); proLog(thradname); return null; } private void proLog(String name) throws InterruptedException { String msg ; while(true){ synchronized (MyLogService.this){ if(iscancel){ System.out.println(name +"發現請求停止,丟擲異常"); throw new IllegalStateException(); } ++reservations; } msg = createLog(); //將日誌寫入阻塞佇列中 bq.put(msg); System.out.println(name + "寫入到佇列中" + msg + "__佇列剩餘空位_" + bq.remainingCapacity()); } } private String createLog() throws InterruptedException { //建立一個日誌 long time = System.currentTimeMillis(); String msg = Long.toString(time); //模擬等待一段時間 Thread.sleep(new Random().nextInt(5000)); return msg; } } private class CumLog implements Callable<Integer>{ private final String name ; //預設名字 private static final String DEFAULT_NAME = "cumLog"; public CumLog(){ this(DEFAULT_NAME); } public CumLog(String name){ this.name = name; } @Override public Integer call() throws Exception { System.out.println("啟動一個日誌cum"); Integer msg = cumLog(); return msg; } private Integer cumLog() throws InterruptedException, IOException { String msg = null; Integer isdone = 0; //獲取日誌 while(true){ Thread.sleep(300); synchronized (MyLogService.this){ if(iscancel && reservations==0){ isdone = 1; break; } } msg = bq.take(); synchronized (MyLogService.this){ --reservations; } System.out.println("消費了一個日誌"+msg + "__佇列剩餘空位_" + bq.remainingCapacity()); } return isdone; } } }

結果:

啟動一個日誌cum
執行緒池啟動工作
pool-1-thread-7寫入到佇列中1543640100705__佇列剩餘空位_4
消費了一個日誌1543640100705__佇列剩餘空位_5
pool-1-thread-5寫入到佇列中1543640100705__佇列剩餘空位_4
消費了一個日誌1543640100705__佇列剩餘空位_5
pool-1-thread-5寫入到佇列中1543640103668__佇列剩餘空位_4
消費了一個日誌1543640103668__佇列剩餘空位_5
pool-1-thread-8寫入到佇列中1543640100705__佇列剩餘空位_4
pool-1-thread-6寫入到佇列中1543640100705__佇列剩餘空位_3
pool-1-thread-2寫入到佇列中1543640100705__佇列剩餘空位_2
消費了一個日誌1543640100705__佇列剩餘空位_3
消費了一個日誌1543640100705__佇列剩餘空位_4
pool-1-thread-4寫入到佇列中1543640100705__佇列剩餘空位_3
pool-1-thread-6寫入到佇列中1543640105030__佇列剩餘空位_2
pool-1-thread-3寫入到佇列中1543640100705__佇列剩餘空位_1
消費了一個日誌1543640100705__佇列剩餘空位_2
請求停止執行緒池
pool-1-thread-7寫入到佇列中1543640100867__佇列剩餘空位_1
pool-1-thread-7發現請求停止,丟擲異常
pool-1-thread-7發現請求停止,丟擲異常
pool-1-thread-3寫入到佇列中1543640105582__佇列剩餘空位_0
pool-1-thread-3發現請求停止,丟擲異常
消費了一個日誌1543640100705__佇列剩餘空位_1
消費了一個日誌1543640105030__佇列剩餘空位_2
消費了一個日誌1543640100705__佇列剩餘空位_3
pool-1-thread-8寫入到佇列中1543640104835__佇列剩餘空位_2
pool-1-thread-8發現請求停止,丟擲異常
消費了一個日誌1543640100867__佇列剩餘空位_3
pool-1-thread-2寫入到佇列中1543640105042__佇列剩餘空位_2
pool-1-thread-2發現請求停止,丟擲異常
消費了一個日誌1543640105582__佇列剩餘空位_3
消費了一個日誌1543640104835__佇列剩餘空位_4
消費了一個日誌1543640105042__佇列剩餘空位_5
pool-1-thread-6寫入到佇列中1543640105573__佇列剩餘空位_4
消費了一個日誌1543640105573__佇列剩餘空位_5
pool-1-thread-6發現請求停止,丟擲異常
消費了一個日誌1543640104777__佇列剩餘空位_5
pool-1-thread-5寫入到佇列中1543640104777__佇列剩餘空位_5
pool-1-thread-5發現請求停止,丟擲異常
pool-1-thread-4寫入到佇列中1543640105562__佇列剩餘空位_4
pool-1-thread-4發現請求停止,丟擲異常
消費了一個日誌1543640105562__佇列剩餘空位_5
所有執行緒已經停止