1. 程式人生 > >java執行緒池管理以及分散式

java執行緒池管理以及分散式

【編者按】多執行緒是程式設計師面試時常常會面對的問題,對多執行緒概念的掌握和理解水平,也常常被用來衡量一個人的程式設計實力。不錯,普通的多執行緒已經不容易了,那麼當多執行緒碰到“大象”又會產生什麼樣的火花?這裡我們為大家分享上海創行科技技術總監嚴瀾的博文——Java執行緒池管理及分散式Hadoop排程框架搭建。

CSDN推薦:歡迎免費訂閱《》獲取更多Hadoop技術文獻、大資料技術分析、企業實戰經驗,生態圈發展趨勢。 

以下為原文: 

平時的開發中執行緒是個少不了的東西,比如tomcat裡的servlet就是執行緒,沒有執行緒我們如何提供多使用者訪問呢?不過很多剛開始接觸執行緒的開發工程師卻在這個上面吃了不少苦頭。怎麼做一套簡便的執行緒開發模式框架讓大家從單執行緒開發快速轉入多執行緒開發,這確實是個比較難搞的工程。

那具體什麼是執行緒呢?首先看看程序是什麼,程序就是系統中執行的一個程式,這個程式可以使用記憶體、處理器、檔案系統等相關資源。例如QQ軟體、Eclipse、Tomcat等就是一個exe程式,執行啟動起來就是一個程序。為什麼需要多執行緒?如果每個程序都是單獨處理一件事情不能多個任務同時處理,比如我們開啟qq只能和一個人聊天,我們用eclipse開發程式碼的時候不能編譯程式碼,我們請求tomcat服務時只能服務一個使用者請求,那我想我們還在原始社會。多執行緒的目的就是讓一個程序能夠同時處理多件事情或者請求。比如現在我們使用的QQ軟體可以同時和多個人聊天,我們用eclipse開發程式碼時還可以編譯程式碼,tomcat可以同時服務多個使用者請求。

執行緒這麼多好處,怎麼把單程序程式變成多執行緒程式呢?不同的語言有不同的實現,這裡說下java語言的實現多執行緒的兩種方式:擴充套件java.lang.Thread類、實現java.lang.Runnable介面。

先看個例子,假設有100個數據需要分發並且計算。看下單執行緒的處理速度: 

  1. package thread;  
  2. import java.util.Vector;  
  3. publicclass OneMain {  
  4.        publicstaticvoid main(String[] args) throws InterruptedException {  
  5.             Vector<Integer> list = new
     Vector<Integer>(100);  
  6.              for (int i = 0; i < 100; i++) {  
  7.                   list.add(i);  
  8.             }  
  9.              long start = System.currentTimeMillis();  
  10.              while (list.size() > 0) {  
  11.                    int val = list.remove(0);  
  12.                   Thread. sleep(100);//模擬處理
  13.                   System. out.println(val);  
  14.             }  
  15.              long end = System.currentTimeMillis();  
  16.             System. out.println("消耗 " + (end - start) + " ms");  
  17.       }  
  18.        // 消耗 10063 ms
  19. }  
再看一下多執行緒的處理速度,採用了10個執行緒分別處理:
  1. package thread;  
  2. import java.util.Vector;  
  3. import java.util.concurrent.CountDownLatch;  
  4. publicclass MultiThread extends Thread {  
  5.      static Vector<Integer> list = new Vector<Integer>(100);  
  6.      static CountDownLatch count = new CountDownLatch(10);  
  7.      publicvoid run() {  
  8.           while (list.size() > 0) {  
  9.                try {  
  10.                     int val = list.remove(0);  
  11.                     System.out.println(val);  
  12.                     Thread.sleep(100);//模擬處理
  13.                } catch (Exception e) {  
  14.                     // 可能陣列越界,這個地方只是為了說明問題,忽略錯誤
  15.                }  
  16.           }  
  17.           count.countDown(); // 刪除成功減一
  18.      }  
  19.      publicstaticvoid main(String[] args) throws InterruptedException {  
  20.           for (int i = 0; i < 100; i++) {  
  21.                list.add(i);  
  22.           }  
  23.           long start = System.currentTimeMillis();  
  24.           for (int i = 0; i < 10; i++) {  
  25.                new MultiThread().start();  
  26.           }  
  27.           count.await();  
  28.           long end = System.currentTimeMillis();  
  29.           System.out.println("消耗 " + (end - start) + " ms");  
  30.      }  
  31.      // 消耗 1001 ms
  32. }  

大家看到了執行緒的好處了吧!單執行緒需要10S,10個執行緒只需要1S。充分利用了系統資源實現平行計算。也許這裡會產生一個誤解,是不是增加的執行緒個數越多效率越高。執行緒越多處理效能越高這個是錯誤的,正規化都要合適,過了就不好了。需要普及一下計算機硬體的一些知識。我們的cpu是個運算器,執行緒執行就需要這個運算器來執行。不過這個資源只有一個,大家就會爭搶。一般通過以下幾種演算法實現爭搶cpu的排程:

  1. 佇列方式,先來先服務。不管是什麼任務來了都要按照佇列排隊先來後到。
  2. 時間片輪轉,這也是最古老的cpu排程演算法。設定一個時間片,每個任務使用cpu的時間不能超過這個時間。如果超過了這個時間就把任務暫停儲存狀態,放到佇列尾部繼續等待執行。
  3. 優先順序方式:給任務設定優先順序,有優先順序的先執行,沒有優先順序的就等待執行。

這三種演算法都有優缺點,實際作業系統是結合多種演算法,保證優先順序的能夠先處理,但是也不能一直處理優先順序的任務。硬體方面為了提高效率也有多核cpu、多執行緒cpu等解決方案。目前看得出來執行緒增多了會帶來cpu排程的負載增加,cpu需要排程大量的執行緒,包括建立執行緒、銷燬執行緒、執行緒是否需要換出cpu、是否需要分配到cpu。這些都是需要消耗系統資源的,由此,我們需要一個機制來統一管理這一堆執行緒資源。執行緒池的理念提出解決了頻繁建立、銷燬執行緒的代價。執行緒池指預先建立好一定大小的執行緒等待隨時服務使用者的任務處理,不必等到使用者需要的時候再去建立。特別是在java開發中,儘量減少垃圾回收機制的消耗就要減少物件的頻繁建立和銷燬。

之前我們都是自己實現的執行緒池,不過隨之jdk1.5的推出,jdk自帶了java.util.concurrent併發開發框架,解決了我們大部分執行緒池框架的重複工作。可以使用Executors來建立執行緒池,列出以下大概的,後面再介紹。

  • newCachedThreadPool建立具有快取功能執行緒池
  • newFixedThreadPool建立固定數量的執行緒
  • newScheduledThreadPool建立具有時間排程的執行緒 

有了執行緒池後有以下幾個問題需要考慮:

  1. 執行緒怎麼管理,比如新建任務執行緒。 
  2. 執行緒如何停止、啟動。 
  3. 執行緒除了scheduled模式的間隔時間定時外能否實現精確時間啟動。比如晚上1點啟動。 
  4. 執行緒如何監控,如果執行緒執行過程中死掉了,異常終止我們怎麼知道。

考慮到這幾點,我們需要把執行緒集中管理起來,用java.util.concurrent是做不到的。需要做以下幾點: 

  1. 將執行緒和業務分離,業務的配置單獨做成一個表。 
  2. 構建基於concurrent的執行緒排程框架,包括可以管理執行緒的狀態、停止執行緒的介面、執行緒存活心跳機制、執行緒異常日誌記錄模組。 
  3. 構建靈活的timer元件,新增quartz定時元件實現精準定時系統。
  4. 和業務配置資訊結合構建執行緒池任務排程系統。可以通過配置管理、新增執行緒任務、監控、定時、管理等操作。 

元件圖為: 

 

構建好執行緒排程框架是不是就可以應對大量計算的需求了呢?答案是否定的。因為一個機器的資源是有限的,上面也提到了cpu是時間週期的,任務一多了也會排隊,就算增加cpu,一個機器能承載的cpu也是有限的。所以需要把整個執行緒池框架做成分散式的任務排程框架才能應對橫向擴充套件,比如一個機器上的資源達到瓶頸了,馬上增加一臺機器部署排程框架和業務就可以增加計算能力了。好了,如何搭建?如下圖:

 

基於jeeframework我們封裝spring、ibatis、資料庫等操作,並且可以呼叫業務方法完成業務處理。主要元件為:

  1. 任務集中儲存到資料庫伺服器
  2. 控制中心負責管理叢集中的節點狀態,任務分發
  3. 執行緒池排程叢集負責控制中心分發的任務執行
  4. web伺服器通過視覺化操作任務的分派、管理、監控。

一般這個架構可以應對常用的分散式處理需求了,不過有個缺陷就是隨著開發人員的增多和業務模型的增多,單執行緒的程式設計模型也會變得複雜。比如需要對1000w資料進行分詞,如果這個放到一個執行緒裡來執行,不算計算時間消耗光是查詢資料庫就需要耗費不少時間。有人說,那我把1000w資料打散放到不同機器去運算,然後再合併不就行了嗎?因為這是個特例的模式,專為了這個需求去開發相應的程式沒有問題,但是以後又有其他的海量需求如何辦?比如把倒退3年的所有使用者發的帖子中發帖子最多的粉絲轉發的最高的使用者作息時間取出來。又得編一套程式實現,太麻煩!分散式雲端計算架構要解決的就是這些問題,減少開發複雜度並且要高效能,大家會不會想到一個最近很熱的一個框架,hadoop,沒錯就是這個玩意。hadoop解決的就是這個問題,把大的計算任務分解、計算、合併,這不就是我們要的東西嗎?不過玩過這個的人都知道他是一個單獨的程序。不是!他是一堆程序,怎麼和我們的排程框架結合起來?看圖說話:

基本前面的分散式排程框架元件不變,增加如下元件和功能:

  • 改造分散式排程框架,可以把本身執行緒任務變成mapreduce任務並提交到hadoop叢集。
  • hadoop叢集能夠呼叫業務介面的spring、ibatis處理業務邏輯訪問資料庫。
  • hadoop需要的資料能夠通過hive查詢。
  • hadoop可以訪問hdfs/hbase讀寫操作。
  • 業務資料要及時加入hive倉庫。
  • hive處理離線型資料、hbase處理經常更新的資料、hdfs是hive和hbase的底層結構也可以存放常規檔案。

這樣,整個改造基本完成。不過需要注意的是架構設計一定要減少開發程式的複雜度。這裡雖然引入了hadoop模型,但是框架上開發者還是隱藏的。業務處理類既可以在單機模式下執行也可以在hadoop上執行,並且可以呼叫spring、ibatis。減少了開發的學習成本,在實戰中慢慢體會就學會了 一項新技能。

介面截圖: