
Fetching data from a Kafka topic and analyzing it in Storm

Fetching data from a Kafka topic


    // Required imports (storm-core and storm-kafka 0.9.x):
    // backtype.storm.spout.SchemeAsMultiScheme, backtype.storm.topology.TopologyBuilder,
    // storm.kafka.{BrokerHosts, ZkHosts, SpoutConfig, KafkaSpout, StringScheme}, java.util.Arrays

    // ZooKeeper ensemble used by Kafka
    String zks = "x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181";
    String topic = "test";
    String zkRoot = "/storm";
    String id = "word"; // consumer id; defaults to "word"
    BrokerHosts brokerHosts = new ZkHosts(zks);

    /**
     * 1. Configure the spout
     */
    SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, "KafkaSpout-reader");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // emit each message as a single "str" field
    spoutConf.forceFromStart = false; // do not force reading from the earliest offset
    spoutConf.zkServers = Arrays.asList(new String[] {"x.x.x.x", "x.x.x.x", "x.x.x.x"});
    spoutConf.zkPort = 2181;
    spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // start from the latest offset
    spoutConf.fetchMaxWait = 10000000;

    /**
     * 2. Create the TopologyBuilder and set the spout.
     *    KafkaSpout comes from storm-kafka-0.9.2-incubating.jar:
     *    public class storm.kafka.KafkaSpout extends backtype.storm.topology.base.BaseRichSpout
     */
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("KafkaSpout-reader", new KafkaSpout(spoutConf), 1); // spout properties are set via spoutConf
    builder.setBolt("xxx", new KafkaBolt(), 3).shuffleGrouping("KafkaSpout-reader");
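
With the builder configured, the topology still has to be submitted before it starts consuming from Kafka. The snippet below is a minimal sketch of that step, assuming the code lives in a main(String[] args) method declared with throws Exception; the topology name "kafka-storm-topology" and the worker count are placeholders, not values from the original article. It uses the standard Config, LocalCluster and StormSubmitter classes from the same backtype.storm API.

    /**
     * 3. Submit the topology (minimal sketch; "kafka-storm-topology" is a placeholder name)
     */
    Config conf = new Config();
    conf.setNumWorkers(2); // example worker count

    if (args != null && args.length > 0) {
        // submit to a real cluster when a topology name is passed on the command line
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        // run in-process for local testing
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-storm-topology", conf, builder.createTopology());
    }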

Submitting the fetched data to Storm for processing
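
To illustrate what a processing bolt for these tuples can look like, here is a minimal sketch, not the bolt wired in above: the class name PrintBolt and the output field "msg" are invented for this example. It reads the "str" field that StringScheme attaches to every Kafka message (imports come from backtype.storm.topology, backtype.storm.topology.base and backtype.storm.tuple).

    // Minimal processing bolt (old backtype.storm API, matching the spout code above)
    public static class PrintBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // StringScheme emits each Kafka message as a single field named "str"
            String msg = input.getStringByField("str");
            System.out.println("received from kafka: " + msg);
            collector.emit(new Values(msg)); // pass the message on to downstream bolts
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msg"));
        }
    }

Such a bolt would be attached with builder.setBolt("xxx", new PrintBolt(), 3).shuffleGrouping("KafkaSpout-reader").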