1. 程式人生 > >storm下執行C++程式(一)

storm下執行C++程式(一)

轉載,原文地址:http://blog.csdn.net/yan_mount/article/details/11527799

學習storm有段時間了,也搭建了一個簡單的環境,很欣賞它的一些理念,考慮到很多程式是C++實現的,如果要使用該平臺的話,需要為這些程式實現一個介面,方便統一在storm中執行,折騰了幾天,初步成功的實現了一個C++的bolt,特分享如下:

1,需要先定義一個java的殼:


  1. MyShellBolt extends ShellBolt implements IRichBolt{ 
  2. public MyShellBolt()  
  3. {  
  4.     super("/bin/sh","start.sh"
    );  
  5. }  
  6. @Override
  7. publicvoid declareOutputFields(OutputFieldsDeclarer declarer) {  
  8.     declarer.declare(new Fields("word"));  
  9. }  
  10. @Override
  11. public Map<String, Object> getComponentConfiguration() {  
  12.     returnnull;  

  13. }

2,指令碼start.sh中:

  1. chmod a+x Bolt  
  2. ./Bolt  
這個地方使用了指令碼來中轉一下,因為直接在java中呼叫super("./Bolt")時,遇到了各種問題,要麼找不到檔案,要麼沒有執行許可權,最後使用指令碼才解決這個問題。

3,仿照storm自帶的python程式的例子編寫了C++的庫:

主要是採用jsoncpp庫(jsoncpp-src-0.5.0)實現json串的解析和輸出
比如:
  1. void StormUtils::storm_emit(const string msg[],size_t size)  
  2. {  
  3.     Json::Value out;  
  4.     out["command"]="emit";  
  5.     for(size_t i=0;i<size;i++)  
  6.     {  
  7.         out["tuple"].append(msg[i]);  
  8.     }  
  9.     outs << out.toStyledString() << "end\n"
    <<endl;  
  10.     return;  
  11. }  
  1. void StormUtils::init(const string& handshake)  
  2. {  
  3.     Json::Reader reader;  
  4.     Json::Value config;  
  5.     if (reader.parse(handshake, config))  
  6.     {//parse handshake OK
  7.         constchar* piddir=config["pidDir"].asCString();  
  8.         int iPid = (int)getpid();  
  9.         char pid[64];  
  10.         sprintf(pid, "%d", iPid);  
  11.         stringstream ss;  
  12.         ss<<piddir<<"/"<<iPid;  
  13.         ofstream file(ss.str().c_str(),ofstream::out);  
  14.         file.close();  
  15.         msg_map msg;  
  16.         msg_pair my_pair("pid",pid);  
  17.         msg.insert(my_pair);  
  18.         storm_send(msg);  
  19.     }  
  20.     return;  
  21. }  

其中要注意的是:

1,init握手函式的json輸出中,pid的value的number,所以不能加“”,其它輸出value都是string。

2,每次輸出到storm平臺的json串最後都要加一行end,如:

outs<<out.toStyledString()<< "end\n"<<endl;

3,jsoncpp的reader.parse函式在遇到非json格式串時,會被阻塞住(不知原因),所以自己還簡單判斷了下storm傳給bolt的訊息串是否是json格式,否則丟棄

其中bolt的主體迴圈流程為:

  1. void process(::StormUtils& stormUtils)  
  2. {  
  3.     Json::Reader reader;  
  4.     Json::Value value;  
  5.     string msg;  
  6.     while(true)  
  7.     {  
  8.         //this function will be blocked from stdin
  9.         stormUtils.read_msg(std::cin,msg);  
  10.         stormUtils.storm_log("read_msg:"+msg);  
  11.         size_t pos=msg.find("{");  
  12.         if (pos==string::npos)  
  13.         {//no {} in string,then discard
  14.             continue;  
  15.         }  
  16.         elseif (pos>0)  
  17.         {//erase the invalid part from msg
  18.             msg=msg.erase(0,pos);  
  19.         }  
  20.         if (reader.parse(msg, value))  
  21.         {  
  22.             bool hasID=value.isMember("id");  
  23.             if (hasID)  
  24.             {//get tuple
  25.                 //tuple is array
  26.                 constchar* ID=value["id"].asCString();  
  27.                 const Json::Value tuples = value["tuple"];  
  28.                 string tuple = tuples[0u].asString();  
  29.                 vector<string> words=stormUtils.split(tuple," ");  
  30.                 vector<string>::iterator it;  
  31.                 for(it=words.begin();it!=words.end();it++)  
  32.                 {  
  33.                     stormUtils.storm_log("emit:"+*it);  
  34.                     string outMsg[]={*it};  
  35.                     stormUtils.storm_emit(outMsg,1);  
  36.                 }  
  37.                 stormUtils.storm_log(ID);  
  38.                 stormUtils.storm_ack(ID);  
  39.             }  
  40.         }  
  41.         else
  42.         {  
  43.             stormUtils.storm_log("msg parse error:"+msg);  
  44.         }  
  45.         msg.clear();//ready to read again from stdin
  46.     }  
  47.     return;  
  48. }  

4,打包

把shell指令碼和Bolt執行檔案放在jar包的/resources目錄下即可

5,驗證

無論在本地模式還是叢集模式下都執行成功,除錯時在topology中記得開啟

conf.setDebug(true);

這樣在本地模式執行時,從日誌裡就可以看到C++程式中列印的日誌,如:

4861 [Thread-21] INFO backtype.storm.task.ShellBolt - Shell msg: read_msg:{"id":"-1825914362791431189","stream":"default","comp":"MySpout","tuple":["snow white and the seven dwarfs"],"task":1}

系統啟動C程式的日誌:

4724 [Thread-20] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 2039

6,問題

1,不知為什麼,bolt程式會接收到非json格式的輸入:

Shell msg: read_msg:[3][3][3][3][3][3]

造成程式僵死,用單元測試也發現jsoncpp有這個問題,還請高手指點驗證

總之,用C++實現的Bolt基本跑通,為後續真正的業務模組(C++實現)的使用打下基礎