1. 程式人生 > >14.7 DRPC

14.7 DRPC

什麼是DRPC: --是分散式遠端呼叫

– RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議

– Distributed RPC:rpc請求流式、並進行處理

– RPC請求引數當做輸入流,結果當做輸出流

– 利用storm的分散式進行處理機制和能力

– 藉助DRPC server接收請求、返回相應

Storm只能獲取資料,不能接請求和發響應,所以這裡藉助一個DRPC Server來幫 助完成

DRPC把大量請求分散式的去做,一次請求如果序列的話可能會比較慢,我並行的來 處理,另一方面通過來降低平均一次請求的時間,解決了響應的吞吐

配置DRPC:apache-storm-0.9.5/conf/storm.yaml

修改drpc.servers配置,同步到其他節點

drpc.servers:

- "node2"

#啟動DRPC server:

1,首先啟動storm叢集

, 2,然後:./bin/storm drpc >> ./logs/drpc.out 2>&1 & 在主節點上啟動就行

在叢集建立DRPC:給這個方法打jar包,放到storm叢集中執行:

./bin/storm jar /opt/local/drpc.jar com.bjsxt.basic.drpc.ManualDRPC drpc

package com.bjsxt.basic.drpc;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.StormSubmitter;

import backtype.storm.drpc.DRPCSpout;

import backtype.storm.drpc.ReturnResults;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

/**

 * 在叢集建立DRPC

 * @author root

 *

 */

public class ManualDRPC {

 public static class ExclamationBolt extends BaseBasicBolt {

  @Override

  public void declareOutputFields(OutputFieldsDeclarer declarer) {

   declarer.declare(new Fields("result", "return-info"));

  }

//可以寫自己的邏輯

  @Override

  public void execute(Tuple tuple, BasicOutputCollector collector) {

   String arg = tuple.getString(0);

   Object retInfo = tuple.getValue(1);

   collector.emit(new Values(arg + "!!!", retInfo));

  }

 }

 public static void main(String[] args) {

  TopologyBuilder builder = new TopologyBuilder();

  LocalDRPC drpc = new LocalDRPC();

  if (args.length > 0) {

   DRPCSpout spout = new DRPCSpout("exclamation");

   builder.setSpout("drpc", spout);

   builder.setBolt("exclaim", new ExclamationBolt(), 3)

     .shuffleGrouping("drpc");

   builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping(

     "exclaim");

   Config conf = new Config();

   try {

    StormSubmitter.submitTopology(args[0], conf,

      builder.createTopology());

   } catch (AlreadyAliveException e) {

    e.printStackTrace();

   } catch (InvalidTopologyException e) {

    e.printStackTrace();

   }

  } else {

   DRPCSpout spout = new DRPCSpout("exclamation", drpc);

   builder.setSpout("drpc", spout);

   builder.setBolt("exclaim", new ExclamationBolt(), 3)

     .shuffleGrouping("drpc");

   builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping(

     "exclaim");

   LocalCluster cluster = new LocalCluster();

   Config conf = new Config();

   cluster.submitTopology("exclaim", conf, builder.createTopology());

  }

 }

}

Java建立客戶端連線DRPC:DRPC在叢集上建立之後,eclipse可以執行

package com.bjsxt.basic.drpc;

import org.apache.thrift7.TException;

import backtype.storm.generated.DRPCExecutionException;

import backtype.storm.utils.DRPCClient;

/**

 * 建立客戶端訪問DRPC

 * @author root

 *

 */

public class MyDRPCclient {

 /**

  * @param args

  */

 public static void main(String[] args) {

  DRPCClient client = new DRPCClient("node2", 3772);

  try {

   String result = client.execute("exclamation", "hello ");

   System.out.println(result);

  } catch (TException e) {

   e.printStackTrace();

  } catch (DRPCExecutionException e) {

   e.printStackTrace();

  } 

 }

}