1. 程式人生 > >基於flume+kafka+storm日誌收集系統搭建

基於flume+kafka+storm日誌收集系統搭建

基於flume+kafka+storm日誌收集系統搭建

1.     環境

192.168.0.2 hadoop1

192.168.0.3 hadoop2

192.168.0.4 hadoop3

已經安裝了jdk1.7並配置了環境變數

2.     安裝版本

Stromapache-strom-0.9.4.tar.gz

Kafkakafka_2.11-0.8.2.0.tgz

Zookeeper: zookeeper-3.4.6.tar.gz

Kafka安裝目錄:/home/hadoop/kafka

strom安裝目錄:/home/hadoop/strom

flume安裝目錄:/home/hadoop/flume

zookeeper

安裝目錄:/home/hadoop/zookeeper

單獨安裝這裡不再涉及,具體請參考本人之前的部落格

3.     Flumekafka整合

Kafka安裝環境校驗

啟動kafka

cd  /home/hadoop/kafka/bin

./kafka-server-start.sh  ../config/server.properties &

kafka上建立topic

./kafka-topics.sh --create --topic idoall_testTopic --replication-factor 4 --partitions 2 --zookeeper hadoop1:2181

kafka上檢視topic

./kafka-topics.sh --list --zookeeper hadoop1:2181

kafka刪除topic

./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic idoall –zookeeper hadoop2:2181 

啟動zookeeper檢視topic是否存在

/home/hadoop/zookeeper/bin/zkCli.sh

ls /

/brokers

/brokers/topics

Kafka傳送訊息,在hadoop1上執行一下命令,輸入“hello kakfa

./kafka-console-producer.sh --broker-list hadoop1:9092 --sync --topic test_topic

Kafka接收訊息,在hadoop2上執行一下命令,接收hadoop1發出的訊息

./bin/kafka-console-consumer.sh –zookeeper hadoop2:2181 --topic test_topic --from-beginning

如果hadoop2上成功接收到hadoop1的訊息則說明kafka環境是正常的。

Flume配置

Flumekafka整合實際上是將flume作為kafkaproducer,傳送訊息給kafka

cd /home/hadoop/flume/conf

建立flume-kafka-conf.properties檔案

vi flume-kafka-conf.properties

輸入以下內容

#agent配置

agent1.sources=source1

agent1.sinks=sink1

agent1.channels=channel1

#source配置, home/hadoop/flume/temp/log為監控目錄

agent1.sources.source1.type=spooldir

agent1.sources.source1.spoolDir=home/hadoop/flume/temp/log

agent1.sources.source1.channels=channel1

agent1.sources.source1.fileHeader=false

#sink配置org.apache.kafka.skin.KafkaSink類需要自己編寫

agent1.sinks.sink1.type=org.apache.kafka.skin.KafkaSink

agent1.sinks.sink1.metadata.broker.list=hadoop1:9092,hadoop2:9092,hadoop3:9092

agent1.sinks.sink1.serializer.class=kafka.serializer.StringEncoder

agent1.sinks.sink1.request.required.ack1=1

agent1.sinks.sink1.max.message.size=1000000

agent1.sinks.sink1.channel=channel1

agent1.sinks.sink1.custom.topic.name=test_topic

#channel配置

agent1.channels.channel1.type=memory

agent1.channels.channel1.capacity=1000

eclipse中建立javaproject,引入/home/hadoop/flume/lib/home/hadoop/kafka/libs下所有的jar

編寫flumeKafkaSink類,內容如下:

1.      import org.slf4j.Logger;  

2.      import org.slf4j.LoggerFactory;  

3.      

4.      import java.util.Map;  

5.      import java.util.Properties;  

6.      import kafka.javaapi.producer.Producer;  

7.      import kafka.producer.KeyedMessage;  

8.      import kafka.producer.ProducerConfig;  

9.      import org.apache.flume.Context;  

10.  import org.apache.flume.Channel;  

11.  import org.apache.flume.Event;  

12.  import org.apache.flume.Transaction;  

13.  import org.apache.flume.conf.Configurable;  

14.  import org.apache.flume.sink.AbstractSink;  

15.  import com.google.common.base.Preconditions;  

16.  import com.google.common.collect.ImmutableMap;  

17.  

18.  public class KafkaSink extends AbstractSink implements Configurable {  

19.  

20.      private Context context;  

21.      private Properties parameters;  

22.      private Producer<String, String> producer;  

23.  

24.      private static final String PARTITION_KEY_NAME = "custom.partition.key";  

25.      private static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name";  

26.      private static final String DEFAULT_ENCODING = "UTF-8";  

27.      private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);  

28.  

29.      public void configure(Context context) {  

30.          this.context = context;  

31.          ImmutableMap<String, String> props = context.getParameters();  

32.          this.parameters = new Properties();  

33.          for (Map.Entry<String,String> entry : props.entrySet()) {  

34.              this.parameters.put(entry.getKey(), entry.getValue());  

35.          }  

36.      }  

37.  

38.      @Override  

39.      public synchronized void start() {  

40.          super.start();  

41.          ProducerConfig config = new ProducerConfig(this.parameters);  

42.          this.producer = new Producer<String, String>(config);  

43.      }  

44.  

45.      public Status process() {  

46.          Status status = null;  

47.          Channel channel = getChannel();  

48.          Transaction transaction = channel.getTransaction();  

49.  

50.          try {  

51.              transaction.begin();  

52.              Event event = channel.take();  

53.              if (event != null) {  

54.                  String partitionKey = (String) parameters.get(PARTITION_KEY_NAME);  

55.                  String topic = Preconditions.checkNotNull((String) this.parameters.get(CUSTOME_TOPIC_KEY_NAME),  

56.                          "topic name is required");  

57.                  String eventData = new String(event.getBody(), DEFAULT_ENCODING);  

58.                  KeyedMessage<String, String> data = (partitionKey.isEmpty()) ? new KeyedMessage<String, String>(topic,  

59.                          eventData) : new KeyedMessage<String, String>(topic, partitionKey, eventData);  

60.                  LOGGER.info("Sending Message to Kafka : [" + topic + ":" + eventData + "]");  

61.                  producer.send(data);  

62.                  transaction.commit();  

63.                  LOGGER.info("Send message success");  

64.                  status = Status.READY;  

65.              } else {  

66.                  transaction.rollback();  

67.                  status = Status.BACKOFF;  

68.              }  

69.          } catch (Exception e) {  

70.              e.printStackTrace();  

71.              LOGGER.info("Send message failed!");  

72.              transaction.rollback();  

73.              status = Status.BACKOFF;  

74.          } finally {  

75.              transaction.close();  

76.          }  

77.          return status;  

78.      }  

79.  

80.      @Override  

81. publicvoid stop() {  

82.         producer.close();  

83.     }  

84. }  

將該專案匯出為jar包:flume-kafka-plugin.jarjar包名字隨意。

flume-kafka-plugin.jar複製到/home/hadoop/flume/lib

並將kafka的相關jar包複製到/home/hadoop/flume/lib

cp /home/hadoop/kafka/libs/kafka_2.11-0.8.2.0.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/kafka/libs/scala-library-2.11.5.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/kafka/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/kafka/libs/kafka-client-0.8.2.0.jar /home/hadoop/flume-1.5.0-bin/lib

flumekafka整合驗證

hadoop1上啟動flume

cd /home/hadoop/flume/bin

./flume-ng agent –c agent -n agent1 -f /home/hadoop/flume/conf/flume-kafka-conf.properties -Dflume.root.logger=INFO,console

home/hadoop/flume/temp/log目錄下建立檔案並輸入內容

vi test.txt

123

456

hadoop2啟動kafkaconsumer,檢視是否可以接受到該資料

./bin/kafka-console-consumer.sh –zookeeper hadoop2:2181 --topic test_topic --from-beginning

此時如果看到console輸出hadoop1上的內容,表明flumekafka整合成功

4.     Kafka整合storm

eclipse建立javaproject,並匯入/home/hadoop/kafka/libs/home/hadoop/storm/lib下的所有jar

 #編寫KafkaSpout.java檔案

85.  package kafka.with.storm;

86.  import java.text.SimpleDateFormat;

87.  import java.util.Date;

88.  import java.util.HashMap;

89.  import java.util.List;

90.  import java.util.Map;

91.  import java.util.Properties;

92.  import kafka.consumer.ConsumerConfig;

93.  import kafka.consumer.ConsumerIterator;

94.  import kafka.consumer.KafkaStream;

95.  import kafka.javaapi.consumer.ConsumerConnector;

96.  import backtype.storm.spout.SpoutOutputCollector;

97.  import backtype.storm.task.TopologyContext;

98.  import backtype.storm.topology.IRichSpout;

99.  import backtype.storm.topology.OutputFieldsDeclarer;

100.        import backtype.storm.tuple.Fields;

101.        import backtype.storm.tuple.Values;

102.        

103.        public class KafkaSpout implements IRichSpout {

104.        

105.            private SpoutOutputCollector collector;

106.            private ConsumerConnector consumer;

107.            private String topic;

108.        

109.            public KafkaSpout () {

110.            }

111.        

112.            public KafkaSpout (String topic) {

113.                this.topic = topic;

114.            }

115.        

116.            public void nextTuple() {

117.            }

118.        

119.            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

120.                this.collector = collector;

121.            }

122.        

123.            public void ack(Object msgId) {

124.            }

125.        

126.            public void activate() {

127.        

128.        consumer=kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfi());  

129.                 Map<String,Integer> topickMap = new HashMap<String, Integer>();  

130.                topickMap.put(topic, 1);  

131.        

132.                System.out.println("*********Results********topic:"+topic);  

133.        

134.                Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap=consumer.createMessageStreams(topickMap);  

135.                KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);  

136.                ConsumerIterator<byte[],byte[]> it =stream.iterator();   

137.                while(it.hasNext()){  

138.                     String value =new String(it.next().message());

139.                     SimpleDateFormat formatter = new SimpleDateFormat   ("yyyyMMdd HH:mm:ss SSS");  

140.                     Date curDate = new Date(System.currentTimeMillis());//獲取當前時間

141.                     String str = formatter.format(curDate);   

142.        

143.                     System.out.println("storm接收到來自kafka的訊息------->" + value);

144.        

145.                     collector.emit(new Values(value,1,str), value);

146.                }  

147.            }

148.        

149.            private static ConsumerConfig createConsumerConfig() {  

150.                Properties props = new Properties();  

151.                // 設定zookeeper的連結地址

152.                props.put("zookeeper.connect","hadoop1:2181,hadoop2:2181,hadoop3:2181");  

153.                // 設定group id

154.                props.put("group.id", "1");  

155.                props.put("auto.commit.interval.ms", "1000000");

156.                           props.put(“suto.commit.enable”,”true”);

157.                props.put("zookeeper.session.timeout.ms","1000000");  

158.                return new ConsumerConfig(props);  

159.            }  

160.        

161.            public void close() {

162.            }

163.        

164.            public void deactivate() {

165.            }

166.        

167.            public void fail(Object msgId) {

168.            }

169.        

170.            public void declareOutputFields(OutputFieldsDeclarer declarer) {

171.                declarer.declare(new Fields("word","id","time"));

172.            }

173.        

174.            public Map<String, Object> getComponentConfiguration() {

175.                System.out.println("getComponentConfiguration被呼叫");

176.                topic="idoall_testTopic";

177.                return null;

178.            }

179.        }


#編寫KafkaTopology.java檔案

180.        package kafka.with.storm;

181.        import java.util.HashMap;

182.        import java.util.Map;

183.        import backtype.storm.Config;

184.        import backtype.storm.LocalCluster;

185.        import backtype.storm.topology.BasicOutputCollector;

186.        import backtype.storm.topology.OutputFieldsDeclarer;

187.        import backtype.storm.topology.TopologyBuilder;

188.        import backtype.storm.topology.base.BaseBasicBolt;

189.        import backtype.storm.tuple.Fields;

190.        import backtype.storm.tuple.Tuple;

191.        import backtype.storm.tuple.Values;

192.        import backtype.storm.utils.Utils;

193.        

194.        public class KafkaTopology {

195.        

196.            public static void main(String[] args) {

197.                TopologyBuilder builder = new TopologyBuilder();

198.        

199.                builder.setSpout("spout", new KafkaSpout(""), 1);

200.                builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");

201.                builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1",new Fields("word"));

202.        

203.                Map conf = new HashMap();

204.                conf.put(Config.TOPOLOGY_WORKERS, 1);

205.                conf.put(Config.TOPOLOGY_DEBUG, true);

206.        

207.                LocalCluster cluster = new LocalCluster();

208.                cluster.submitTopology("flume-kafka-storm-topology-integration", conf, builder.createTopology());

209.        

210.                Utils.sleep(1000*60*5); // local cluster test ...

211.                cluster.shutdown();

212.            }

213.        

214.            public static class Bolt1 extends BaseBasicBolt {

215.        

216.                public void execute(Tuple input, BasicOutputCollector collector) {

217.                    try {

218.                        String msg = input.getString(0);

219.                        int id = input.getInteger(1);

220.                        String time = input.getString(2);

221.                        msg = msg+"bolt1";

222.                        System.out.println("對訊息加工第1-------[arg0]:"+ msg +"---[arg1]:"+id+"---[arg2]:"+time+"------->"+msg);

223.                        if (msg != null) {

224.                            collector.emit(new Values(msg));

225.                        }

226.                    } catch (Exception e) {

227.                        e.printStackTrace();

228.                    }

229.                }

230.        

231.        

232.                public void declareOutputFields(OutputFieldsDeclarer declarer) {

233.                    declarer.declare(new Fields("word"));

234.                }

235.            }

236.        

237.            public static class Bolt2 extends BaseBasicBolt {

238.                Map<String, Integer> counts = new HashMap<String, Integer>();

239.        

240.        

241.                public void execute(Tuple tuple, BasicOutputCollector collector) {

242.                    String msg = tuple.getString(0);

243.                    msg = msg + "bolt2";

244.                    System.out.println("對訊息加工第2---------->"+msg);

245.                    collector.emit(new Values(msg,1));

246.                }

247.        

248.        

249.                public void declareOutputFields(OutputFieldsDeclarer declarer) {

250.                    declarer.declare(new Fields("word", "count"));

251.                }

相關推薦

基於flume+kafka+storm日誌收集系統搭建

基於flume+kafka+storm日誌收集系統搭建 1.     環境 192.168.0.2 hadoop1 192.168.0.3 hadoop2 192.168.0.4 hadoop3 已經

filebeat + kafka + logstash + Elasticsearch + Kibana日誌收集系統搭建

一、介紹        在日常運維工作中,對於系統和業務日誌的處理尤為重要。今天,在這裡分享一下自己部署的filebeat + kafka + ELK開源實時日誌分析平臺的記錄過程。 1、ELK介紹      &nbs

flume-ng+Kafka+Storm+HDFS 實時系統搭建

今天搭建了一下storm流處理系統,整個搭建的流程都是參考以下地址:http://www.cnblogs.com/web-v/articles/6510090.html 文章中並沒有給出flume同時寫入kafka和hdfs時的配置檔案。以下是我的flume配置檔案,有一些

【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 實時系統搭建

技術交流群:59701880 深圳廣州hadoop好友會微信公眾號:後續部落格的文件都會轉到微信公眾號中。一直以來都想接觸Storm實時計算這塊的東西,最近在群裡看到上海一哥們羅寶寫的Flume+Kafka+Storm的實時日誌流系統的搭建文件,自己也跟著整了一遍,之前羅寶的

ELK+kafka構建日誌收集系統

背景: 最近線上上了ELK,但是隻用了一臺Redis在中間作為訊息佇列,以減輕前端es叢集的壓力,Redis的叢集解決方案暫時沒有接觸過,並且Redis作為訊息佇列並不是它的強項;所以最近將Redis換成了專業的訊息資訊釋出訂閱系統Kafka, Kafka的更多介紹大家可以看這裡: 傳

基於Flume+kafka打造實時日誌收集分析系統

Kafka broker修改conf/server.properties檔案,修改內容如下:           broker.id=1           host.name=172.16.137.196 port=10985           log.dirs=/data/kafka

Kafka+Zookeeper+Filebeat+ELK 搭建日誌收集系統

could not arch success div 名稱 fill pil ice oca ELK ELK目前主流的一種日誌系統,過多的就不多介紹了 Filebeat收集日誌,將收集的日誌輸出到kafka,避免網絡問題丟失信息 kafka接收到日誌消息後直接消費到Lo

基於flume日誌收集系統配置

大資料系統中通常需要採集的日誌有: 系統訪問日誌 使用者點選日誌 其他業務日誌(比如推薦系統的點選日誌) 在收集日誌的時候,一般分為三層結構:採集層、彙總層和儲存層,而不是直接從採集端將資料傳送到儲存端,這樣的好處有: 如果儲存端如Hadoop叢集、Kafka等需要停

基於 Flume+Kafka+Spark Streaming 實現實時監控輸出日誌的報警系統

運用場景:我們機器上每天或者定期都要跑很多工,很多時候任務出現錯誤不能及時發現,導致發現的時候任務已經掛了很久了。  解決方法:基於 Flume+Kafka+Spark Streaming 的框架對這些任務的輸出日誌進行實時監控,當檢測到日誌出現Error的資訊就傳送郵件給

基於Flume的美團日誌收集系統(二)改進和優化

問題導讀: 1.Flume的存在些什麼問題? 2.基於開源的Flume美團增加了哪些功能? 3.Flume系統如何調優? 在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節

基於Flume的美團日誌收集系統(一)架構和設計

美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。 《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。 第一部

nginx+flume+hdfs搭建實時日誌收集系統

1、配置nginx.conf,新增以下配置 http { #配置日誌格式 log_format lf '$remote_addr^A$msec^A$http_host^A$reques

10044---基於Flume的美團日誌收集系統(一)架構和設計

原文 問題導讀: 1.Flume-NG與Scribe對比,Flume-NG的優勢在什麼地方?2.架構設計考慮需要考慮什麼問題?3.Agent宕機該如何解決?4.Collector宕機是否會有影響?5.Flume-NG可靠性(reliability)方面做了哪些措施?  

COPY 基於Flume的美團日誌收集系統架構和設計

美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。 《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。 第一部

Flume日誌收集系統架構詳解--轉

with 指定 mwl 裏程碑 工程 生命 數據接收 dba -i 2017-09-06 朱潔 大數據和雲計算技術 任何一個生產系統在運行過程中都會產生大量的日誌,日誌往往隱藏了很多有價值的信息。在沒有分析方法之前,這些日誌存儲一段時間後就會被清理。隨著技術的發展和

日誌收集系統Flume及其應用

註意 內存緩存 外部 ner 流動 場景 啟動 net conf Apache Flume概述   Flume 是 Cloudera 提供的一個高可用的,高可靠的,分布式的海量日誌采集、聚合和傳輸的系統。Flume 支持定制各類數據發送方,用於收集各類型數據;同時,Flu

Flume+Kafka+Storm+Redis構建大數據實時處理系統:實時統計網站PV、UV+展示

大數據 實時計算 Storm [TOC] 1 大數據處理的常用方法 前面在我的另一篇文章中《大數據采集、清洗、處理:使用MapReduce進行離線數據分析完整案例》中已經有提及到,這裏依然給出下面的圖示: 前面給出的那篇文章是基於MapReduce的離線數據分析案例,其通過對網站產生的用戶訪問

Linux搭建ELK日誌收集系統:FIlebeat+Redis+Logstash+Elasticse

uri 對數 exp 取數 網速 docker useradd 通過 演示 Centos7部署ELK日誌收集系統 一、ELK概述: ELK是一組開源軟件的簡稱,其包括Elasticsearch、Logstash 和 Kibana。ELK最近幾年發展迅速,已經成為目前最流行的

Flume可分布式日誌收集系統

agen debug 程序 負責 and 序列化 得到 集群 ava Flume 1. 前言   flume是由cloudera軟件公司產出的可分布式日誌收集系統,後與2009年被捐贈了apache軟件基金會,為hadoop相關組件之一。尤其近幾年隨著flume的不斷被完善

Flume+Kafka+Storm+Redis實時分析系統基本架構

今天作者要在這裡通過一個簡單的電商網站訂單實時分析系統和大家一起梳理一下大資料環境下的實時分析系統的架構模型。當然這個架構模型只是實時分析技術的一 個簡單的入門級架構,實際生產環境中的大資料實時分析技術還涉及到很多細節的處理, 比如使用Storm的ACK機制保證資料都能被正確處理, 叢集的高可用架構