從0到1搭建基於Kafka、Flume和Hive的海量資料分析系統(一)資料收集應用
大資料時代,一大技術特徵是對海量資料採集、儲存和分析的多元件解決方案。而其中對來自於感測器、APP的SDK和各類網際網路應用的原生日誌資料的採集儲存則是基本中的基本。本系列文章將從0到1,概述一下搭建基於Kafka、Flume、Zookeeper、HDFS、Hive的海量資料分析系統的框架、核心應用和關鍵模組。
專案原始碼儲存於GitHub:原始碼
系統架構概述
本系列文章所介紹的資料分析系統,定位於一種通用的大資料分析系統,可用於電商、網際網路和物聯網的實際解決方案中。該應用主要解決從多種多樣的網際網路應用、APP、感測器、小程式等網路客戶端中預設的介面採集資料,並進行分散式儲存,通過RESTful或服務訂閱的方式,連線BI應用或者嵌入了機器學習模組的業務資料分析系統。其專案架構如下:
專案主體實現了從各種網際網路客戶端的日誌資料到集中的BI分析系統的全過程,主要包括以下構件:
1. 日誌收集Web應用:基於REST風格的介面,處理從網路客戶端回傳的資料檔案,其中包括了對資料物件的定義、核心Web應用和模擬客戶端測試程式。
2. Kafka叢集:Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的訊息處理,也是為了通過叢集來提供實時的訊息。
2. Zookeeper叢集:是一個為Kafka的分散式應用提供一致性服務的軟體,提供的功能包括:配置維護、域名服務、分散式同步、組服務等。Zookeeper可以實現封裝好複雜易出錯的關鍵服務,將簡單易用的介面和效能高效、功能穩定的系統提供給使用者。
3. Flume:用於儲存資料到HDFS。Flume的意義在於:當收集資料的速度超過將寫入資料的時候,也就是當收集資訊遇到峰值時,這時候收集的資訊非常大,甚至超過了系統的寫入資料能力,這時候,Flume會在資料生產者和資料收容器間做出調整,保證其能夠在兩者之間提供平穩的資料。
4. HDFS:提供高吞吐量的分散式儲存方案。
5. Hive:Hive是建立在 Hadoop 上的資料倉庫基礎構架,定義了簡單的類 SQL 查詢語言,便於快速搭建基於SQL的資料應用。
6. Hive Server2:一種可選服務,允許遠端客戶端可以使用各種程式語言向Hive提交請求並檢索結果。
7. Dubbo和RPC:Dubbo是阿里開源的一個高效能優秀的服務框架,使得應用可通過高效能的 RPC 實現服務的輸出和輸入功能,輕鬆實現面向服務的應用開發。
資料收集應用
資料收集應用的目標是提供一個對外的介面,基於實時或準實時的要求收集來自海量客戶端應用所上傳的資料檔案,因此可以根據需求進行叢集化和新增負載均衡機制。以常規的日誌資料收集應用為例,一個數據應用應該實現的主要功能包括:資料屬性拷貝、資料物件封裝、時間校對、地理資料提取和快取、傳送資料至Kafka,以及一個可選的模擬客戶端上傳資料應用。
一、應用結構
基於Maven的多模組應用佈局方案,具體包括:
——EasyBI-Parent:父元件,僅維護一個pom檔案,作為個子元件的parent pom檔案,定義了統一的專案版本、依賴管理和Maven外掛管理。
|——EasyBI-Common:子元件,定義了日誌資料物件和通用的工具類方法。
|——EasyBI-Logs-Collect-Web:核心元件,基於Rest風格收集日誌資料,封裝資料物件併發送至Kafka,其中對一些資料進行初級加工。
|——EasyBI-Logs-MockApp:模擬一個客戶端上傳資料的應用,可選。
二、Common元件
資料物件
資料物件以日誌物件為載體,裡面封裝了從客戶端傳送過來的不同日誌的POJO物件,其類圖為:
AppBaseLog為日誌型別的統一父類,定義了一些公共的資料屬性,被用於各個具體日誌實現類繼承。
Startup、Event、Page、Usage和Error分別對應了應用啟動、事件、頁面、功能和錯誤的日誌記錄,繼承了公共基類並維護了各自的特有屬性。
APPLogEntity是按客戶端為單位的日誌物件,組合了各個不同的子日誌物件,作為整個資料分析系統的核心資料模型。
通用的工具類
主要包括兩個部分,分別是複製各子日誌物件的屬性至LogEntity物件的一個工具方法,以及一個提取IP位置資訊的工具方法。
拷貝日誌屬性的工具類,核心程式碼如下:
1 public class PropertiesUtil { 2 /* 3 * 通過內省進行屬性複製(物件到物件) 4 */ 5 public static void copyProperties(Object src, Object dest) { 6 7 try { 8 //源物件的BeanInfo 9 BeanInfo srcBeanInfo = Introspector.getBeanInfo(src.getClass()); 10 //獲取屬性描述符 11 PropertyDescriptor[] descriptors = srcBeanInfo.getPropertyDescriptors(); 12 for (PropertyDescriptor descriptor : descriptors) { 13 //獲取getter和setter方法 14 Method getter = descriptor.getReadMethod(); 15 Method setter = descriptor.getWriteMethod(); 16 //獲取set方法名稱 17 String setterName = setter.getName(); 18 //獲取setter方法引數 19 Class<?>[] parameterTypes = setter.getParameterTypes(); 20 21 Object value = getter.invoke(src); 22 23 try { 24 Method destSetter = dest.getClass().getMethod(setterName, parameterTypes); 25 destSetter.invoke(dest, value); 26 } catch (Exception e) { 27 continue; 28 } 29 30 } 31 } catch (Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 /* 36 * 複製物件屬性至一個數組的過載方法 37 */ 38 public static void copyProperties(Object src, Object[] arr) { 39 for (Object obj : arr) { 40 copyProperties(src, obj); 41 } 42 } 43 }
該工具類包括兩個過載的方法,基於內省,分別實現深度複製一個物件的成員變數到另一個物件,或者到另一個物件陣列中。具體到本例子,包括:
- 獲取A物件的getter方法和setter方法,然後獲取setter方法的名稱和引數值。
- 通過反射,呼叫A物件的getter方法,獲取成員變數值。
- 通過反射,呼叫B物件的setter方法,為其賦值,完成複製。
提取IP地理資訊的工具類,通過使用maxmind-db庫,來實現對Host地址的地理資訊提取,用來填充至資料物件。
1 public static final int COUNTRY = 1; 2 public static final int PROVINCE = 2; 3 public static final int CITY = 3; 4 5 private static InputStream inputStream; 6 private static Reader reader; 7 8 static { 9 try { 10 inputStream = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb"); 11 reader = new Reader(inputStream); 12 } catch (IOException e) { 13 e.printStackTrace(); 14 } 15 }
使用maxmind-db庫需要用到GeoLite2-City.mmdb檔案,通過靜態程式碼塊來初始化資原始檔的讀取流,並且定義用來獲取國家、省、市的常量程式碼。
1 public static String getLocation(String ip, int level) { 2 try { 3 JsonNode node = reader.get(InetAddress.getByName(ip)); 4 switch (level) { 5 case 1: 6 return node.get("country").get("names").get("zh-CN").textValue(); 7 case 2: 8 return node.get("subdivisions").get(0).get("names").get("zh-CN").textValue(); 9 case 3: 10 return node.get("city").get("names").get("zh-CN").textValue(); 11 default: 12 return null; 13 } 14 } catch (Exception e) { 15 e.printStackTrace(); 16 } 17 return null; 18 }
通過呼叫com.maxmind.db.Reader物件的get方法,可以獲取傳入IP地址的地址節點物件,如果對文件節點模型比較熟悉的話,可以很快地獲取到節點物件所對應的不同地址資訊。
三、Logs_Collect_Web應用元件
Logs_Collect_Web是基於SpringMVC的Web應用,目標是收集各客戶端的日誌資料,元件結構為
DispatcherServlet是SpringMVC的核心排程類,關於SpringMVC的Web應用可參考:基於SSM的Java Web應用開發原理初探
其Controller類需要實現如下的核心功能。
基本資訊複製
利用上面介紹的工具類,實現對從請求體中所提取的日誌資料進行屬性複製,封裝到LogEntity的資料物件中,用於傳輸。
1 private void copyBaseProperties(AppLogEntity e) { 2 PropertiesUtil.copyProperties(e, e.getAppStartupLogs()); 3 4 }
時間校準
因為日誌檔案的上傳並不是瞬時的,客戶端提交時間與伺服器收到時間存在時間差,因此需要使用伺服器時間,與Http請求的時間差,來對原始的日誌檔案時間進行校正。
1 //server時間 2 long serverTime = System.currentTimeMillis(); 3 //client時間 4 long clientTime = Long.parseLong(request.getHeader("clientTime")); 5 //時間校對 6 long duration = serverTime - clientTime; 7 /* 8 * 校正時間 9 */ 10 private void verifyTime(AppLogEntity e, long duration) { 11 for (AppBaseLog log : e.getAppStartupLogs()) { 12 log.setCreatedAtMs(log.getCreatedAtMs() + duration); 13 } 14 }
提取地理資訊並快取
快取地理位置資訊的方法,是通過維護一個HashMap,把Host的字串作為鍵,封裝一個包括國家、省、市的位置物件作為值,實現賦值位置資訊到資料物件時:
- 如果快取中包含該位置,直接從HashMap中查詢該值並返回,實現高效能的查詢。
- 如果快取中沒有,再呼叫GeoUtil方法,獲取地址,並新增到HashMap中。
1 /* 2 * 操作IP的方法(快取地理位置資訊) 3 */ 4 private void processIP(AppLogEntity e, String clientIP) { 5 GeoInfo info = geoCache.get(clientIP); 6 if (info == null) { 7 info = new GeoInfo(); 8 info.setCountry(GeoUtil.getLocation(clientIP, GeoUtil.COUNTRY)); 9 info.setProvince(GeoUtil.getLocation(clientIP, GeoUtil.PROVINCE)); 10 geoCache.put(clientIP, info); 11 } 12 for (AppStartupLog log : e.getAppStartupLogs()) { 13 log.setCountry(info.getCountry()); 14 log.setProvince(info.getProvince()); 15 log.setIpAddress(clientIP); 16 } 17 }
傳送至Kafka的方法
Kafka的核心方法是Producer,通過將資料物件轉為JSON的字串封裝到不同的Topic中,再通過Producer來發送出去,即完成了傳送至Kafka的方法實現。程式碼實現如下:
1 private void sendMessage(AppLogEntity e) { 2 //建立配置物件 3 Properties properties = new Properties(); 4 properties.put("metadata.broker.list", "s202:9092"); 5 properties.put("serializer.class", "kafka.serializer.StringEncoder"); 6 properties.put("request.required.acks", "1"); 7 //建立生產者 8 Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties)); 9 10 sendSingleLog(producer, Constants.TOPIC_APP_STARTUP, e.getAppStartupLogs()); 11 sendSingleLog(producer,Constants.TOPIC_APP_ERRROR,e.getAppErrorLogs()); 12 //傳送訊息 13 producer.close(); 14 } 15 /* 16 * 傳送單個訊息的方法 17 */ 18 private void sendSingleLog(Producer<Integer, String> producer, String topic, 19 AppBaseLog[] logs) { 20 for (AppBaseLog log : logs) { 21 String logMessage = JSONObject.toJSONString(log); 22 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, logMessage); 23 producer.send(data); 24 } 25 }
四、Mock_Client應用元件
Mock_Client是可選元件,用於模擬一個客戶端向伺服器傳送帶資料物件的請求方法,來測試伺服器的可用性。
實現的原理是基於一個Json資料樣本,通過隨機組合資料物件的屬性併發送請求,並獲取響應程式碼來判斷。具體可以參考GitHub原始碼。