1. 程式人生 > >storm整合kafka簡單使用示例2

storm整合kafka簡單使用示例2

StormKafkaTopo.java

package stormUse.stormUse;

import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka
.SpoutConfig; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.kafka.bolt.KafkaBolt; import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; import org.apache.storm.spout.SchemeAsMultiScheme; import org.apache
.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; public class StormKafkaTopo { public static void main(String[] args) throws Exception { // 配置Zookeeper地址 BrokerHosts brokerHosts = new ZkHosts("192.168.153.233:2181"); // 配置Kafka訂閱的Topic,以及zookeeper中資料節點目錄和名字 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test"
, "/test" , "kafkaspout"); // 配置KafkaBolt中的kafka.broker.properties Config conf = new Config(); //set producer properties. Properties props = new Properties(); props.put("bootstrap.servers", "192.168.153.233:9092"); props.put("acks", "1"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() .withProducerProperties(props) .withTopicSelector(new DefaultTopicSelector("topic2")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>()); spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConfig)); builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); //builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt"); builder.setBolt("kafkabolt", bolt).shuffleGrouping("bolt"); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Topo", conf, builder.createTopology()); Utils.sleep(100000); cluster.killTopology("Topo"); cluster.shutdown(); } } }

SenqueceBolt.java

package stormUse.stormUse;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SenqueceBolt extends BaseBasicBolt
{

    public void execute(Tuple input, BasicOutputCollector collector) 
    {
        // TODO Auto-generated method stub
         String word = (String) input.getValue(0);  
         String out = "I'm " + word +  "!";  
         System.out.println("out=" + out);
         collector.emit(new Values(out));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    {
        declarer.declare(new Fields("message"));
    }
}

MessageScheme.java

package stormUse.stormUse;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class MessageScheme implements Scheme {

     public List<Object> deserialize(ByteBuffer ser) 
     {

         Charset charset = null;  
         CharsetDecoder decoder = null;  
         CharBuffer charBuffer = null;  

         try 
         {
             charset = Charset.forName("UTF-8");  
             decoder = charset.newDecoder();  
             charBuffer = decoder.decode(ser.asReadOnlyBuffer());  
             String msg = charBuffer.toString(); 
             return new Values(msg);

         } catch (CharacterCodingException e) 
         {  

          }
            return null;
     }

     public Fields getOutputFields() {
            return new Fields("msg");  
        }

}