java 多執行緒 ThreadPoolExecutor 接收並處理資料
2015年03月19日 15:22:08
1. 一般的網際網路專案,都涉及多資料的處理,這個是再常見不過的事情了,如果是但執行緒去對資料做處理,明顯效能上是慢了很多,那麼有沒有什麼好的方式吶? 當然有,這就是java本身的多執行緒機制對應java 多執行緒的問題,有一大堆的demo去做參考,在jdk的的 java.util.concurrent 包下,提供了很多的可以使用的api ,不再類述了。。。 2. 主要用的到是ThreadPoolExecutor:常用構造方法為: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) corePoolSize: 執行緒池維護執行緒的最少數量 maximumPoolSize:執行緒池維護執行緒的最大數量 keepAliveTime: 執行緒池維護執行緒所允許的空閒時間 unit: 執行緒池維護執行緒所允許的空閒時間的單位 workQueue: 執行緒池所使用的緩衝佇列 handler: 執行緒池對拒絕任務的處理策略
一個任務通過 execute(Runnable)方法被新增到執行緒池,任務就是一個 Runnable型別的物件,任務的執行方法就是 Runnable型別物件的run()方法。
當一個任務通過execute(Runnable)方法欲新增到執行緒池時:
如果此時執行緒池中的數量小於corePoolSize,即使執行緒池中的執行緒都處於空閒狀態,也要建立新的執行緒來處理被新增的任務。 如果此時執行緒池中的數量等於 corePoolSize,但是緩衝佇列 workQueue未滿,那麼任務被放入緩衝佇列。 如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量小於maximumPoolSize,建新的執行緒來處理被新增的任務。 如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。
也就是:處理任務的優先順序為: 核心執行緒corePoolSize、任務佇列workQueue、最大執行緒maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
當執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止。這樣,執行緒池可以動態的調整池中的執行緒數。
unit可選的引數為java.util.concurrent.TimeUnit中的幾個靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四個選擇: ThreadPoolExecutor.AbortPolicy() 丟擲java.util.concurrent.RejectedExecutionException異常 ThreadPoolExecutor.CallerRunsPolicy() 重試添加當前的任務,他會自動重複呼叫execute()方法 ThreadPoolExecutor.DiscardOldestPolicy() 拋棄舊的任務 ThreadPoolExecutor.DiscardPolicy() 拋棄當前的任務
3. 最近在做一個專案,將這個使用記一下,方便發現問題和後期做修改。其他不說了,主要看程式碼: public class AcceptServer implements Runnable {
public final Logger logger = Logger.getLogger(AcceptServer.class);
//服務端,用來接收來自c程式傳送的位元組碼資料.
private ServerSocket server = null;
//執行緒池 public static ThreadPoolExecutor executor = null;
/** * 初始化方法,工程啟動時呼叫 */ public void initRevc(){ try { executor = new ThreadPoolExecutor(200,300,1,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(300), new ThreadPoolExecutor.CallerRunsPolicy() ); logger.debug("PersonaeServer start to run"); server = new ServerSocket(Integer.parseInt(getOperatePort("ServerPort","25105"))); AcceptServer recv = new AcceptServer(); recv.setServer(server); Thread thread = new Thread(recv); thread.start(); } catch (IOException e) { logger.error(e.getMessage()); } catch (Exception ex) { logger.error(ex.getMessage()); } }
public void run() { Socket client = null; while(true){ try { logger.debug("PersonaeServer befor server.accept()"); client = this.getServer().accept(); client.setSoTimeout(60000*10); logger.debug("PersonaeServer after server.accept()"); //對流檔案做處理的入口方式 QueryReader.parseByteReader(new BufferedInputStream(client.getInputStream()), client); } catch (IOException e) { logger.error(e.getMessage()); } catch (Exception ex) { logger.error(ex.getMessage()); } } }
/** * 讀配置檔案取接收埠 * @param property * @param defaultValue * @return */ private String getOperatePort(String property,String defaultValue){ Properties prop = new Properties(); InputStream fis = getClass().getClassLoader().getResourceAsStream("config.properties"); try { prop.load(fis); } catch (IOException e) { logger.error(e.getMessage()); } String value = prop.getProperty(property,defaultValue).trim(); logger.debug(property+" value=="+value); return value; }
public ServerSocket getServer() { return server; }
public void setServer(ServerSocket server) { this.server = server; } } 通過這個框架程式碼,就可以用來在服務端處理客戶端發過來的請求,並做出相應的相應。 QueryReader.parseByteReader(new BufferedInputStream(client.getInputStream()), client); 主要是對資料做處理的,並返回相應的成功程式碼的。
4. 部署注意點: 1). 此方法採用動態讀取ip和埠的方式來獲取客戶端的流資訊,所以要動態的配置config.properties檔案中的ip和埠; 2). 採用spring的ben工程來初始化,將該執行緒放入beanFactory中去: <bean class="com.AcceptServer " init-method="initRevc">
5. 在使用多執行緒處理問題的時候,可以根據自己的習慣,採用java.util.concurrent 包下提供的api 來做對資料,對業務的處理工作。 --------------------- 作者:supingemail 來源:CSDN 原文:https://blog.csdn.net/supingemail/article/details/44458451 版權宣告:本文為博主原創文章,轉載請附上博文連結!