大資料入門(21)storm和kafka結合的例項
阿新 • 發佈:2018-12-10
1、原理: storm的lib下的jar, external\storm-kafka\storm-kafka-0.9.2-incubating.jar 此jar中的sqout已經寫好 2、/********** KafkaTopoMain :執行,在本地生成檔案****************/ public class KafkaTopoMain { public static void main(String[] args) { BrokerHosts hosts = new ZkHosts("weekend05:2181,weekend06:2181,weekend07:2181"); String topic = "order_name"; String zkRoot="/kafka-storm"; String spoutId = "KafkaSpoutId"; SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, spoutId); spoutConfig.forceFromStart = true;//設定從頭開始 spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());//反序列化,將byte專為String TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(spoutId, new KafkaSpout(spoutConfig)); builder.setBolt("word_split", new WordSplitBolt(), 4).shuffleGrouping(spoutId); //檔案分割 builder.setBolt("write", new WordWriteBolt(), 4).fieldsGrouping("word_split", new Fields("word")); //檔案輸出 Config conf = new Config(); conf.setNumWorkers(4); conf.setNumAckers(0); conf.setDebug(false); //LocalCluster用來將topology提交到本地模擬器執行,方便開發除錯 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word_count", conf, builder.createTopology()); //StormSubmitter.submitTopology("word_split_topo", conf, builder.createTopology()); } } 3、/************************MessageScheme*****************************/ public class MessageScheme implements Scheme{ private static final long serialVersionUID = 1L; @Override public List<Object> deserialize(byte[] bytes) { try { String msg = new String(bytes,"UTF-8"); return new Values(msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return null; } @Override public Fields getOutputFields() { return new Fields("msg"); } } 4、/************************WordSplitBolt*****************************/ public class WordSplitBolt extends BaseBasicBolt{ private static final long serialVersionUID = 1L; @Override public void execute(Tuple input, BasicOutputCollector collector) { String line = input.getString(0); String[] words = line.split(" "); 
for(String word :words){ word = word.trim(); collector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } 5、/************************WordWriteBolt*****************************/ public class WordWriteBolt extends BaseBasicBolt{ private static final long serialVersionUID = -6146435791523614088L; private FileWriter fileWrite = null; //檔案輸出位置 @Override public void prepare(Map stormConf, TopologyContext context) { try { fileWrite = new FileWriter("e:\\storm_kafka_output\\wordcount"+UUID.randomUUID().toString()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void execute(Tuple input, BasicOutputCollector collector) { // TODO Auto-generated method stub String str = input.getString(0); try { fileWrite.write(str); fileWrite.write("\n"); fileWrite.flush(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub }