1. 程式人生 > >實時日誌監控系統-全覽

實時日誌監控系統-全覽

大資料處理,大致可以分為兩大模組:

  1. 離線資料處理:比如說電商、運營商出現的大批量的日誌,可以由flume、sqoop或者其他路徑,匯入到HDFS中,然後經過資料清洗,使用Hive進行分析和處理,對於優化伺服器資源等有很好的作用;個人覺得,支付寶的年賬單就是離線資料處理的應用之處了。
  2. 實時資料處理:對於有些業務需要,可能第二天或者更晚的時候進行分析無關緊要,但對於一些高頻的金融交易來說,實時性就太重要了,還有一些如百度搜索的top10,新浪微博的微博熱點等等,如果等到第二天處理,那這些新聞也沒什麼吸引的價值了。

所以,縱觀來說,離線資料處理和實時資料處理撐起了大資料處理的一片天,本文將介紹本人親自負責並予以實施的日誌監控專案,麻雀雖小,五臟俱全。

主要模組

  1. 日誌收集模組
  2. 日誌處理模組

主要工具

  1. flume:用於日誌的收集,堪稱是業內最好的日誌收集工具,支援多種日誌收集的渠道,同時支援諸多的日誌收集存放地,功能強大;官方連結:flume官網
  2. kafka:訊息緩衝佇列,大資料處理中常用的緩衝佇列,用於資料爆炸的時候,避免拖垮後續的處理邏輯,將訊息先存放到佇列中,延遲一定的時間進行處理。
  3. log4j:我們在Tomcat伺服器上部署的業務系統,需要指定flume-appender,因此需要使用到log4j。
  4. SparkStreaming:在第一版本中,由於實時性不是很強,因此使用該工具予以處理,其處理日誌會有一定的延遲,但吞吐量較大。
  5. MySql:用於讀取配置資料,已經將配置資料全部遷移到zookeeper上。
  6. Spring boot:構建資料配置服務,方便使用者配置自己的日誌資料,比如郵件發給何人,簡訊發給何人,都可以自由指定。
  7. zookeeper:資料配置中心,在本專案用途中,主要是用於配置資料的管理,官方連結:zookeeper官網

1:日誌收集模組

在日誌收集模組中,針對我們自身的業務,可以分為兩大部分:

  1. Nginx日誌和資料庫執行日誌:首先是Nginx,作為業內比較強大的負責均衡工具,其效能比較優良,我們在日常的服務中,也是使用該工具來進行負載均衡的功能實現;插播一句,業內另一比較強大的負載均衡工具是淘寶的章文嵩博士開發的LVS,對於訪問量不是很大的網站,使用Nginx完全可以實現功能;為了能夠準確處理出錯的日誌,我們對日誌格式進行了一定的定義,類似下圖:
  2. 對於Tomcat型別的服務,選擇使用log4j內建的flume-appender方式來實現,具體配置可以參考官網:https://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAppender;其中有很詳細的flume-appender配置,在日誌中配置合理,每一條日誌都會按照相應的格式,作為flume收集日誌的來源。

對於收集到的日誌,統一採用kafkaSink的方式,輸送到後續的kafka中,以備後續的處理。

關於日誌的收集,在處理過程中有幾點收穫:

  1. 對於flume的收集渠道有了更加深入的理解,flume不愧是強大的工具,支援的收集渠道非常多,而且支援的型別也很多,我們在收集nginx日誌的時候,配置的type為exec,即命令執行方式,其會執行該命令,把需要監控的日誌實時進行讀取,配置如下:
    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /var/log/secure
    a1.sources.r1.channels = c1
  2. 對於tail命令,支援同時讀取多個日誌檔案,會統一把這些日誌輸送到同一個源,輸送到目的地。
  3. 攔截器的使用:有時候,收集到的日誌並不是完全如我們的意願,這時候,攔截器就派上了用場,我們在plugins.d目錄下,部署了自己的jar包,用於攔截讀取到的日誌,進行第二步驟的處理;而且攔截器支援鏈式,即多個攔截器會依次處理收集到的日誌。

2:日誌處理模組

對於收集到的日誌的處理,我們採用的是Spark-Streaming工具,將其與kafka對接,對於收集到的每一條資料進行處理:

public void startTask() {		
		//新建sparkConf
		SparkConf conf = new SparkConf().setAppName(ConfigUtils.SPARK_APPNAME);
		conf.setMaster("local[4]");// 本地多執行緒呼叫
		// conf.setMaster(ConfigUtils.SPARK_MASTER);//叢集呼叫
		//製作StreamingContext
		JavaStreamingContext jsc = new JavaStreamingContext(conf,
				Durations.seconds(Long.valueOf(ConfigUtils.SPARK_DURATIONS)));
		Map<String, String> kafaParameters = new HashMap<String, String>();
		//部署kafka機器的ip及埠號
		kafaParameters.put("metadata.broker.list", ConfigUtils.KAFKA_BROKER);
		//消費組的groupId
		kafaParameters.put("group.id", ConfigUtils.KAFKA_GROUPID);
		kafaParameters.put("fetch.message.max.bytes", ConfigUtils.KAFKA_FETCH_MAX);
		kafaParameters.put("num.consumer.fetchers", ConfigUtils.KAFKA_FETCH_NUM);
		Set<String> topics = new HashSet<String>();
		topics.add(ConfigUtils.KAFKA_TOPIC);
		try {
			//指定直連,消費kafka某個topic內的資料
			JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jsc, String.class, String.class,
					StringDecoder.class, StringDecoder.class, kafaParameters, topics);
			JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
				public Iterator<String> call(Tuple2<String, String> tuple) throws Exception {
					// log.info("接收kafka資料:" + tuple._2);
					return Arrays.asList(tuple._2.split(SPACE.pattern())).iterator();
				}
			});
			words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
				public void call(JavaRDD<String> word, Time arg1) throws Exception {
					// TODO Auto-generated method stub
					process(word);
				}
			});
		}catch(Exception e) {
			e.printStackTrace();
		}
	}

這裡,主要是將SparkStreaming與kafka對接起來的實現,需要指定消費組的group id,需要指定消費的topic,指定消費的機器,最重要的一步就是建立接下來需要進行處理的JavaRDD,其實,spark最核心的概念就是rdd的處理,其SparkStreaming,實際上處理的也就是一段時間內產生的RDD而已。

對於上述的程式碼中一些問題予以優化下:

try {
			JavaPairInputDStream<String, String> lines = KafkaUtils
					.createDirectStream(jsc, String.class, String.class,
							StringDecoder.class, StringDecoder.class,
							kafaParameters, topics);
			lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
				@Override
				public void call(JavaPairRDD<String, String> t)
						throws Exception {
					t.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
						@Override
						public void call(Iterator<Tuple2<String, String>> t)
								throws Exception {
							while (t.hasNext()) {
								String res = t.next()._2;
								try {
									// 這裡,很重要的一點是,到底要不要輸出日誌
									if (flag) {
										log.info("read kafka message:" + res);
									}
									process(res);
								} catch (Exception e) {
									log.info(res + "------處理異常------"
											+ getExeptionMessage(e));
								}
							}
						}
					});
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}
更新了其中的運算元,爭取能夠提高效率:

接下來的處理,則是對收集到的日誌,進行自己的處理,在此處不予贅述。

專案總結:本專案其實難度並不大,重點在於攔截器的設定,kafka叢集的搭建,後續處理的完善,以及如何形成spark與kafka資料的對接等方面。