1. 程式人生 > >java 多執行緒 ThreadPoolExecutor 接收並處理資料

java 多執行緒 ThreadPoolExecutor 接收並處理資料

2015年03月19日 15:22:08

1.   一般的網際網路專案,都涉及多資料的處理,這個是再常見不過的事情了,如果是但執行緒去對資料做處理,明顯效能上是慢了很多,那麼有沒有什麼好的方式吶?  當然有,這就是java本身的多執行緒機制對應java 多執行緒的問題,有一大堆的demo去做參考,在jdk的的 java.util.concurrent 包下,提供了很多的可以使用的api ,不再類述了。。。                2. 主要用的到是ThreadPoolExecutor:常用構造方法為: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) corePoolSize: 執行緒池維護執行緒的最少數量 maximumPoolSize:執行緒池維護執行緒的最大數量 keepAliveTime: 執行緒池維護執行緒所允許的空閒時間 unit: 執行緒池維護執行緒所允許的空閒時間的單位 workQueue: 執行緒池所使用的緩衝佇列 handler: 執行緒池對拒絕任務的處理策略

一個任務通過 execute(Runnable)方法被新增到執行緒池,任務就是一個 Runnable型別的物件,任務的執行方法就是 Runnable型別物件的run()方法。

當一個任務通過execute(Runnable)方法欲新增到執行緒池時:

如果此時執行緒池中的數量小於corePoolSize,即使執行緒池中的執行緒都處於空閒狀態,也要建立新的執行緒來處理被新增的任務。 如果此時執行緒池中的數量等於 corePoolSize,但是緩衝佇列 workQueue未滿,那麼任務被放入緩衝佇列。 如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量小於maximumPoolSize,建新的執行緒來處理被新增的任務。 如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。

也就是:處理任務的優先順序為: 核心執行緒corePoolSize、任務佇列workQueue、最大執行緒maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。

當執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止。這樣,執行緒池可以動態的調整池中的執行緒數。

unit可選的引數為java.util.concurrent.TimeUnit中的幾個靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue

handler有四個選擇: ThreadPoolExecutor.AbortPolicy() 丟擲java.util.concurrent.RejectedExecutionException異常 ThreadPoolExecutor.CallerRunsPolicy() 重試添加當前的任務,他會自動重複呼叫execute()方法 ThreadPoolExecutor.DiscardOldestPolicy() 拋棄舊的任務 ThreadPoolExecutor.DiscardPolicy() 拋棄當前的任務  

     3. 最近在做一個專案,將這個使用記一下,方便發現問題和後期做修改。其他不說了,主要看程式碼: public class AcceptServer implements Runnable {

public final Logger logger = Logger.getLogger(AcceptServer.class);

//服務端,用來接收來自c程式傳送的位元組碼資料.

private ServerSocket server = null;  

//執行緒池 public static ThreadPoolExecutor executor = null;

/** * 初始化方法,工程啟動時呼叫 */ public  void initRevc(){  try {  executor = new ThreadPoolExecutor(200,300,1,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(300),  new ThreadPoolExecutor.CallerRunsPolicy()  );  logger.debug("PersonaeServer start to run");    server = new ServerSocket(Integer.parseInt(getOperatePort("ServerPort","25105")));  AcceptServer recv = new AcceptServer();  recv.setServer(server);  Thread thread = new Thread(recv);  thread.start();  } catch (IOException e) { logger.error(e.getMessage()); } catch (Exception ex) { logger.error(ex.getMessage()); } }

public void run() { Socket client = null; while(true){ try { logger.debug("PersonaeServer befor server.accept()");    client = this.getServer().accept();    client.setSoTimeout(60000*10);    logger.debug("PersonaeServer after server.accept()");    //對流檔案做處理的入口方式    QueryReader.parseByteReader(new BufferedInputStream(client.getInputStream()), client);     } catch (IOException e) { logger.error(e.getMessage()); } catch (Exception ex) { logger.error(ex.getMessage());  }           } }

/**  * 讀配置檔案取接收埠 * @param property * @param defaultValue * @return */ private String getOperatePort(String property,String defaultValue){ Properties prop = new Properties(); InputStream fis =  getClass().getClassLoader().getResourceAsStream("config.properties"); try { prop.load(fis); } catch (IOException e) { logger.error(e.getMessage()); } String value = prop.getProperty(property,defaultValue).trim(); logger.debug(property+" value=="+value);    return value; }

public ServerSocket getServer() { return server; }

public void setServer(ServerSocket server) { this.server = server; } }        通過這個框架程式碼,就可以用來在服務端處理客戶端發過來的請求,並做出相應的相應。 QueryReader.parseByteReader(new BufferedInputStream(client.getInputStream()), client); 主要是對資料做處理的,並返回相應的成功程式碼的。

4. 部署注意點:      1). 此方法採用動態讀取ip和埠的方式來獲取客戶端的流資訊,所以要動態的配置config.properties檔案中的ip和埠;      2). 採用spring的ben工程來初始化,將該執行緒放入beanFactory中去:      <bean class="com.AcceptServer " init-method="initRevc">  

5. 在使用多執行緒處理問題的時候,可以根據自己的習慣,採用java.util.concurrent 包下提供的api 來做對資料,對業務的處理工作。 ---------------------  作者:supingemail  來源:CSDN  原文:https://blog.csdn.net/supingemail/article/details/44458451  版權宣告:本文為博主原創文章,轉載請附上博文連結!