1. 程式人生 > >如何設計一個實時流計算系統

如何設計一個實時流計算系統

實時流計算的場景歸納起來多半是: 
業務系統根據實時的操作,不斷生成事件(訊息/呼叫),然後引起一系列的處理分析,這個過程是分散在多臺計算機上並行完成的,看上去就像事件連續不斷的流經多個計算節點處理,形成一個實時流計算系統。
市場上流計算產品有很多,主要是通過訊息中樞結合工人模式實現,大致過程如下: 
1、開發者實現好流程輸入輸出節點邏輯,上傳job到任務生產者 
2、任務生產者將任務傳送到zookeeper,然後監控任務狀態 
3、任務消費者從zookeeper上獲取任務 
4、任務消費者啟動多個工人程序,每個程序又啟動多個執行緒執行任務 
5、工人之間通過zeroMQ互動 

我們看看如何做一個簡單的流計算系統,做法跟上面有些不同: 


1、首先不過多依賴zookeerper,任務的分配最好直接給到工人,並能直接監控工人完成狀態,這樣效率會更高。 
2、工人之間直接通訊,不依賴zeroMQ轉發。 
3、並行管理扁平化,多程序下再分多執行緒意義不大,增加管理成本,實際上一臺機器8個程序,每個程序再開8個執行緒,總體跟8-10個程序或者執行緒的效果差不多(數量視機器效能不同)。 
4、做成一個流計算系統,而不是平臺。 

這裡我們藉助fourinone提供的api和框架去實現,第一次使用可以參考 分散式計算上手demo指南 ,開發包下載地址 http://code.google.com/p/fourinone/
大致思路:用工頭去做任務生產和分配,用工人去做任務執行,為了達到流的效果,需要在工人裡面呼叫工頭的方式,將多個工人節點串起來。

下面程式演示了連續多個訊息先發到一個工人節點A處理,然後再發到兩個工人節點B並行處理的流計算過程,並且獲取到最後處理結果列印輸出(如果不需要獲取結果可以直接返回)。 

StreamCtorA:工頭A實現,它獲取到線上工人A,然後將訊息發給它處理,並輪循等待結果。工頭A的main函式模擬了多個訊息的連續呼叫。 

StreamWorkerA:工人A實現,它接收到工頭A的訊息進行處理,然後建立一個工頭B,通過工頭B將結果同時發給兩個工人B處理,然後將結果返回工頭A。 

StreamCtorB:工頭B實現,它獲取到線上兩個工人B,呼叫doTaskBatch等待兩個工人處理完成,然後返回結果給工人A。 

StreamWorkerB:工人B實現,它接收到任務訊息後模擬處理後返回結果。 


執行步驟(在本地模擬): 
1、啟動ParkServerDemo(它的IP埠已經在配置檔案指定) 
java -cp fourinone.jar; ParkServerDemo 

2、啟動工人A 
java  -cp fourinone.jar; StreamWorkerA localhost 2008 

3、啟動兩個工人B 
java  -cp fourinone.jar; StreamWorkerB localhost 2009 
java  -cp fourinone.jar; StreamWorkerB localhost 2010 

4、啟動工頭A 
java  -cp fourinone.jar; StreamCtorA 

多機部署說明:StreamCtorA可以單獨部署一臺機器,StreamWorkerA和StreamCtorB部署一臺機器,兩個StreamWorkerB可以部署兩臺機器。 

總結:計算平臺和計算系統的區別 
如果我們只有幾臺機器,但是每天有人開發不同的流處理應用要在這幾臺機器上執行,我們需要一個計算平臺來管理好job,讓開發者按照規範配置好流程和執行時節點申請,打包成job上傳,然後平臺根據每個job配置動態分配資源依次執行每個job內容。 
如果我們的幾臺機器只為一個流處理業務服務,比如實時營銷,我們需要一個流計算系統,按照業務流程部署好計算節點即可,不需要執行多個job和動態分配資源,按照計算平臺的方式做只會增加複雜性,開發者也不清楚每臺機器上到底運行了什麼邏輯。 
如果你想實現一個計算平臺,可以參考 動態部署 和程序管理功能(開發包內有指南)

//完整原始碼 
// ParkServerDemo 

import com.fourinone.BeanContext;
public class ParkServerDemo
{
public static void main(String[] args)
{
BeanContext.startPark();
}
}


//StreamCtorA 

import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;

public class StreamCtorA extends Contractor
{
 public WareHouse giveTask(WareHouse inhouse)
 {
  WorkerLocal[] wks = getWaitingWorkers("StreamWorkerA");
  System.out.println("wks.length:"+wks.length);
  
  WareHouse result = wks[0].doTask(inhouse);
  while(true){
   if(result.getStatus()!=WareHouse.NOTREADY)
   {
    break;
   }
  }
  return result;
 }
 
 public static void main(String[] args)
 {
  StreamCtorA sc = new StreamCtorA();
  for(int i=0;i<10;i++){
    WareHouse msg = new WareHouse();
    msg.put("msg","hello"+i);
    WareHouse wh = sc.giveTask(msg);
    System.out.println(wh);
  }
  sc.exit();
 }
}


//StreamWorkerA 

import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;

public class StreamWorkerA extends MigrantWorker
{
 public WareHouse doTask(WareHouse inhouse)
 {
  System.out.println(inhouse);
  //do something
  StreamCtorB sc = new StreamCtorB();
  WareHouse msg = new WareHouse();
  msg.put("msg",inhouse.getString("msg")+",from StreamWorkerA");
  WareHouse wh = sc.giveTask(msg);
  sc.exit();
  
  return wh;
 }
 
 public static void main(String[] args)
 {
  StreamWorkerA wd = new StreamWorkerA();
  wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerA");
 }
}


//StreamCtorB 

import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;

public class StreamCtorB extends Contractor
{
 public WareHouse giveTask(WareHouse inhouse)
 {
  WorkerLocal[] wks = getWaitingWorkers("StreamWorkerB");
  System.out.println("wks.length:"+wks.length);
  
  WareHouse[] hmarr = doTaskBatch(wks, inhouse);
  
  WareHouse result = new WareHouse();
  result.put("B1",hmarr[0]);
  result.put("B2",hmarr[1]);
  
  return result;
 }
}


//StreamWorkerB 

import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;

public class StreamWorkerB extends MigrantWorker
{
 public WareHouse doTask(WareHouse inhouse)
 {
  System.out.println(inhouse);
  //do something
  inhouse.put("msg",inhouse.getString("msg")+",from StreamWorkerB");
  return inhouse;
 }
 
 public static void main(String[] args)
 {
  StreamWorkerB wd = new StreamWorkerB();
  wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerB");
 }
}