Storm叢集使用DRPC功能Version1.0.1
在Storm叢集上開啟DRPC功能,
基於Storm的1.0.1版本,
並且執行簡單的例子測試。
1.DRPC概念
DRPC就是分散式遠端過程呼叫。
Storm裡面引入DRPC主要是利用storm的實時計算能力,
來並行化CPU intensive的計算。
DRPC的Storm topology以函式的引數流作為輸入,
而把這些函式呼叫的返回值作為topology的輸出流。
DRPC其實不能算是Storm本身的一個特性,
它是通過組合Storm的原語spout,bolt,topology而成的一種模式(pattern)。
2.DRPC工作機制
Distributed RPC是由一個 DPRC Server 協調的,
Storm自帶了一個稱作LinearDRPCTopologyBuilder的topology builder,
它把實現DRPC的幾乎所有步驟都自動化了。
DRPC伺服器協調機制:
- 接收一個RPC請求;
- 傳送請求到Storm topology;
- 從Storm topology接收結果;
- 把結果發回給等待的客戶端。
從客戶端的角度來看一個DRPC呼叫,
跟一個普通的RPC呼叫沒有任何區別。
下面是客戶端程式碼展示瞭如何呼叫RPC的
exclaimation方法,方法的引數是hello:
DRPCClient client = new DRPCClient("drpc-host", 3772); String result = client.execute("exclaimation", "hello");
DRPC的工作流大致是這樣的:

客戶端給DRPC伺服器傳送要執行的方法的名字,
以及這個方法的引數。
實現了這個函式的topology使用DRPCSpout從DRPC伺服器接收函式呼叫流。
每個函式呼叫被DRPC伺服器標記了一個唯一的id。
這個topology然後計算結果,
在topology的最後一個叫做ReturnResults的bolt會連線到DRPC伺服器,
並且把這個呼叫的結果傳送給DRPC伺服器(通過那個唯一的id標識)。
DRPC伺服器用那個唯一id來跟等待的客戶端匹配上,
喚醒這個客戶端並且把結果傳送給它。
3.配置DPRC Server
修改storm.yaml檔案,增加drpc的配置:
drpc.servers: - "zdh-237" drpc.childopts: "-Xmx1024m"
其他引數drpc.port, drpc.http.port等使用預設值即可,
參考預設值如下:
drpc.port: 3772 drpc.worker.threads: 64 drpc.max_buffer_size: 1048576 drpc.queue.size: 128 drpc.invocations.port: 3773 drpc.invocations.threads: 64 drpc.request.timeout.secs: 600 drpc.childopts: "-Xmx768m" drpc.http.port: 3774 drpc.https.port: -1 drpc.https.keystore.password: "" drpc.https.keystore.type: "JKS" drpc.http.creds.plugin: backtype.storm.security.auth.DefaultHttpCredentialsPlugin drpc.authorizer.acl.filename: "drpc-auth-acl.yaml" drpc.authorizer.acl.strict: false
4.啟動DPRC Server
使用如下命令啟動DRPC程序:
storm drpc > drpc.log 2>&1 &
5.使用本地叢集測試
由於命令無入參即沒有topo名字,
無對外埠無法提供客戶端呼叫,
在BasicDRPCTopology中啟動後呼叫本地叢集,
僅作為測試場景使用。
進入Storm目錄,提交處理drpc的topo:
cd /home/stormna/apache-storm-1.0.1/examples/storm-starter/ storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.BasicDRPCTopology
在輸出的日誌中可以看到如下結果,
結果是單詞後面被添加了感嘆號 !
8043 [Thread-26-bolt2-executor[6 6]] INFOo.a.s.l.ThriftAccessLogger - Request ID: 3 access from:principal:operation: result Result for "hello": hello! 8054 [Thread-26-bolt2-executor[6 6]] INFOo.a.s.l.ThriftAccessLogger - Request ID: 3 access from:principal:operation: result Result for "goodbye": goodbye!
6.使用真實叢集測試
基於真實叢集的DRPC可以提供給外部客戶端呼叫,
先提交處理drpc的topo,指定topo名稱為exclamationDrpc:
cd /home/stormna/apache-storm-1.0.1/examples/storm-starter storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.BasicDRPCTopology exclamationDrpc
7.客戶端呼叫
在BasicDRPCTopology中提供的drpc方法名為exclamation,
效果返回結果是在單詞後面被新增的感嘆號。
使用下面寫客戶端程式碼進行呼叫測試。
7.1.適配storm-core-0.9.6.jar的客戶端程式碼
注意exclamation名稱不要拼錯,
否則客戶端會一直沒有響應:
public class DRPCClientTest096 { public static void main(String[] args) throws Exception { String drpcHost = "10.43.159.237"; int drpcPort = 3772; DRPCClient client = new DRPCClient(drpcHost, drpcPort); String input="hello1"; String result = client.execute("exclamation",input ); System.out.println("input:"+input+", result:"+result); } }
執行DRPCClientTest096類中的main方法,
呼叫drpc的exclamation函式,傳入引數hello1:
控制檯輸出結果:
input:hello1, result:hello1!
7.2.適配storm-core-1.0.1.jar的客戶端程式碼
注意呼叫需要配置Storm引數,
和上面的有點區別的。
public class DRPCClientTest101 { public static void main(String[] args) throws Exception { String drpcHost = "10.43.159.237"; int drpcPort = 3772; Properties pro = new Properties(); // InputStream inStream = new FileInputStream("stormclient.conf"); // 讀取storm-core-1.0.1.jar裡面 的defaults.yaml配置檔案 InputStream inStream = ClassLoader.getSystemResourceAsStream("defaults.yaml"); pro.load(inStream); inStream.close(); //由於Properties載入的值帶了引號,需要重新設定一下,或者手動去掉前後的引號 pro.setProperty("storm.thrift.transport", "org.apache.storm.security.auth.SimpleTransportPlugin"); DRPCClient client = new DRPCClient(pro, drpcHost, drpcPort); String input = "hello2"; String result = client.execute("exclamation", input); System.out.println("input:" + input + ", result:" + result); } }
執行DRPCClientTest101類中的main方法,
呼叫drpc的exclamation函式,傳入引數hello2:
控制檯輸出結果:
input:hello2, result:hello2!