
Kafka + Storm integration: running a demo (single machine)

1. Environment: JDK 1.7, kafka_2.9.2-0.8.1.1.tgz, zookeeper-3.3.6.tar.gz, apache-storm-0.9.2-incubating.tar.gz

2. Install Kafka:

I. Configure kafka/config/server.properties:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=node

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=node

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/Users/eleme/ruson/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=node:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
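The zookeeper.connect comment above describes the connection-string format: comma-separated host:port pairs, with an optional chroot path appended once at the end. The sketch below is a hypothetical helper (not part of Kafka) that splits such a string the way the comment describes; treating a missing port as 2181 is an assumption that mirrors ZooKeeper's usual default client port.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a zookeeper.connect value such as "node:2181" or "h1:2181,h2:2182/kafka". */
public class ZkConnectString {
    static final int DEFAULT_PORT = 2181; // assumption: port used when an entry omits it

    final List<String> hosts = new ArrayList<String>();
    final List<Integer> ports = new ArrayList<Integer>();
    final String chroot; // "" when no chroot suffix is present

    ZkConnectString(String connect) {
        // The optional chroot starts at the first '/' and applies to every server in the list.
        int slash = connect.indexOf('/');
        String servers = slash >= 0 ? connect.substring(0, slash) : connect;
        chroot = slash >= 0 ? connect.substring(slash) : "";
        for (String entry : servers.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.isEmpty()) {
                continue;
            }
            int colon = hostPort.lastIndexOf(':');
            if (colon < 0) {
                hosts.add(hostPort);
                ports.add(DEFAULT_PORT);
            } else {
                hosts.add(hostPort.substring(0, colon));
                ports.add(Integer.parseInt(hostPort.substring(colon + 1)));
            }
        }
    }

    public static void main(String[] args) {
        ZkConnectString single = new ZkConnectString("node:2181");
        System.out.println(single.hosts + " " + single.ports + " chroot='" + single.chroot + "'");
        ZkConnectString multi = new ZkConnectString("127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka");
        System.out.println(multi.hosts + " " + multi.ports + " chroot='" + multi.chroot + "'");
    }
}
```

With the single-node setting used here, the result is one host (node), one port (2181) and no chroot, so all Kafka znodes live at the ZooKeeper root.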

II. Edit kafka/config/zookeeper.properties:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/Users/eleme/ruson/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
III. Start Kafka and check that it works:

a. Start ZooKeeper: bin/zookeeper-server-start.sh config/zookeeper.properties &

b. Start Kafka: bin/kafka-server-start.sh config/server.properties &

c. Check that both started: jps

If the following two processes are listed, startup succeeded:

1942 Kafka

1845 QuorumPeerMain
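Besides jps, you can check that the two services actually accept connections. A minimal JDK-only sketch, assuming the host name node from server.properties and the ports configured above (2181 for ZooKeeper, 9092 for Kafka); the class name PortProbe is made up for illustration:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether ZooKeeper (2181) and the Kafka broker (9092) accept TCP connections. */
public class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // connection refused, timed out, or host name not resolvable
        }
    }

    public static void main(String[] args) {
        // "node" is the host name used in server.properties; pass another one as the first argument.
        String host = args.length > 0 ? args[0] : "node";
        System.out.println("ZooKeeper 2181 open: " + isOpen(host, 2181, 1000));
        System.out.println("Kafka 9092 open:     " + isOpen(host, 9092, 1000));
    }
}
```

A process that shows up in jps but whose port is closed usually means it is still starting or failed to bind (for example, because node does not resolve to this machine).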


d. In one terminal window, start a console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

e. In another window, start a console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 

3. Install Storm:

I. Edit conf/storm.yaml:

########### These MUST be filled in for a storm configuration
 storm.zookeeper.servers:
     - "node"
#     - "server2"
#
 nimbus.host: "node"

 storm.local.dir: "/Users/eleme/ruson/tmp/storm"
 supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
#
#
# ##### These may optionally be filled in:
#
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer

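II. Before running Storm, install a standalone ZooKeeper. It uses the same clientPort (2181) and dataDir as the ZooKeeper bundled with Kafka in step 2, so run only one of the two at a time.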

Edit /Users/eleme/ruson/zookeeper-3.3.6/conf/zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/Users/eleme/ruson/tmp/zookeeper
# dataLogDir (a directory) holds the transaction log; "logDir" is not a ZooKeeper setting
dataLogDir=/Users/eleme/ruson/tmp/zookeeper/log
# the port at which the clients will connect
clientPort=2181
server.1=127.0.0.1:2888:3888
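The zoo.cfg limits are counted in ticks, so with tickTime=2000 they translate into milliseconds as shown below. The sketch also applies ZooKeeper's default session-timeout bounds (2 to 20 ticks, here 4000 to 40000 ms) to the zookeeper.session.timeout.ms=10000 that the spout's consumer requests later on. The class and method names are illustrative only.

```java
/** Worked numbers for the zoo.cfg above (tickTime=2000, initLimit=10, syncLimit=5). */
public class ZkTimeouts {
    /**
     * ZooKeeper clamps a client's requested session timeout into
     * [minSessionTimeout, maxSessionTimeout], which default to 2 and 20 ticks.
     */
    static int negotiatedSessionTimeout(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTime = 2000, initLimit = 10, syncLimit = 5;
        // initLimit and syncLimit only matter for followers in a replicated ensemble.
        System.out.println("initLimit window: " + initLimit * tickTime + " ms"); // 20000 ms
        System.out.println("syncLimit window: " + syncLimit * tickTime + " ms"); // 10000 ms
        // zookeeper.session.timeout.ms=10000 from the spout's consumer config
        System.out.println("spout session timeout: "
                + negotiatedSessionTimeout(10000, tickTime) + " ms"); // 10000 ms, within bounds
    }
}
```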


III. Verify that Storm starts:

1. Start ZooKeeper:

./zookeeper-3.3.6/bin/zkServer.sh start

2. Start each of the following:

./storm nimbus 

./storm ui

./storm supervisor 
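To confirm that the standalone ZooKeeper is answering requests (not just that the process exists), you can send it the four-letter command ruok; a healthy server replies imok, which is the same check as echo ruok | nc localhost 2181. A minimal JDK-only sketch, assuming ZooKeeper listens on 127.0.0.1:2181; ZkRuok and sendFourLetterWord are hypothetical names:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: sends ZooKeeper's "ruok" command; a healthy server answers "imok". */
public class ZkRuok {
    static String sendFourLetterWord(String host, int port, String cmd, int timeoutMs) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs); // don't hang forever if the server never answers
            OutputStream out = socket.getOutputStream();
            out.write(cmd.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // The server writes its reply and then closes the connection, so read until EOF.
            InputStream in = socket.getInputStream();
            StringBuilder reply = new StringBuilder();
            byte[] buf = new byte[256];
            int n;
            while ((n = in.read(buf)) != -1) {
                reply.append(new String(buf, 0, n, StandardCharsets.US_ASCII));
            }
            return reply.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        String reply = sendFourLetterWord("127.0.0.1", 2181, "ruok", 2000);
        System.out.println("imok".equals(reply.trim()) ? "ZooKeeper is up" : "unexpected reply: " + reply);
    }
}
```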

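4. Before running the Kafka + Storm integration example, start each of the following. (Strictly speaking, the example below submits to a LocalCluster, which runs Storm in-process, so it only needs ZooKeeper and Kafka; nimbus, ui and supervisor are used when a topology is submitted to the real cluster.)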

./zookeeper-3.3.6/bin/zkServer.sh start

./kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh kafka_2.9.2-0.8.1.1/config/server.properties 

./apache-storm-0.9.2-incubating/bin/storm nimbus

./apache-storm-0.9.2-incubating/bin/storm ui

./apache-storm-0.9.2-incubating/bin/storm supervisor
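Because each service relies on an earlier one (Kafka and Nimbus both need ZooKeeper, the UI talks to Nimbus), it can help to wait until a port is reachable before starting or checking the next. A sketch of such a gate, assuming host node and Storm's default ports 6627 (nimbus.thrift.port) and 8080 (ui.port); the supervisor does not listen on a port of its own, so check it with jps instead. StartupGate is a hypothetical name.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: waits for each service in the start order of step 4. */
public class StartupGate {
    /** True if a TCP connection to host:port succeeds within one second. */
    static boolean isOpen(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 1000);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Polls host:port every intervalMs until it is reachable or timeoutMs has elapsed. */
    static boolean waitFor(String host, int port, long timeoutMs, long intervalMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (isOpen(host, port)) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            Thread.sleep(intervalMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String host = args.length > 0 ? args[0] : "node";
        String[] names = {"ZooKeeper", "Kafka", "Nimbus", "Storm UI"};
        int[] ports = {2181, 9092, 6627, 8080}; // 6627/8080 assume Storm's defaults are unchanged
        for (int i = 0; i < ports.length; i++) {
            boolean up = waitFor(host, ports[i], 30000, 500);
            System.out.println(names[i] + " on port " + ports[i] + ": " + (up ? "up" : "not reachable"));
            if (!up) {
                return; // later services depend on this one, so stop here
            }
        }
    }
}
```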


Java code:

KafkaSpouttest.java:

package com.test.stormkafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
/**
 * demo1
 * @author eleme
 *
 */
public class KafkaSpouttest implements IRichSpout {
     
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;
 
    public KafkaSpouttest() {
    }
     
    public KafkaSpouttest(String topic) {
        this.topic = topic;
    }
 
    public void nextTuple() {
    }
 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
 
    public void ack(Object msgId) {
    }
 
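    public void activate() {
        // NOTE: the loop below does not return while the topic is being consumed, so it blocks
        // the spout's executor thread and nextTuple()/ack()/fail() are never called.
        // Acceptable for this single-machine demo only; a production spout should emit from nextTuple().
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, 1); // one stream (consumer thread) for this topic

        System.out.println("*********Results********topic:" + topic);

        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
        while (it.hasNext()) {
            // decode explicitly as UTF-8 instead of relying on the platform default charset
            String value = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
            String str = formatter.format(new Date()); // current time

            System.out.println("storm接收到來自kafka的訊息------->" + value);

            // emit (word, id, time), using the message itself as the message id
            collector.emit(new Values(value, 1, str), value);
        }
    }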
     
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection string
//        props.put("zookeeper.connect","m1:2181,m2:2181,s1:2181,s2:2181");
//        props.put("zookeeper.connect","192.168.101.23:2181");
        props.put("zookeeper.connect", "node:2181");

        // consumer group id
        props.put("group.id", "1");
        // The group's consumed offsets are stored in ZooKeeper, but they are not written in
        // real time; they are committed automatically at this interval (ms).
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }
 
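    public void close() {
        // release the ZooKeeper session and fetcher threads held by the consumer
        if (consumer != null) {
            consumer.shutdown();
        }
    }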
 
    public void deactivate() {
    }
 
    public void fail(Object msgId) {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","id","time"));
    }
 
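    public Map<String, Object> getComponentConfiguration() {
        System.out.println("getComponentConfiguration called");
        // TopologyBuilder calls this while the topology is being built, before the spout is
        // serialized, so the topic is filled in here only when the constructor did not set one.
        if (topic == null || topic.isEmpty()) {
            topic = "idoall_testTopic";
        }
        return null;
    }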
}
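The spout above consumes Kafka inside activate(), which never returns, so its executor thread stays occupied. A common restructuring is to start a reader thread in open() that copies messages from the ConsumerIterator into a java.util.concurrent.LinkedBlockingQueue, and to have nextTuple() call poll() and emit only when a message is available. The sketch below models just that hand-off with the JDK, using an ordinary Iterator in place of Kafka's ConsumerIterator; the class and method names are hypothetical, and the Storm calls appear only in comments.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical, Storm-free model of a non-blocking spout: a background thread drains a
 * blocking source into a queue, and the nextTuple()-style poll() never blocks.
 */
public class QueueBackedSource {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();

    /** In a real spout this would run in open(), iterating the Kafka stream. */
    void start(final Iterator<String> blockingSource) {
        Thread reader = new Thread(new Runnable() {
            public void run() {
                while (blockingSource.hasNext()) { // may block, but only this reader thread
                    queue.offer(blockingSource.next());
                }
            }
        }, "kafka-reader");
        reader.setDaemon(true); // don't keep the JVM alive just for the reader
        reader.start();
    }

    /** Like nextTuple(): returns the next message, or null if none is ready yet. */
    String poll() {
        return queue.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        QueueBackedSource source = new QueueBackedSource();
        source.start(Arrays.asList("hello", "world").iterator());
        int received = 0;
        while (received < 2) {
            String msg = source.poll();
            if (msg == null) {
                Thread.sleep(1); // in Storm, the spout wait strategy backs off when nothing is emitted
                continue;
            }
            // real spout: collector.emit(new Values(msg, 1, time), msg);
            System.out.println("emit " + msg);
            received++;
        }
    }
}
```

Because nextTuple() now returns immediately, the same executor thread is free to process ack() and fail() callbacks between calls.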

KafkaTopologytest.java:

package com.test.stormkafka;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
/**
 * demo1
 * @author eleme
 *
 */
public class KafkaTopologytest {
	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
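		// pass the topic explicitly instead of relying on getComponentConfiguration() to fill it in
		builder.setSpout("spout", new KafkaSpouttest("idoall_testTopic"), 1);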
		builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
		builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1",
				new Fields("word"));
		Map conf = new HashMap();
		conf.put(Config.TOPOLOGY_WORKERS, 1);
		conf.put(Config.TOPOLOGY_DEBUG, true);

		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("my-flume-kafka-storm-topology-integration",conf, builder.createTopology());

		Utils.sleep(1000 * 60 * 5); // keep the local cluster running for 5 minutes, then shut it down
		cluster.shutdown();

	}

	public static class Bolt1 extends BaseBasicBolt {

		public void execute(Tuple input, BasicOutputCollector collector) {
			try {

				String msg = input.getString(0);
				int id = input.getInteger(1);
				String time = input.getString(2);
				msg = msg + "bolt1";
				System.out.println("對訊息加工第1次-------[arg0]:" + msg
						+ "---[arg1]:" + id + "---[arg2]:" + time + "------->"
						+ msg);
				if (msg != null) {
					collector.emit(new Values(msg));
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			declarer.declare(new Fields("word"));
		}
	}

	public static class Bolt2 extends BaseBasicBolt {
		Map<String, Integer> counts = new HashMap<String, Integer>();

		public void execute(Tuple tuple, BasicOutputCollector collector) {
			String msg = tuple.getString(0);
			msg = msg + "bolt2";
			System.out.println("對訊息加工第2次---------->" + msg);
			collector.emit(new Values(msg, 1));
		}

		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			declarer.declare(new Fields("word", "count"));
		}
	}
}
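KafkaTopologytest wires bolt1 to the spout with shuffleGrouping and bolt2 to bolt1 with fieldsGrouping on "word", each bolt with parallelism 2. The simplified model below illustrates the difference: shuffle may pick any of bolt2's tasks, while the fields grouping derives the task from a hash of the grouping values, so identical words always land on the same task. It is a sketch of the routing property only, not Storm's actual implementation (Storm's shuffle grouping, for instance, also spreads tuples evenly); GroupingModel is a made-up name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/** Hypothetical model of shuffleGrouping vs. fieldsGrouping("word") task selection. */
public class GroupingModel {
    /** Shuffle: any task may receive the tuple. */
    static int shuffleTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    /** Fields: the task is a function of the grouping values only. */
    static int fieldsTask(List<Object> groupingValues, int numTasks) {
        int h = groupingValues.hashCode() % numTasks;
        return h < 0 ? h + numTasks : h; // keep negative hash codes inside [0, numTasks)
    }

    public static void main(String[] args) {
        int bolt2Tasks = 2; // parallelism hint of bolt2
        Random random = new Random(42);
        for (String word : Arrays.asList("abcbolt1", "xyzbolt1", "abcbolt1")) {
            List<Object> values = new ArrayList<Object>();
            values.add(word);
            System.out.println(word + " -> shuffle task " + shuffleTask(random, bolt2Tasks)
                    + ", fields task " + fieldsTask(values, bolt2Tasks));
        }
    }
}
```

Both occurrences of "abcbolt1" get the same fields task, which is what would let a counting bolt (such as the unused counts map in Bolt2) keep per-word totals in one place.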

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.test</groupId>
  <artifactId>stormkafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>stormkafka</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  
      <dependency>
           <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>0.9.2-incubating</version>
             <scope>provided</scope> 
       </dependency>
   
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
       
        <dependency>  
          <groupId>org.apache.storm</groupId>  
         <artifactId>storm-kafka</artifactId>  
          <version>0.9.2-incubating</version>  
    </dependency>  
  </dependencies>
  
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> 
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  
</project>


1. Run the main method of KafkaTopologytest.

2. Run the Kafka console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic idoall_testTopic

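Then type messages at the shell prompt. In the transcript below the producer was started with --broker-list node:9092 --topic testTopic; whichever topic you use, it must match the one the spout subscribes to.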

=======

elemedeMacBook-Pro:ruson eleme$ ./kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-list node:9092 --topic testTopic

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

[2014-12-14 09:58:13,005] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

========

[2014-12-14 09:58:23,665] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

[2014-12-14 09:59:25,795] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

=======

[2014-12-14 10:00:49,132] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

[2014-12-14 10:01:04,765] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

[2014-12-14 10:01:22,591] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)



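The Eclipse console shows the following. The Chinese strings are the demo's own println output: 對訊息加工第1次 means "message processed, pass 1", 對訊息加工第2次 means "pass 2", and storm接收到來自kafka的訊息 means "Storm received a message from Kafka"; timestamps use the pattern yyyy年MM月dd日 (year, month, day).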

對訊息加工第1次-------[arg0]:bolt1---[arg1]:1---[arg2]:2014年12月14日 09:58:13 144------->bolt1
33664 [Thread-8-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 default [bolt1]
33664 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: __ack_init, id: {}, [-5044490574571788562 -6834576169241042166 6]
33665 [Thread-8-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-5044490574571788562 -8030044259304607564]
33665 [Thread-14-bolt2] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:2, stream: default, id: {-5044490574571788562=3578453389181342654}, [bolt1]
對訊息加工第2次---------->bolt1bolt2
33665 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:2, stream: __ack_ack, id: {}, [-5044490574571788562 -8030044259304607564]
33665 [Thread-14-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 default [bolt1bolt2, 1]
33666 [Thread-14-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 __ack_ack [-5044490574571788562 3578453389181342654]
33666 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: bolt2:5, stream: __ack_ack, id: {}, [-5044490574571788562 3578453389181342654]
33666 [Thread-20-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 6; __acker __ack_ack [-5044490574571788562]
storm接收到來自kafka的訊息------->========
44249 [Thread-16-spout] INFO  backtype.storm.daemon.task - Emitting: spout default [========, 1, 2014年12月14日 09:58:23 729]
44249 [Thread-16-spout] INFO  backtype.storm.daemon.task - Emitting: spout __ack_init [7355713949317680184 8328867585479832335 6]
44249 [Thread-8-bolt1] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {7355713949317680184=8328867585479832335}, [========, 1, 2014年12月14日 09:58:23 729]
對訊息加工第1次-------[arg0]:========bolt1---[arg1]:1---[arg2]:2014年12月14日 09:58:23 729------->========bolt1
44249 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: __ack_init, id: {}, [7355713949317680184 8328867585479832335 6]
44250 [Thread-8-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 default [========bolt1]
44250 [Thread-8-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [7355713949317680184 -6542584687991049767]
44250 [Thread-14-bolt2] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:2, stream: default, id: {7355713949317680184=-2980814423219223850}, [========bolt1]
對訊息加工第2次---------->========bolt1bolt2
44250 [Thread-14-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 default [========bolt1bolt2, 1]
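The __ack_init and __ack_ack lines above come from Storm's acker. As Storm's documentation on guaranteed message processing describes, the acker tracks each spout tuple with a single 64-bit value: every tuple id is XORed into it once when the tuple is emitted (anchored) and once more when it is acked, and when the value returns to zero the tree is fully processed and the acker notifies the spout task (the "Emitting direct: 6" line). The model below reproduces only that arithmetic with random ids, not the ids from the log; AckerModel is a hypothetical class.

```java
import java.util.Random;

/** Hypothetical model of the acker's XOR bookkeeping for one spout tuple tree. */
public class AckerModel {
    private long ackVal;

    void xor(long id) {
        ackVal ^= id;
    }

    boolean complete() {
        return ackVal == 0;
    }

    public static void main(String[] args) {
        Random random = new Random();
        AckerModel acker = new AckerModel();

        long spoutToBolt1 = random.nextLong();
        acker.xor(spoutToBolt1);                // __ack_init from the spout

        long bolt1ToBolt2 = random.nextLong();
        acker.xor(spoutToBolt1 ^ bolt1ToBolt2); // bolt1 acks its input and anchors its output
        System.out.println("after bolt1 ack, complete? " + acker.complete()); // false unless the new id is 0

        acker.xor(bolt1ToBolt2);                // bolt2 acks; it has no subscribers, so no new ids
        System.out.println("after bolt2 ack, complete? " + acker.complete()); // true
    }
}
```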




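Summary: the Java example is adapted from someone else's code. With it, the Kafka + Storm integration works, but so far only on a single machine.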



