從Kafka topic中獲取資料並在Storm中進行分析
阿新 • 發佈:2019-02-16
從Kafka topic中獲取資料
// Comma-separated host:port list handed to ZkHosts below.
String zks = "x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181";
// Topic name passed to SpoutConfig.
String topic = "test";
// Passed to SpoutConfig as its third (zkRoot) argument.
String zkRoot = "/storm";
// Defaults to "word".
// NOTE(review): not referenced anywhere in this snippet -- SpoutConfig below is given the
// literal "KafkaSpout-reader" instead; confirm which id is intended.
String id = "word";
BrokerHosts brokerHosts = new ZkHosts(zks);

/**
 * 1. Spout property settings.
 */
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, "KafkaSpout-reader");
// Set exactly once (the original snippet assigned this same value twice).
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
// Do not force reading from the very beginning of the topic.
spoutConf.forceFromStart = false;
// Hosts and port are listed separately here; they mirror the entries in `zks` above.
spoutConf.zkServers = Arrays.asList("x.x.x.x", "x.x.x.x", "x.x.x.x");
spoutConf.zkPort = 2181;
// NOTE(review): with forceFromStart == false this setting may not be consulted by
// storm-kafka 0.9.2 -- confirm against the library version in use.
spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
// NOTE(review): presumably milliseconds; 10000000 is an unusually large wait -- confirm intent.
spoutConf.fetchMaxWait = 10000000;

/**
 * 2. Create the TopologyBuilder and register the spout.
 * (KafkaSpout comes from storm-kafka-0.9.2-incubating.jar:
 * public class storm.kafka.KafkaSpout extends backtype.storm.topology.base.BaseRichSpout)
 */
TopologyBuilder builder = new TopologyBuilder();
// SpoutConfig properties are configured on spoutConf above; the spout is registered with a
// parallelism hint of 1.
builder.setSpout("KafkaSpout-reader", new KafkaSpout(spoutConf), 1);
// The bolt (parallelism hint 3) subscribes to the spout's output via shuffle grouping.
builder.setBolt("xxx", new KafkaBolt(), 3).shuffleGrouping("KafkaSpout-reader");