1. 程式人生 > >Storm整合Kafka應用的開發

Storm整合Kafka應用的開發

https://www.cnblogs.com/freeweb/p/5292961.html

Storm整合Kafka應用的開發

  我們知道storm的作用主要是進行流式計算,對於源源不斷的均勻資料流流入處理是非常有效的,而現實生活中大部分場景並不是均勻的資料流,而是時而多時而少的資料流入,這種情況下顯然用批量處理是不合適的,如果使用storm做實時計算的話可能因為資料擁堵而導致伺服器掛掉,應對這種情況,使用kafka作為訊息佇列是非常合適的選擇,kafka可以將不均勻的資料轉換成均勻的訊息流,從而和storm比較完善的結合,這樣才可以實現穩定的流式計算,那麼我們接下來開發一個簡單的案例來實現storm和kafka的結合

  storm和kafka結合,實質上無非是之前我們說過的計算模式結合起來,就是資料先進入kafka生產者,然後storm作為消費者進行消費,最後將消費後的資料輸出或者儲存到檔案、資料庫、分散式儲存等等,具體框圖如下:

  

  這張圖片摘自部落格地址:http://www.cnblogs.com/tovin/p/3974417.html 在此感謝作者的奉獻

  首先我們保證在伺服器上zookeeper、kafka、storm正常執行,然後我們開始寫程式,這裡使用eclipse for javaee IDE

  和之前一樣,建立一個maven專案,在pom.xml寫入如下程式碼:

複製程式碼

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3   <modelVersion>4.0.0</modelVersion>
 4 
 5   <groupId>kafkastorm</groupId>
 6   <artifactId>kafkastorm</artifactId>
 7   <version>0.0.1-SNAPSHOT</version>
 8   <packaging>jar</packaging>
 9 
10   <name>kafkastorm</name>
11   <url>http://maven.apache.org</url>
12 
13   <properties>
14     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15   </properties>
16 
17   <dependencies>
18     <dependency>
19       <groupId>junit</groupId>
20       <artifactId>junit</artifactId>
21       <version>3.8.1</version>
22       <scope>test</scope>
23     </dependency>
24     <dependency>
25         <groupId>org.apache.storm</groupId>
26         <artifactId>storm-core</artifactId>
27         <version>0.9.6</version>
28         <scope>provided</scope>
29     </dependency>
30     <dependency>
31         <groupId>org.apache.kafka</groupId>
32         <artifactId>kafka_2.9.2</artifactId>
33         <version>0.8.2.2</version>
34         <exclusions>
35             <exclusion>
36                 <groupId>org.apache.zookeeper</groupId>
37                 <artifactId>zookeeper</artifactId>
38             </exclusion>
39             <exclusion>
40                 <groupId>log4j</groupId>
41                 <artifactId>log4j</artifactId>
42             </exclusion>
43         </exclusions>
44     </dependency>
45     <dependency>
46         <groupId>org.apache.storm</groupId>
47         <artifactId>storm-kafka</artifactId>
48         <version>0.9.6</version>  
49     </dependency>
50   </dependencies>
51   
52   <build>
53     <plugins>
54       <plugin>
55         <artifactId>maven-assembly-plugin</artifactId>
56         <configuration>
57           <descriptorRefs>
58             <descriptorRef>jar-with-dependencies</descriptorRef>
59           </descriptorRefs>
60         </configuration>
61         <executions>
62           <execution>
63             <id>make-assembly</id> 
64             <phase>package</phase>
65             <goals>
66               <goal>single</goal>
67             </goals>
68           </execution>
69         </executions>
70       </plugin>
71     </plugins>
72   </build>
73 </project>

複製程式碼

  主要是匯入的zookeeper、storm、kafka外部依賴這些疊加起來,還有<plugin>外掛便於我們後續對程式程序maven的打包

  和之前一樣首先編寫storm消費kafka的邏輯,MessageScheme類,程式碼如下:

複製程式碼

 1 package net.zengzhiying;
 2 
 3 import java.io.UnsupportedEncodingException;
 4 import java.util.List;
 5 
 6 import backtype.storm.spout.Scheme;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Values;
 9 
10 public class MessageScheme implements Scheme {
11 
12     public List<Object> deserialize(byte[] arg0) {
13         try {
14             String msg = new String(arg0, "UTF-8");
15             return new Values(msg);
16         } catch (UnsupportedEncodingException e) {
17             e.printStackTrace();
18         }
19         return null;
20     }
21 
22     public Fields getOutputFields() {
23         return new Fields("msg");
24     }
25 
26 }

複製程式碼

  邏輯很簡單,就是對kafka出來的資料轉換成字串,接下來我們想辦法來處理strom清洗之後的資料,我們為了簡單就把輸出儲存到一個檔案中,Bolt邏輯SenqueceBolt類的程式碼如下:

複製程式碼

 1 package net.zengzhiying;
 2 
 3 import java.io.DataOutputStream;
 4 import java.io.FileNotFoundException;
 5 import java.io.FileOutputStream;
 6 import java.io.IOException;
 7 
 8 import backtype.storm.topology.BasicOutputCollector;
 9 import backtype.storm.topology.OutputFieldsDeclarer;
10 import backtype.storm.topology.base.BaseBasicBolt;
11 import backtype.storm.tuple.Fields;
12 import backtype.storm.tuple.Tuple;
13 import backtype.storm.tuple.Values;
14 
15 public class SenqueceBolt extends BaseBasicBolt {
16 
17     public void execute(Tuple arg0, BasicOutputCollector arg1) {
18         String word = (String) arg0.getValue(0);
19         String out = "output:" + word;
20         System.out.println(out);
21         
22         //寫檔案
23         try {
24             DataOutputStream out_file = new DataOutputStream(new FileOutputStream("kafkastorm.out"));
25             out_file.writeUTF(out);
26             out_file.close();
27         } catch (FileNotFoundException e) {
28             // TODO Auto-generated catch block
29             e.printStackTrace();
30         } catch (IOException e) {
31             // TODO Auto-generated catch block
32             e.printStackTrace();
33         }
34         
35         arg1.emit(new Values(out));
36     }
37 
38     public void declareOutputFields(OutputFieldsDeclarer arg0) {
39         arg0.declare(new Fields("message"));
40     }
41 
42 }

複製程式碼

  就是把輸出的訊息放到檔案kafkastorm.out中

  然後我們編寫主類,也就是配置kafka提交topology到storm的程式碼,類名為StormKafkaTopo,程式碼如下:

複製程式碼

 1 package net.zengzhiying;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 import backtype.storm.Config;
 7 import backtype.storm.LocalCluster;
 8 import backtype.storm.StormSubmitter;
 9 import backtype.storm.generated.AlreadyAliveException;
10 import backtype.storm.generated.InvalidTopologyException;
11 import backtype.storm.spout.SchemeAsMultiScheme;
12 import backtype.storm.topology.TopologyBuilder;
13 import backtype.storm.utils.Utils;
14 import storm.kafka.BrokerHosts;
15 import storm.kafka.KafkaSpout;
16 import storm.kafka.SpoutConfig;
17 import storm.kafka.ZkHosts;
18 import storm.kafka.bolt.KafkaBolt;
19 
20 public class StormKafkaTopo {
21     public static void main(String[] args) {
22         BrokerHosts brokerHosts = new ZkHosts("192.168.1.216:2181/kafka");
23         
24         SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/kafka", "kafkaspout");
25         
26         Config conf = new Config();
27         Map<String, String> map = new HashMap<String, String>();
28         
29         map.put("metadata.broker.list", "192.168.1.216:9092");
30         map.put("serializer.class", "kafka.serializer.StringEncoder");
31         conf.put("kafka.broker.properties", map);
32         conf.put("topic", "topic2");
33         
34         spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
35         
36         TopologyBuilder builder = new TopologyBuilder();
37         builder.setSpout("spout", new KafkaSpout(spoutConfig));
38         builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
39         builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");
40         
41         if(args != null && args.length > 0) {
42             //提交到叢集執行
43             try {
44                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
45             } catch (AlreadyAliveException e) {
46                 e.printStackTrace();
47             } catch (InvalidTopologyException e) {
48                 e.printStackTrace();
49             }
50         } else {
51             //本地模式執行
52             LocalCluster cluster = new LocalCluster();
53             cluster.submitTopology("Topotest1121", conf, builder.createTopology());
54             Utils.sleep(1000000);
55             cluster.killTopology("Topotest1121");
56             cluster.shutdown();
57         }
58         
59         
60         
61     }
62 }

複製程式碼

  注意上面程式碼的配置,和之前單獨執行storm和kafka程式碼不太一樣,配置也很簡單,注意區別即可,如果細心的話會注意到這裡建了兩個topic一個是topic1,一個是topic2,topic1的含義kafka接收生產者過來的資料所需要的topic,topic2是KafkaBolt也就是storm中的bolt生成的topic,當然這裡topic2這行配置可以省略,是沒有任何問題的,類似於一箇中轉的東西,另外我們這次測試是上傳到伺服器執行,本地模式的程式碼沒有執行到,當然原理是一樣的

  之前一般網上的教程到這裡就完畢了,這樣我們會引起一種沒有生產者的誤區,注意:上面3個類實現的功能是kafka消費者輸出的資料被storm消費!生產者的程式碼可以看成獨立的其他來源,可以寫在其他專案中,根據資料來源的情況來,下面我們為了示例,編寫一個類來進行生產,程式碼和之前kafka單獨的一樣:

複製程式碼

 1 package net.zengzhiying;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.Properties;
 6 
 7 import kafka.javaapi.producer.Producer;
 8 import kafka.producer.KeyedMessage;
 9 import kafka.producer.ProducerConfig;
10 
11 public class DataProducerInsert {
12     private static Producer<Integer,String> producer;
13     private final Properties props=new Properties();
14     public DataProducerInsert(){
15             //定義連線的broker list
16             props.put("metadata.broker.list", "192.168.1.216:9092");
17             //定義序列化類 Java中物件傳輸之前要序列化
18             props.put("serializer.class", "kafka.serializer.StringEncoder");
19             //props.put("advertised.host.name", "192.168.1.216");
20             producer = new Producer<Integer, String>(new ProducerConfig(props));
21     }
22     public static void main(String[] args) {
23             DataProducerInsert sp=new DataProducerInsert();
24             //定義topic
25             String topic="topic1";
26             //開始時間統計
27             long startTime = System.currentTimeMillis();
28             //定義要傳送給topic的訊息
29             String messageStr = "This is a message";
30             List<KeyedMessage<Integer, String>> datalist = new ArrayList<KeyedMessage<Integer, String>>();
31            
32             //構建訊息物件
33             KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
34             datalist.add(data);
35             
36             //結束時間統計
37             long endTime = System.currentTimeMillis();
38             KeyedMessage<Integer, String> data1 = new KeyedMessage<Integer, String>(topic, "用時" + (endTime-startTime)/1000.0);
39             datalist.add(data1);
40             
41             //推送訊息到broker
42             producer.send(data);
43             producer.close();
44     }
45 }

複製程式碼

  注意,這裡我們定義的topic是topic1,正好和前面的topic1資料來源對應,是整個kafka保持一致的topic,也就是說kafka生產者topic和消費者topic是必須名稱相同才可以響應,下面簡單添加了一點時間統計的程式碼,也很簡單

  另外還要注意kafka配置檔案host.name儘量改成ip,和之前說過的一樣

  到現在專案就編寫完成了,然後我們使用maven命令對專案打包,首先得保證我們windows上安裝好了maven,我們執行cmd,進入到當前專案目錄下,執行命令: mvn assembly:assembly 進行打包,打包的前提就是之前pom.xml的所有配置,執行後maven會自動下載相應的依賴並完成打包,需要耐心等待一會:

  

  看到如圖所示的BUILD SUCCESS返回之後,那麼打包就成功了,現在進入專案目錄下的target目錄中,會看到2個jar包

  

  其中後面那個檔名較長的大小也比較大,是包含相關依賴的包,接下來我們將這個包上傳到伺服器,然後使用storm執行jar包將我們的topology上傳到叢集中執行,注意是使用storm執行jar包,而不是java

/usr/storm/apache-storm-0.9.6/bin/storm jar kafkastorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar net.zengzhiying.StormKafkaTopo kafkagostorm

  前面是storm的絕對路徑,引數jar執行jar包,後面跟的是上傳topology的主類,最後kafkagostorm是我們上傳拓撲的名稱

  

  這裡執行完之後會回到命令列,現在就在後臺叢集中開始分發運行了,這就是叢集模式,之前我們講的storm案例會不斷滾動大量資料,那個屬於本地模式,如果我們現在開啟ui介面的話,那麼訪問我們的地址http://192.168.1.216:8080/可以看到正在執行的Topology

  

  可以看到狀態是active正在運行了,我們上面程式碼中kafkabolt建立了一個topic2的訊息,我們現在可以測試一下,消費者這裡只是簡單地原樣輸出,我們進入kafka目錄,執行下面命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic topic2 --from-beginning

  後面引數--from-beginning不新增也是可以的,新增包括舊資訊,不新增就是新的輸出

  現在介面卡住,待會我們來觀察輸出,現在我們新開一個視窗,還是使用storm執行剛才的生產者類DataProducerInsert來生產一條訊息,命令如下:

/usr/storm/apache-storm-0.9.6/bin/storm jar kafkastorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar net.zengzhiying.DataProducerInsert

  回車之後,等待介面滾動2s程式跑完之後,我們看到另一個視窗輸出了訊息:

  

  然後,我們的輸出檔案在哪呢,剛才我們使用storm執行的生產者程式碼,所以輸出的kafkastorm.out就在storm的安裝目錄下,我們使用cat或者vim都可以看到檔案內容,如果有時間統計的話兩行內容顯示可能會有點問題,因為後續要進行簡單的轉換,去掉時間統計程式碼只輸出訊息的內容如下,這裡使用vim開啟的:

  

  另外注意,上傳拓撲時所有的程式碼都載入到叢集了,所以修改程式碼版本時,一定要先在storm目錄下執行 bin/storm kill topo_name 結束拓撲,修改程式碼後重新上傳即可再次執行,否則可能會出現錯誤,在叢集上的時候kafka配置檔案的host.name註釋即可,預設為localhost,最後程式碼中用到的引數比較多,很容易出錯,所以寫程式碼時還是要仔細點

  這樣storm整合kafka的測試案例就完成了,並且實現了一定的功能,只要我們靈活掌握了怎麼寫kafka和storm結合的整體拓撲結構,那麼主要的程式碼就集中在資料來源也就是kafka生產者的傳送和storm消費後的儲存問題,這所有的程式碼都是在storm和kafka給好的方法內寫邏輯,而不用關心底層,這樣使開發更加簡單快捷,比如我們消費後的資料既可以寫到檔案、資料庫還可以索引到solr,存到Hbase等,這樣就可以靈活運用了;其實最關鍵的還是要了解這些框架底層的實現原理,這樣遇到問題才可以知其然知其所以然