程式人生 > storm和kafka整合

storm和kafka整合

storm和kafka整合

依賴

<!-- Storm's Kafka spout integration; supplies the org.apache.storm.kafka.spout
     classes (KafkaSpout, KafkaSpoutConfig, ...) imported by App.
     NOTE(review): with scope "provided" Maven does not package this jar into
     the topology artifact - confirm the target cluster's classpath supplies
     it, otherwise switch to the default (compile) scope. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

<!-- Kafka consumer API (ConsumerConfig, ConsumerRecord) used to configure the
     spout and to translate records. Default scope, so it is bundled. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

<!-- Core Storm API (TopologyBuilder, bolts, tuples). Declared "provided", so
     it is available at compile time but not packaged with the topology. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

App

package test;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Entry point that wires a Kafka spout and two bolts into a Storm topology.
 *
 * <p>As written, {@code main} only configures the spout and builds the topology;
 * the cluster and local-mode submission snippets are kept below as commented-out
 * alternatives to enable as needed.
 */
public class App {

    /**
     * Configures the Kafka spout, declares the topology layout and builds it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for the benefit of the submission snippets below
     */
    public static void main(String[] args) throws Exception {

        // Kafka cluster address list and the topic to consume.
        String bootstrapServers = "worker1:9092,worker2:9092,worker3:9092";
        String topic = "test";

        // Upper bound on the bytes fetched per partition in one request (4 MiB).
        int maxPartitionFetchBytes = 1024 * 1024 * 4;

        // Policy applied when processing of a message fails.
        KafkaSpoutRetryExponentialBackoff retryPolicy = new KafkaSpoutRetryExponentialBackoff(
                new TimeInterval(500L, TimeUnit.MICROSECONDS), // initial delay
                TimeInterval.milliSeconds(2),                  // delay period
                Integer.MAX_VALUE,                             // max retries
                TimeInterval.seconds(10));                     // max delay

        // Translator that turns each Kafka record into the tuple emitted by the spout.
        MyRecordTranslator translator = new MyRecordTranslator();
        Fields spoutFields = new Fields("word");

        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
                .builder(bootstrapServers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "consumer") // consumer group id; any name works
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes)
                // .setRecordTranslator(new MyRecordTranslator())
                .setRecordTranslator(translator, spoutFields)
                .setRetry(retryPolicy)
                .setOffsetCommitPeriodMs(10000)
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                .setMaxUncommittedOffsets(250)
                .build();

        // Layout: KafkaSpout -> Recieve -> Consume, one executor each, global grouping.
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(conf);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("KafkaSpout", kafkaSpout, 1);
        topologyBuilder.setBolt("Recieve", new RecieveBolt(), 1).globalGrouping("KafkaSpout");
        topologyBuilder.setBolt("Consume", new ConsumeBolt(), 1).globalGrouping("Recieve");

        // Builds the topology; the result is not submitted anywhere by this method.
        topologyBuilder.createTopology();

        // Run on a cluster
        // Config config = new Config();
        // config.setNumWorkers(3);
        // config.setDebug(true);
        // StormSubmitter.submitTopology("teststorm", config, topologyBuilder.createTopology());

        // Local test
        // Config config = new Config();
        // config.setNumWorkers(3);
        // config.setDebug(true);
        // config.setMaxTaskParallelism(20);
        // LocalCluster cluster = new LocalCluster();
        // cluster.submitTopology("teststorm", config, topologyBuilder.createTopology());
        // Utils.sleep(60000);
        // // Finished running; shut the cluster down
        // cluster.shutdown();
    }
}

/**
 * Translation function handed to the Kafka spout: maps each consumed record to a
 * single-element tuple holding only the record's value.
 */
class MyRecordTranslator implements Func<ConsumerRecord<String, String>, List<Object>> {

    private static final long serialVersionUID = 1L;

    /**
     * Converts one Kafka record into the values of the emitted tuple.
     *
     * @param record the consumed Kafka record
     * @return a one-element list containing {@code record.value()}
     */
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        final String payload = record.value();
        final List<Object> tupleValues = new Values(payload);
        return tupleValues;
    }
}

ConsumeBolt

package test;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ConsumeBolt extends BaseRichBolt {

    private static final long serialVersionUID = -7114915627898482737L;

    private FileWriter fileWriter = null;

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

        try {
            fileWriter = new FileWriter("/usr/local/tmpdata/" + UUID.randomUUID());
            // fileWriter = new FileWriter("C:\\Users\\26401\\Desktop\\test\\" + UUID.randomUUID());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void execute(Tuple tuple) {

        try {
            String word = tuple.getStringByField("word") + "......." + "\n";
            fileWriter.write(word);
            fileWriter.flush();
            System.out.println(word);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

RecieveBolt

package test;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class RecieveBolt extends BaseRichBolt {

    private static final long serialVersionUID = -4758047349803579486L;

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // 將spout傳遞過來的tuple值進行轉換
        this.collector.emit(new Values(tuple.getStringByField("word") + "!!!")); 
    }

    // 聲明發送訊息的欄位名
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}