基於flume+kafka+storm日誌收集系統搭建
基於flume+kafka+storm日誌收集系統搭建
1. 環境
192.168.0.2 hadoop1
192.168.0.3 hadoop2
192.168.0.4 hadoop3
已經安裝了jdk1.7並配置了環境變數
2. 安裝版本
Strom:apache-strom-0.9.4.tar.gz
Kafka:kafka_2.11-0.8.2.0.tgz
Zookeeper: zookeeper-3.4.6.tar.gz
Kafka安裝目錄:/home/hadoop/kafka
strom安裝目錄:/home/hadoop/strom
flume安裝目錄:/home/hadoop/flume
zookeeper
單獨安裝這裡不再涉及,具體請參考本人之前的部落格
3. Flume與kafka整合
l Kafka安裝環境校驗
啟動kafka
cd /home/hadoop/kafka/bin
./kafka-server-start.sh ../config/server.properties &
在kafka上建立topic
./kafka-topics.sh --create --topic idoall_testTopic --replication-factor 4 --partitions 2 --zookeeper hadoop1:2181
在kafka上檢視topic
./kafka-topics.sh --list --zookeeper hadoop1:2181
在kafka刪除topic
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic idoall –zookeeper hadoop2:2181
啟動zookeeper檢視topic是否存在
/home/hadoop/zookeeper/bin/zkCli.sh
ls /
/brokers
/brokers/topics
Kafka傳送訊息,在hadoop1上執行一下命令,輸入“hello kakfa”
./kafka-console-producer.sh --broker-list hadoop1:9092 --sync --topic test_topic
Kafka接收訊息,在hadoop2上執行一下命令,接收hadoop1發出的訊息
./bin/kafka-console-consumer.sh –zookeeper hadoop2:2181 --topic test_topic --from-beginning
如果hadoop2上成功接收到hadoop1的訊息則說明kafka環境是正常的。
l Flume配置
Flume與kafka整合實際上是將flume作為kafka的producer,傳送訊息給kafka
cd /home/hadoop/flume/conf
建立flume-kafka-conf.properties檔案
vi flume-kafka-conf.properties
輸入以下內容
#agent配置
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
#source配置, home/hadoop/flume/temp/log為監控目錄
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=home/hadoop/flume/temp/log
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader=false
#sink配置org.apache.kafka.skin.KafkaSink類需要自己編寫
agent1.sinks.sink1.type=org.apache.kafka.skin.KafkaSink
agent1.sinks.sink1.metadata.broker.list=hadoop1:9092,hadoop2:9092,hadoop3:9092
agent1.sinks.sink1.serializer.class=kafka.serializer.StringEncoder
agent1.sinks.sink1.request.required.ack1=1
agent1.sinks.sink1.max.message.size=1000000
agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.custom.topic.name=test_topic
#channel配置
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=1000
在eclipse中建立javaproject,引入/home/hadoop/flume/lib和/home/hadoop/kafka/libs下所有的jar包
編寫flume的KafkaSink類,內容如下:
1. import org.slf4j.Logger;
2. import org.slf4j.LoggerFactory;
3.
4. import java.util.Map;
5. import java.util.Properties;
6. import kafka.javaapi.producer.Producer;
7. import kafka.producer.KeyedMessage;
8. import kafka.producer.ProducerConfig;
9. import org.apache.flume.Context;
10. import org.apache.flume.Channel;
11. import org.apache.flume.Event;
12. import org.apache.flume.Transaction;
13. import org.apache.flume.conf.Configurable;
14. import org.apache.flume.sink.AbstractSink;
15. import com.google.common.base.Preconditions;
16. import com.google.common.collect.ImmutableMap;
17.
18. public class KafkaSink extends AbstractSink implements Configurable {
19.
20. private Context context;
21. private Properties parameters;
22. private Producer<String, String> producer;
23.
24. private static final String PARTITION_KEY_NAME = "custom.partition.key";
25. private static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name";
26. private static final String DEFAULT_ENCODING = "UTF-8";
27. private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
28.
29. public void configure(Context context) {
30. this.context = context;
31. ImmutableMap<String, String> props = context.getParameters();
32. this.parameters = new Properties();
33. for (Map.Entry<String,String> entry : props.entrySet()) {
34. this.parameters.put(entry.getKey(), entry.getValue());
35. }
36. }
37.
38. @Override
39. public synchronized void start() {
40. super.start();
41. ProducerConfig config = new ProducerConfig(this.parameters);
42. this.producer = new Producer<String, String>(config);
43. }
44.
45. public Status process() {
46. Status status = null;
47. Channel channel = getChannel();
48. Transaction transaction = channel.getTransaction();
49.
50. try {
51. transaction.begin();
52. Event event = channel.take();
53. if (event != null) {
54. String partitionKey = (String) parameters.get(PARTITION_KEY_NAME);
55. String topic = Preconditions.checkNotNull((String) this.parameters.get(CUSTOME_TOPIC_KEY_NAME),
56. "topic name is required");
57. String eventData = new String(event.getBody(), DEFAULT_ENCODING);
58. KeyedMessage<String, String> data = (partitionKey.isEmpty()) ? new KeyedMessage<String, String>(topic,
59. eventData) : new KeyedMessage<String, String>(topic, partitionKey, eventData);
60. LOGGER.info("Sending Message to Kafka : [" + topic + ":" + eventData + "]");
61. producer.send(data);
62. transaction.commit();
63. LOGGER.info("Send message success");
64. status = Status.READY;
65. } else {
66. transaction.rollback();
67. status = Status.BACKOFF;
68. }
69. } catch (Exception e) {
70. e.printStackTrace();
71. LOGGER.info("Send message failed!");
72. transaction.rollback();
73. status = Status.BACKOFF;
74. } finally {
75. transaction.close();
76. }
77. return status;
78. }
79.
80. @Override
81. publicvoid stop() {
82. producer.close();
83. }
84. }
將該專案匯出為jar包:flume-kafka-plugin.jar。jar包名字隨意。
將flume-kafka-plugin.jar複製到/home/hadoop/flume/lib
並將kafka的相關jar包複製到/home/hadoop/flume/lib
cp /home/hadoop/kafka/libs/kafka_2.11-0.8.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/kafka/libs/scala-library-2.11.5.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/kafka/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/kafka/libs/kafka-client-0.8.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
l flume與kafka整合驗證
在hadoop1上啟動flume
cd /home/hadoop/flume/bin
./flume-ng agent –c agent -n agent1 -f /home/hadoop/flume/conf/flume-kafka-conf.properties -Dflume.root.logger=INFO,console
在home/hadoop/flume/temp/log目錄下建立檔案並輸入內容
vi test.txt
123
456
在hadoop2啟動kafka的consumer,檢視是否可以接受到該資料
./bin/kafka-console-consumer.sh –zookeeper hadoop2:2181 --topic test_topic --from-beginning
此時如果看到console輸出hadoop1上的內容,表明flume和kafka整合成功
4. Kafka整合storm
在eclipse建立javaproject,並匯入/home/hadoop/kafka/libs和/home/hadoop/storm/lib下的所有jar包
#編寫KafkaSpout.java檔案
85. package kafka.with.storm;
86. import java.text.SimpleDateFormat;
87. import java.util.Date;
88. import java.util.HashMap;
89. import java.util.List;
90. import java.util.Map;
91. import java.util.Properties;
92. import kafka.consumer.ConsumerConfig;
93. import kafka.consumer.ConsumerIterator;
94. import kafka.consumer.KafkaStream;
95. import kafka.javaapi.consumer.ConsumerConnector;
96. import backtype.storm.spout.SpoutOutputCollector;
97. import backtype.storm.task.TopologyContext;
98. import backtype.storm.topology.IRichSpout;
99. import backtype.storm.topology.OutputFieldsDeclarer;
100. import backtype.storm.tuple.Fields;
101. import backtype.storm.tuple.Values;
102.
103. public class KafkaSpout implements IRichSpout {
104.
105. private SpoutOutputCollector collector;
106. private ConsumerConnector consumer;
107. private String topic;
108.
109. public KafkaSpout () {
110. }
111.
112. public KafkaSpout (String topic) {
113. this.topic = topic;
114. }
115.
116. public void nextTuple() {
117. }
118.
119. public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
120. this.collector = collector;
121. }
122.
123. public void ack(Object msgId) {
124. }
125.
126. public void activate() {
127.
128. consumer=kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfi());
129. Map<String,Integer> topickMap = new HashMap<String, Integer>();
130. topickMap.put(topic, 1);
131.
132. System.out.println("*********Results********topic:"+topic);
133.
134. Map<String, List<KafkaStream<byte[],byte[]>>> streamMap=consumer.createMessageStreams(topickMap);
135. KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
136. ConsumerIterator<byte[],byte[]> it =stream.iterator();
137. while(it.hasNext()){
138. String value =new String(it.next().message());
139. SimpleDateFormat formatter = new SimpleDateFormat ("yyyy年MM月dd日 HH:mm:ss SSS");
140. Date curDate = new Date(System.currentTimeMillis());//獲取當前時間
141. String str = formatter.format(curDate);
142.
143. System.out.println("storm接收到來自kafka的訊息------->" + value);
144.
145. collector.emit(new Values(value,1,str), value);
146. }
147. }
148.
149. private static ConsumerConfig createConsumerConfig() {
150. Properties props = new Properties();
151. // 設定zookeeper的連結地址
152. props.put("zookeeper.connect","hadoop1:2181,hadoop2:2181,hadoop3:2181");
153. // 設定group id
154. props.put("group.id", "1");
155. props.put("auto.commit.interval.ms", "1000000");
156. props.put(“suto.commit.enable”,”true”);
157. props.put("zookeeper.session.timeout.ms","1000000");
158. return new ConsumerConfig(props);
159. }
160.
161. public void close() {
162. }
163.
164. public void deactivate() {
165. }
166.
167. public void fail(Object msgId) {
168. }
169.
170. public void declareOutputFields(OutputFieldsDeclarer declarer) {
171. declarer.declare(new Fields("word","id","time"));
172. }
173.
174. public Map<String, Object> getComponentConfiguration() {
175. System.out.println("getComponentConfiguration被呼叫");
176. topic="idoall_testTopic";
177. return null;
178. }
179. }
#編寫KafkaTopology.java檔案
180. package kafka.with.storm;
181. import java.util.HashMap;
182. import java.util.Map;
183. import backtype.storm.Config;
184. import backtype.storm.LocalCluster;
185. import backtype.storm.topology.BasicOutputCollector;
186. import backtype.storm.topology.OutputFieldsDeclarer;
187. import backtype.storm.topology.TopologyBuilder;
188. import backtype.storm.topology.base.BaseBasicBolt;
189. import backtype.storm.tuple.Fields;
190. import backtype.storm.tuple.Tuple;
191. import backtype.storm.tuple.Values;
192. import backtype.storm.utils.Utils;
193.
194. public class KafkaTopology {
195.
196. public static void main(String[] args) {
197. TopologyBuilder builder = new TopologyBuilder();
198.
199. builder.setSpout("spout", new KafkaSpout(""), 1);
200. builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
201. builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1",new Fields("word"));
202.
203. Map conf = new HashMap();
204. conf.put(Config.TOPOLOGY_WORKERS, 1);
205. conf.put(Config.TOPOLOGY_DEBUG, true);
206.
207. LocalCluster cluster = new LocalCluster();
208. cluster.submitTopology("flume-kafka-storm-topology-integration", conf, builder.createTopology());
209.
210. Utils.sleep(1000*60*5); // local cluster test ...
211. cluster.shutdown();
212. }
213.
214. public static class Bolt1 extends BaseBasicBolt {
215.
216. public void execute(Tuple input, BasicOutputCollector collector) {
217. try {
218. String msg = input.getString(0);
219. int id = input.getInteger(1);
220. String time = input.getString(2);
221. msg = msg+"bolt1";
222. System.out.println("對訊息加工第1次-------[arg0]:"+ msg +"---[arg1]:"+id+"---[arg2]:"+time+"------->"+msg);
223. if (msg != null) {
224. collector.emit(new Values(msg));
225. }
226. } catch (Exception e) {
227. e.printStackTrace();
228. }
229. }
230.
231.
232. public void declareOutputFields(OutputFieldsDeclarer declarer) {
233. declarer.declare(new Fields("word"));
234. }
235. }
236.
237. public static class Bolt2 extends BaseBasicBolt {
238. Map<String, Integer> counts = new HashMap<String, Integer>();
239.
240.
241. public void execute(Tuple tuple, BasicOutputCollector collector) {
242. String msg = tuple.getString(0);
243. msg = msg + "bolt2";
244. System.out.println("對訊息加工第2次---------->"+msg);
245. collector.emit(new Values(msg,1));
246. }
247.
248.
249. public void declareOutputFields(OutputFieldsDeclarer declarer) {
250. declarer.declare(new Fields("word", "count"));
251. }
基於flume+kafka+storm日誌收集系統搭建
1. 環境
192.168.0.2 hadoop1
192.168.0.3 hadoop2
192.168.0.4 hadoop3
已經
一、介紹
在日常運維工作中,對於系統和業務日誌的處理尤為重要。今天,在這裡分享一下自己部署的filebeat + kafka + ELK開源實時日誌分析平臺的記錄過程。
1、ELK介紹
&nbs
今天搭建了一下storm流處理系統,整個搭建的流程都是參考以下地址:http://www.cnblogs.com/web-v/articles/6510090.html
文章中並沒有給出flume同時寫入kafka和hdfs時的配置檔案。以下是我的flume配置檔案,有一些
技術交流群:59701880 深圳廣州hadoop好友會微信公眾號:後續部落格的文件都會轉到微信公眾號中。一直以來都想接觸Storm實時計算這塊的東西,最近在群裡看到上海一哥們羅寶寫的Flume+Kafka+Storm的實時日誌流系統的搭建文件,自己也跟著整了一遍,之前羅寶的
背景:
最近線上上了ELK,但是隻用了一臺Redis在中間作為訊息佇列,以減輕前端es叢集的壓力,Redis的叢集解決方案暫時沒有接觸過,並且Redis作為訊息佇列並不是它的強項;所以最近將Redis換成了專業的訊息資訊釋出訂閱系統Kafka, Kafka的更多介紹大家可以看這裡: 傳 Kafka broker修改conf/server.properties檔案,修改內容如下: broker.id=1 host.name=172.16.137.196 port=10985 log.dirs=/data/kafka could not arch success div 名稱 fill pil ice oca
ELK
ELK目前主流的一種日誌系統,過多的就不多介紹了
Filebeat收集日誌,將收集的日誌輸出到kafka,避免網絡問題丟失信息
kafka接收到日誌消息後直接消費到Lo
大資料系統中通常需要採集的日誌有:
系統訪問日誌
使用者點選日誌
其他業務日誌(比如推薦系統的點選日誌)
在收集日誌的時候,一般分為三層結構:採集層、彙總層和儲存層,而不是直接從採集端將資料傳送到儲存端,這樣的好處有:
如果儲存端如Hadoop叢集、Kafka等需要停
運用場景:我們機器上每天或者定期都要跑很多工,很多時候任務出現錯誤不能及時發現,導致發現的時候任務已經掛了很久了。
解決方法:基於 Flume+Kafka+Spark Streaming 的框架對這些任務的輸出日誌進行實時監控,當檢測到日誌出現Error的資訊就傳送郵件給
問題導讀:
1.Flume的存在些什麼問題?
2.基於開源的Flume美團增加了哪些功能?
3.Flume系統如何調優?
在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節
美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。
《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。
第一部
1、配置nginx.conf,新增以下配置
http {
#配置日誌格式
log_format lf '$remote_addr^A$msec^A$http_host^A$reques
原文
問題導讀:
1.Flume-NG與Scribe對比,Flume-NG的優勢在什麼地方?2.架構設計考慮需要考慮什麼問題?3.Agent宕機該如何解決?4.Collector宕機是否會有影響?5.Flume-NG可靠性(reliability)方面做了哪些措施?
美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。
《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。
第一部 with 指定 mwl 裏程碑 工程 生命 數據接收 dba -i 2017-09-06 朱潔 大數據和雲計算技術
任何一個生產系統在運行過程中都會產生大量的日誌,日誌往往隱藏了很多有價值的信息。在沒有分析方法之前,這些日誌存儲一段時間後就會被清理。隨著技術的發展和 註意 內存緩存 外部 ner 流動 場景 啟動 net conf Apache Flume概述
Flume 是 Cloudera 提供的一個高可用的,高可靠的,分布式的海量日誌采集、聚合和傳輸的系統。Flume 支持定制各類數據發送方,用於收集各類型數據;同時,Flu 大數據 實時計算 Storm [TOC]
1 大數據處理的常用方法
前面在我的另一篇文章中《大數據采集、清洗、處理:使用MapReduce進行離線數據分析完整案例》中已經有提及到,這裏依然給出下面的圖示:
前面給出的那篇文章是基於MapReduce的離線數據分析案例,其通過對網站產生的用戶訪問 uri 對數 exp 取數 網速 docker useradd 通過 演示 Centos7部署ELK日誌收集系統
一、ELK概述:
ELK是一組開源軟件的簡稱,其包括Elasticsearch、Logstash 和 Kibana。ELK最近幾年發展迅速,已經成為目前最流行的 agen debug 程序 負責 and 序列化 得到 集群 ava Flume
1. 前言
flume是由cloudera軟件公司產出的可分布式日誌收集系統,後與2009年被捐贈了apache軟件基金會,為hadoop相關組件之一。尤其近幾年隨著flume的不斷被完善
今天作者要在這裡通過一個簡單的電商網站訂單實時分析系統和大家一起梳理一下大資料環境下的實時分析系統的架構模型。當然這個架構模型只是實時分析技術的一 個簡單的入門級架構,實際生產環境中的大資料實時分析技術還涉及到很多細節的處理, 比如使用Storm的ACK機制保證資料都能被正確處理, 叢集的高可用架構 相關推薦
基於flume+kafka+storm日誌收集系統搭建
filebeat + kafka + logstash + Elasticsearch + Kibana日誌收集系統搭建
flume-ng+Kafka+Storm+HDFS 實時系統搭建
【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 實時系統搭建
ELK+kafka構建日誌收集系統
基於Flume+kafka打造實時日誌收集分析系統
Kafka+Zookeeper+Filebeat+ELK 搭建日誌收集系統
基於flume的日誌收集系統配置
基於 Flume+Kafka+Spark Streaming 實現實時監控輸出日誌的報警系統
基於Flume的美團日誌收集系統(二)改進和優化
基於Flume的美團日誌收集系統(一)架構和設計
nginx+flume+hdfs搭建實時日誌收集系統
10044---基於Flume的美團日誌收集系統(一)架構和設計
COPY 基於Flume的美團日誌收集系統架構和設計
Flume日誌收集系統架構詳解--轉
日誌收集系統Flume及其應用
Flume+Kafka+Storm+Redis構建大數據實時處理系統:實時統計網站PV、UV+展示
Linux搭建ELK日誌收集系統:FIlebeat+Redis+Logstash+Elasticse
Flume可分布式日誌收集系統
Flume+Kafka+Storm+Redis實時分析系統基本架構