一個bolt接收spout和另一個bolt的tuple
阿新 • • 發佈:2019-02-08
package org.yanzhen.stom.topology; public class TopologyLocalMain { public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException { // Topology definition TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("KafkaReaderSpout", new KafkaReaderSpout()); builder.setBolt("DataParseBolt", new DataParseBolt(), 1).shuffleGrouping("KafkaReaderSpout"); builder.setBolt("RedisCountBolt", new RedisCountBolt(), 1).shuffleGrouping("DataParseBolt").shuffleGrouping("KafkaReaderSpout"); // Configuration Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("yanzhenLog-RTProcess", conf, builder.createTopology()); Thread.sleep(100000000); cluster.shutdown(); } }
public class RedisCountBolt extends BaseBasicBolt { private static Logger logger = LogManager.getLogger(RedisCountBolt.class); private static final long serialVersionUID = 1L; RedisOper redis; @Override public void prepare(Map stormConf, TopologyContext context) { redis = new RedisOper(); } @SuppressWarnings("unchecked") @Override public void execute(Tuple input, BasicOutputCollector collector) { String messageout = "" ; Map<String, String> retMap = null; if(input.getSourceComponent().equals("KafkaReaderSpout")){ messageout = (String)input.getValue(0); System.out.println("spout:"+messageout); }else if(input.getSourceComponent().equals("DataParseBolt")){ retMap = (Map<String, String>) input.getValue(0); System.out.println("Dataparsebolt:url"+retMap.get("IF_URL")); } System.out.println("------"); } } execute()方法的if,else中輸出語句寫在<pre name="code" class="java">System.out.println("------");後面程式出錯,空指標異常,(是不是多執行緒?)