
Storm Application Scenarios: Acting as a New Consumer for Kafka 0.10.x+ (Part 1)

00 Background

As Kafka has moved to version 0.10.x and beyond, the way Storm connects to it as a consumer has changed from the older approach. This post records the new approach, as a reference for future scenarios where Storm needs to process data from a recent Kafka version in real time.

01 Architecture Overview

The architecture is shown in the figure below.
[Figure: overall architecture]
Flume collects the data in a two-tier layout: the first tier does the collecting and the second tier does the aggregating, which spreads the load across agents. The second tier delivers the data to Kafka; Storm reads from Kafka in real time, processes the data as required, and writes the results to the appropriate storage layer. This post focuses on the part where Storm reads from Kafka and processes the data.
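For context, the second-tier agent is the one that hands data to Kafka. Below is a minimal sketch of such an agent using Flume's standard Kafka sink; the agent name, Avro port, and channel capacity are illustrative assumptions, while the bootstrap servers and topic name match the topology code later in this post.

# Second-tier Flume agent: receives Avro events from the first tier
# and writes them to Kafka (names and capacities are illustrative)
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000

a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.kafka.bootstrap.servers = master:9092,slave:9092,slave3:9092
a2.sinks.k1.kafka.topic = word-count

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1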

02 Demo

Step 1: Write data into the file being collected.
[Figure: writing data]
Step 2: Observe the Kafka topic.
[Figure: topic activity]
Then check with a manually created console consumer:
[Figure: console consumer output]
This shows that the data was written to Kafka successfully.
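If you want to exercise the topology without standing up the Flume tiers, a plain Kafka producer can feed the topic directly. The following is a minimal sketch: the bootstrap servers and topic are taken from the topology code below, while the class name and sample line are assumptions for illustration.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Hypothetical helper that feeds test data into the "word-count" topic
 * directly, bypassing Flume.
 */
public class WordCountTestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,slave:9092,slave3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A sample line in the same "sentence,sentence。" shape the split bolt expects
            producer.send(new ProducerRecord<>("word-count", "床前明月光,疑是地上霜。"));
            producer.flush();
        }
    }
}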
Step 3: Run the Storm word count program and check the results.
[Figure: result 1]
[Figure: result 2]
[Figure: result 3]

03 Code

Storm's data-processing flow is shown in the figure below.
[Figure: Storm data-processing flow]

KafkaWordCountTopology

package com.ccc.storm.learning.topology.kafka;

import com.ccc.storm.learning.bolt.kafka.KafkaOutputBolt;
import com.ccc.storm.learning.bolt.kafka.KafkaSplitSentenceBolt;
import com.ccc.storm.learning.bolt.kafka.KafkaWordCountBolt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author ccc
 */
public class KafkaWordCountTopology {

    private static final String TOPICS = "word-count";
    private static final String KEYS = "word";
    private static final String BOOTSTRAP_SERVERS = "master:9092,slave:9092,slave3:9092";
    private static final String KAFKA_WORD_COUNT_SPOUT_ID = "KafkaWordCountSpout";
    private static final String SPLIT_SENTENCE_BOLT_ID = "SplitSentenceBolt";
    private static final String KAFKA_WORD_COUNT_BOLT_ID = "KafkaWordCountBolt";
    private static final String KAFKA_OUTPUT_BOLT_ID = "KafkaOutputBolt";

    public static void main(String[] args) throws Exception {
        // Spout configuration using the new storm-kafka-client API
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPICS)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .setTupleTrackingEnforced(true)
                .build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

        TopologyBuilder tp = new TopologyBuilder();
        tp.setSpout(KAFKA_WORD_COUNT_SPOUT_ID, kafkaSpout, 2);
        tp.setBolt(SPLIT_SENTENCE_BOLT_ID, new KafkaSplitSentenceBolt(), 2)
                .setNumTasks(2)
                .shuffleGrouping(KAFKA_WORD_COUNT_SPOUT_ID);
        tp.setBolt(KAFKA_WORD_COUNT_BOLT_ID, new KafkaWordCountBolt(), 4)
                .setNumTasks(2)
                .fieldsGrouping(SPLIT_SENTENCE_BOLT_ID, new Fields(KEYS));
        tp.setBolt(KAFKA_OUTPUT_BOLT_ID, new KafkaOutputBolt(), 2)
                .setNumTasks(2)
                .globalGrouping(KAFKA_WORD_COUNT_BOLT_ID);

        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            // Submit to the cluster
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, tp.createTopology());
        } else {
            // Local test
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-word-count-topology", conf, tp.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
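The spout above is configured for AT_MOST_ONCE, which is fine for a demo. If lost messages matter, the same builder can be switched to at-least-once semantics. A hedged sketch follows, with method names as in storm-kafka-client 1.2.x; the group id and commit interval are illustrative values, not from the original code.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Variant of the spout configuration with at-least-once semantics
KafkaSpoutConfig<String, String> atLeastOnceConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPICS)
        // Offsets are committed only after the tuples they produced are acked
        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
        // How often acked offsets are committed back to Kafka
        .setOffsetCommitPeriodMs(10_000)
        // On (re)start, resume from the last committed offset, or earliest if none exists
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        // Illustrative consumer group id
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "word-count-group")
        .build();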

KafkaSplitSentenceBolt

package com.ccc.storm.learning.bolt.kafka;


import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * @author ccc
 */
public class KafkaSplitSentenceBolt implements IBasicBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // KafkaSpout emits tuples with the fields (topic, partition, offset, key, value);
        // the message payload lives in the "value" field
        String poetry = tuple.getStringByField("value");
        // Split each line into sentences on commas, strip the full stop "。",
        // then split each sentence into single characters: one character per "word" tuple
        List<String> sentences = Arrays.asList(poetry.split(","));
        sentences.forEach(sentence -> {
            List<String> words = Arrays.asList(sentence.replace("。", "").split(""));
            words.forEach(word -> collector.emit(new Values(word)));
        });
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
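The split logic is easier to see on a concrete line. Below is a tiny standalone sketch of the same two split steps; the sample poem line is an assumption.

import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // Same steps as KafkaSplitSentenceBolt.execute
        String poetry = "床前明月光,疑是地上霜。";
        for (String sentence : poetry.split(",")) {
            // split("") breaks the sentence into single characters,
            // so each Chinese character becomes one "word" tuple
            for (String word : sentence.replace("。", "").split("")) {
                System.out.println(word);   // 床, 前, 明, 月, 光, 疑, ...
            }
        }
    }
}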

KafkaWordCountBolt

package com.ccc.storm.learning.bolt.kafka;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * @author ccc
 */
public class KafkaWordCountBolt implements IBasicBolt {

    private Map<String, Integer> wordCounts = new HashMap<>();


    @Override
    public void prepare(Map stormConf, TopologyContext context) {

    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        // Counts live in this task's in-memory map; the fieldsGrouping on "word"
        // in the topology guarantees the same word always reaches the same task
        Integer counts = wordCounts.get(word);
        if (counts == null) {
            counts = 0;
        }
        counts++;
        wordCounts.put(word, counts);
        collector.emit(new Values(word, counts));
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "counts"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
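This bolt only produces correct totals because the topology routes it with fieldsGrouping on "word": every occurrence of the same word lands on the same task, so each task's private HashMap holds the complete count for its words. Conceptually the routing behaves like the sketch below, which is a simplification for illustration, not Storm's actual internal code.

public class FieldsGroupingSketch {
    // Simplified picture of fieldsGrouping: tuples whose grouping-field
    // values are equal always hash to the same task index
    static int chooseTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        // With 2 counting tasks (setNumTasks(2) above), the same word
        // always maps to the same task
        System.out.println(chooseTask("月", 2));
        System.out.println(chooseTask("月", 2)); // identical result
    }
}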

KafkaOutputBolt

package com.ccc.storm.learning.bolt.kafka;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.*;

/**
 * @author ccc
 */
public class KafkaOutputBolt implements IBasicBolt {

    private Map<String, Integer> wordCounts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context) {

    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        Integer counts = tuple.getIntegerByField("counts");
        this.wordCounts.put(word, counts);
    }

    @Override
    public void cleanup() {
        // Storm only guarantees that cleanup() runs in local mode; on a cluster
        // the worker can be killed before it is ever called
        List<String> keyList = new ArrayList<>(wordCounts.keySet());
        Collections.sort(keyList);
        keyList.forEach(key -> System.out.println(key + "->" + wordCounts.get(key)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
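Because cleanup() is only reliably invoked in local mode, the sorted printout above may never appear on a cluster. A common alternative is to flush the counts on tick tuples instead. Below is a hedged sketch of the two methods one could change inside KafkaOutputBolt; the 10-second interval is an arbitrary choice.

import org.apache.storm.Config;
import org.apache.storm.utils.TupleUtils;

// Inside an IBasicBolt such as KafkaOutputBolt:

@Override
public Map<String, Object> getComponentConfiguration() {
    // Ask Storm to send this bolt a tick tuple every 10 seconds
    Map<String, Object> conf = new HashMap<>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
    return conf;
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    if (TupleUtils.isTick(tuple)) {
        // Periodically dump the current counts instead of waiting for cleanup()
        wordCounts.forEach((word, counts) -> System.out.println(word + "->" + counts));
        return;
    }
    wordCounts.put(tuple.getStringByField("word"), tuple.getIntegerByField("counts"));
}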

The code above is only an overall skeleton; it can be optimized further in real development.

For more content, follow the WeChat official account: 大資料開發與學習茶館