Learning Flume (6): Using Hive to Analyze Log Data Collected by Flume

In an earlier post I described how to write log4j output to a specified HDFS directory; the directory we used was /flume/events.

If we want to analyze the collected logs with Hive, we can load all the log data under /flume/events into a Hive table.

If you understand how Hive's LOAD DATA works, there is an even simpler approach that skips the LOAD DATA step altogether: point sink1.hdfs.path directly at the Hive table's directory.
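
For comparison, this is roughly the step we are skipping. If the data stayed under /flume/events, we would have to load it into the table ourselves, something like the sketch below (it assumes the target table test already exists and the files are plain text lines):

load data inpath '/flume/events' into table test;

By writing directly into the table directory, Flume effectively performs this step for us, because the files already live where Hive expects them.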

Below I describe the concrete steps in detail.

As usual, let's be driven by a requirement. The data we collected earlier is interface access log data, in JSON format, like this:

{"requestTime":1405651379758,"requestParams":{"timestamp":1405651377211,"phone":"02038824941","cardName":"測試商家名稱","provinceCode":"440000","cityCode":"440106"},"requestUrl":"/reporter-api/reporter/reporter12/init.do"}

The requirement: count the total number of interface calls.

My first thought was to create a table named test in Hive, and then point the HDFS sink at its directory: tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test

Then run select count(*) from test; and we are done.

This approach is quick and dirty, so let's start with it. We immediately run into a problem, though: the log data is JSON, so Hive has to serialize and deserialize JSON data into the concrete columns of the test table.

That is a bit unfortunate, because Hive itself does not ship with a JSON SerDe. It does, however, provide functions for parsing JSON strings.

The first is a UDF:

get_json_object(string json_string, string path) — extracts the JSON object at the given path from the JSON string and returns it as a JSON string; if the input JSON string is invalid, it returns NULL.
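
For instance, assuming the raw JSON lines were first loaded into a staging table raw_log with a single STRING column named line (both names are hypothetical, not part of the setup above), the fields could be pulled out like this:

select get_json_object(line, '$.requestTime') as requestTime,
       get_json_object(line, '$.requestUrl')  as requestUrl
from raw_log;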

The second is a table-generating function (UDTF): json_tuple(string jsonstr, p1, p2, ..., pn). It accepts multiple key names and processes the input JSON string in one pass. It is similar to the get_json_object UDF, but more efficient, because a single call retrieves multiple values. Example: select b.* from test_json a lateral view json_tuple(a.id, 'id', 'name') b as f1, f2; — the lateral view turns the extracted values into columns.
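
Applied to our log format, again using the hypothetical raw_log/line staging table from the previous sketch, json_tuple extracts several top-level keys in one pass. Note that it takes plain key names rather than nested paths, so the fields inside requestParams would still need get_json_object or a SerDe:

select b.requesttime, b.requesturl
from raw_log a
lateral view json_tuple(a.line, 'requestTime', 'requestUrl') b as requesttime, requesturl;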

The ideal solution is a JSON SerDe: once the data is loaded we could simply run select * from test, instead of fetching fields with select get_json_object(...), where N fields means parsing the string N times — far too inefficient.

Fortunately, the Cloudera wiki provides a JSON SerDe class (it is not included in the released Hive jars), so I copied it over. Here it is:

package com.besttone.hive.serde;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.codehaus.jackson.map.ObjectMapper;

/**
 * This SerDe can be used for processing JSON data in Hive. It supports
 * arbitrary JSON data, and can handle all Hive types except for UNION. However,
 * the JSON data is expected to be a series of discrete records, rather than a
 * JSON array of objects.
 * 
 * The Hive table is expected to contain columns with names corresponding to
 * fields in the JSON data, but it is not necessary for every JSON field to have
 * a corresponding Hive column. Those JSON fields will be ignored during
 * queries.
 * 
 * Example:
 * 
 * { "a": 1, "b": [ "str1", "str2" ], "c": { "field1": "val1" } }
 * 
 * Could correspond to a table:
 * 
 * CREATE TABLE foo (a INT, b ARRAY<STRING>, c STRUCT<field1:STRING>);
 * 
 * JSON objects can also be interpreted as a Hive MAP type, so long as the keys and
 * values in the JSON object are all of the appropriate types. For example, in
 * the JSON above, another valid table declaration would be:
 * 
 * CREATE TABLE foo (a INT, b ARRAY<STRING>, c MAP<STRING,STRING>);
 * 
 * Only STRING keys are supported for Hive MAPs.
 */
public class JSONSerDe implements SerDe {

	private StructTypeInfo rowTypeInfo;
	private ObjectInspector rowOI;
	private List<String> colNames;
	private List<Object> row = new ArrayList<Object>();

	// How to handle input lines that are not valid JSON.
	private boolean ignoreInvalidInput;

	/**
	 * An initialization function used to gather information about the table.
	 * Typically, a SerDe implementation will be interested in the list of
	 * column names and their types. That information will be used to help
	 * perform actual serialization and deserialization of data.
	 */
	@Override
	public void initialize(Configuration conf, Properties tbl)
			throws SerDeException {
		// Whether to ignore strings that cannot be parsed as JSON. Defaults to
		// false (throw an exception); set to true to skip such lines.
		ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(
				"input.invalid.ignore", "false"));

		// Get a list of the table's column names.

		String colNamesStr = tbl.getProperty(serdeConstants.LIST_COLUMNS);
		colNames = Arrays.asList(colNamesStr.split(","));

		// Get a list of TypeInfos for the columns. This list lines up with
		// the list of column names.
		String colTypesStr = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
		List<TypeInfo> colTypes = TypeInfoUtils
				.getTypeInfosFromTypeString(colTypesStr);

		rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(
				colNames, colTypes);
		rowOI = TypeInfoUtils
				.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
	}

	/**
	 * This method does the work of deserializing a record into Java objects
	 * that Hive can work with via the ObjectInspector interface. For this
	 * SerDe, the blob that is passed in is a JSON string, and the Jackson JSON
	 * parser is being used to translate the string into Java objects.
	 * 
	 * The JSON deserialization works by taking the column names in the Hive
	 * table, and looking up those fields in the parsed JSON object. If the
	 * value of the field is not a primitive, the object is parsed further.
	 */
	@Override
	public Object deserialize(Writable blob) throws SerDeException {
		Map<?, ?> root = null;
		row.clear();
		try {
			ObjectMapper mapper = new ObjectMapper();
			// This is really a Map<String, Object>. For more information about how
			// Jackson parses JSON in this example, see
			// http://wiki.fasterxml.com/JacksonDataBinding
			root = mapper.readValue(blob.toString(), Map.class);
		} catch (Exception e) {
			// If ignoreInvalidInput is true, do not throw; skip this row instead.
			if (!ignoreInvalidInput)
				throw new SerDeException(e);
			else {
				return null;
			}
			
		}

		// Lowercase the keys as expected by hive
		Map<String, Object> lowerRoot = new HashMap<String, Object>();
		for (Map.Entry<?, ?> entry : root.entrySet()) {
			lowerRoot.put(((String) entry.getKey()).toLowerCase(),
					entry.getValue());
		}
		root = lowerRoot;

		Object value = null;
		for (String fieldName : rowTypeInfo.getAllStructFieldNames()) {
			try {
				TypeInfo fieldTypeInfo = rowTypeInfo
						.getStructFieldTypeInfo(fieldName);
				value = parseField(root.get(fieldName), fieldTypeInfo);
			} catch (Exception e) {
				value = null;
			}
			row.add(value);
		}
		return row;
	}

	/**
	 * Parses a JSON object according to the Hive column's type.
	 * 
	 * @param field
	 *            - The JSON object to parse
	 * @param fieldTypeInfo
	 *            - Metadata about the Hive column
	 * @return - The parsed value of the field
	 */
	private Object parseField(Object field, TypeInfo fieldTypeInfo) {
		switch (fieldTypeInfo.getCategory()) {
		case PRIMITIVE:
			// Jackson will return the right thing in this case, so just return
			// the object
			if (field instanceof String) {
				field = field.toString().replaceAll("\n", "\\\\n");
			}
			return field;
		case LIST:
			return parseList(field, (ListTypeInfo) fieldTypeInfo);
		case MAP:
			return parseMap(field, (MapTypeInfo) fieldTypeInfo);
		case STRUCT:
			return parseStruct(field, (StructTypeInfo) fieldTypeInfo);
		case UNION:
			// Unsupported by JSON
		default:
			return null;
		}
	}

	/**
	 * Parses a JSON object and its fields. The Hive metadata is used to
	 * determine how to parse the object fields.
	 * 
	 * @param field
	 *            - The JSON object to parse
	 * @param fieldTypeInfo
	 *            - Metadata about the Hive column
	 * @return - A map representing the object and its fields
	 */
	private Object parseStruct(Object field, StructTypeInfo fieldTypeInfo) {
		Map<Object, Object> map = (Map<Object, Object>) field;
		ArrayList<TypeInfo> structTypes = fieldTypeInfo
				.getAllStructFieldTypeInfos();
		ArrayList<String> structNames = fieldTypeInfo.getAllStructFieldNames();

		List<Object> structRow = new ArrayList<Object>(structTypes.size());
		for (int i = 0; i < structNames.size(); i++) {
			structRow.add(parseField(map.get(structNames.get(i)),
					structTypes.get(i)));
		}
		return structRow;
	}

	/**
	 * Parse a JSON list and its elements. This uses the Hive metadata for the
	 * list elements to determine how to parse the elements.
	 * 
	 * @param field
	 *            - The JSON list to parse
	 * @param fieldTypeInfo
	 *            - Metadata about the Hive column
	 * @return - A list of the parsed elements
	 */
	private Object parseList(Object field, ListTypeInfo fieldTypeInfo) {
		ArrayList<Object> list = (ArrayList<Object>) field;
		TypeInfo elemTypeInfo = fieldTypeInfo.getListElementTypeInfo();

		for (int i = 0; i < list.size(); i++) {
			list.set(i, parseField(list.get(i), elemTypeInfo));
		}

		return list.toArray();
	}

	/**
	 * Parse a JSON object as a map. This uses the Hive metadata for the map
	 * values to determine how to parse the values. The map is assumed to have a
	 * string for a key.
	 * 
	 * @param field
	 *            - The JSON list to parse
	 * @param fieldTypeInfo
	 *            - Metadata about the Hive column
	 * @return
	 */
	private Object parseMap(Object field, MapTypeInfo fieldTypeInfo) {
		Map<Object, Object> map = (Map<Object, Object>) field;
		TypeInfo valueTypeInfo = fieldTypeInfo.getMapValueTypeInfo();

		for (Map.Entry<Object, Object> entry : map.entrySet()) {
			map.put(entry.getKey(), parseField(entry.getValue(), valueTypeInfo));
		}
		return map;
	}

	/**
	 * Return an ObjectInspector for the row of data
	 */
	@Override
	public ObjectInspector getObjectInspector() throws SerDeException {
		return rowOI;
	}

	/**
	 * Unimplemented
	 */
	@Override
	public SerDeStats getSerDeStats() {
		return null;
	}

	/**
	 * JSON is just a textual representation, so our serialized class is just
	 * Text.
	 */
	@Override
	public Class<? extends Writable> getSerializedClass() {
		return Text.class;
	}

	/**
	 * This method takes an object representing a row of data from Hive, and
	 * uses the ObjectInspector to get the data for each column and serialize
	 * it. This implementation deparses the row into an object that Jackson can
	 * easily serialize into a JSON blob.
	 */
	@Override
	public Writable serialize(Object obj, ObjectInspector oi)
			throws SerDeException {
		Object deparsedObj = deparseRow(obj, oi);
		ObjectMapper mapper = new ObjectMapper();
		try {
			// Let Jackson do the work of serializing the object
			return new Text(mapper.writeValueAsString(deparsedObj));
		} catch (Exception e) {
			throw new SerDeException(e);
		}
	}

	/**
	 * Deparse a Hive object into a Jackson-serializable object. This uses the
	 * ObjectInspector to extract the column data.
	 * 
	 * @param obj
	 *            - Hive object to deparse
	 * @param oi
	 *            - ObjectInspector for the object
	 * @return - A deparsed object
	 */
	private Object deparseObject(Object obj, ObjectInspector oi) {
		switch (oi.getCategory()) {
		case LIST:
			return deparseList(obj, (ListObjectInspector) oi);
		case MAP:
			return deparseMap(obj, (MapObjectInspector) oi);
		case PRIMITIVE:
			return deparsePrimitive(obj, (PrimitiveObjectInspector) oi);
		case STRUCT:
			return deparseStruct(obj, (StructObjectInspector) oi, false);
		case UNION:
			// Unsupported by JSON
		default:
			return null;
		}
	}

	/**
	 * Deparses a row of data. We have to treat this one differently from other
	 * structs, because the field names for the root object do not match the
	 * column names for the Hive table.
	 * 
	 * @param obj
	 *            - Object representing the top-level row
	 * @param structOI
	 *            - ObjectInspector for the row
	 * @return - A deparsed row of data
	 */
	private Object deparseRow(Object obj, ObjectInspector structOI) {
		return deparseStruct(obj, (StructObjectInspector) structOI, true);
	}

	/**
	 * Deparses struct data into a serializable JSON object.
	 * 
	 * @param obj
	 *            - Hive struct data
	 * @param structOI
	 *            - ObjectInspector for the struct
	 * @param isRow
	 *            - Whether or not this struct represents a top-level row
	 * @return - A deparsed struct
	 */
	private Object deparseStruct(Object obj, StructObjectInspector structOI,
			boolean isRow) {
		Map<Object, Object> struct = new HashMap<Object, Object>();
		List<? extends StructField> fields = structOI.getAllStructFieldRefs();
		for (int i = 0; i < fields.size(); i++) {
			StructField field = fields.get(i);
			// The top-level row object is treated slightly differently from other
			// structs, because the field names for the row do not correctly reflect
			// the Hive column names. For lower-level structs, we can get the field
			// name from the associated StructField object.
			String fieldName = isRow ? colNames.get(i) : field.getFieldName();
			ObjectInspector fieldOI = field.getFieldObjectInspector();
			Object fieldObj = structOI.getStructFieldData(obj, field);
			struct.put(fieldName, deparseObject(fieldObj, fieldOI));
		}
		return struct;
	}

	/**
	 * Deparses a primitive type.
	 * 
	 * @param obj
	 *            - Hive object to deparse
	 * @param oi
	 *            - ObjectInspector for the object
	 * @return - A deparsed object
	 */
	private Object deparsePrimitive(Object obj, PrimitiveObjectInspector primOI) {
		return primOI.getPrimitiveJavaObject(obj);
	}

	private Object deparseMap(Object obj, MapObjectInspector mapOI) {
		Map<Object, Object> map = new HashMap<Object, Object>();
		ObjectInspector mapValOI = mapOI.getMapValueObjectInspector();
		Map<?, ?> fields = mapOI.getMap(obj);
		for (Map.Entry<?, ?> field : fields.entrySet()) {
			Object fieldName = field.getKey();
			Object fieldObj = field.getValue();
			map.put(fieldName, deparseObject(fieldObj, mapValOI));
		}
		return map;
	}

	/**
	 * Deparses a list and its elements.
	 * 
	 * @param obj
	 *            - Hive object to deparse
	 * @param oi
	 *            - ObjectInspector for the object
	 * @return - A deparsed object
	 */
	private Object deparseList(Object obj, ListObjectInspector listOI) {
		List<Object> list = new ArrayList<Object>();
		List<?> field = listOI.getList(obj);
		ObjectInspector elemOI = listOI.getListElementObjectInspector();
		for (Object elem : field) {
			list.add(deparseObject(elem, elemOI));
		}
		return list;
	}
}

I made a small modification: I added a parameter, input.invalid.ignore, with a corresponding field:

// How to handle input lines that are not valid JSON.
private boolean ignoreInvalidInput;

Originally, deserialize simply threw a SerDeException whenever the incoming string was not valid JSON. I added a parameter to control whether the exception is thrown, and I initialize the field in the initialize method (default: false):

// Whether to ignore strings that cannot be parsed as JSON. Defaults to false (throw an exception); set to true to skip such lines.
ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(
"input.invalid.ignore", "false"));

Now package this class into a JAR, JSONSerDe.jar, and put it in the auxlib directory under HIVE_HOME (mine is /etc/hive/auxlib). Then edit hive-env.sh and add HIVE_AUX_JARS_PATH=/etc/hive/auxlib/JSONSerDe.jar, so the jar is put on the classpath every time the Hive client starts; otherwise Hive will report that the class cannot be found when you specify the SerDe.
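
If you only want to try the SerDe out in a single session, a per-session alternative (a sketch, assuming the jar sits at the same local path) is to add the jar from the Hive CLI:

add jar /etc/hive/auxlib/JSONSerDe.jar;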

Now create a table in Hive to hold the log data:

create table test(
requestTime BIGINT,
requestParams STRUCT<timestamp:BIGINT,phone:STRING,cardName:STRING,provinceCode:STRING,cityCode:STRING>,	
requestUrl STRING)
 row format serde "com.besttone.hive.serde.JSONSerDe" 
 WITH SERDEPROPERTIES(
 "input.invalid.ignore"="true",
 "requestTime"="$.requestTime",
 "requestParams.timestamp"="$.requestParams.timestamp",
 "requestParams.phone"="$.requestParams.phone",
 "requestParams.cardName"="$.requestParams.cardName",
 "requestParams.provinceCode"="$.requestParams.provinceCode",
 "requestParams.cityCode"="$.requestParams.cityCode",
 "requestUrl"="$.requestUrl");

The table structure follows the log format. Recall the log data shown earlier:

{"requestTime":1405651379758,"requestParams":{"timestamp":1405651377211,"phone":"02038824941","cardName":"測試商家名稱","provinceCode":"440000","cityCode":"440106"},"requestUrl":"/reporter-api/reporter/reporter12/init.do"}

I use a STRUCT type to store the requestParams value, and the row format uses our custom JSON SerDe, com.besttone.hive.serde.JSONSerDe. In SERDEPROPERTIES, besides the JSON field mappings, I set the custom parameter "input.invalid.ignore"="true" to ignore every input line that is not valid JSON. Strictly speaking the lines are not dropped: every output column of an invalid line simply becomes NULL. To exclude them from a result set you have to write something like: select * from test where requestUrl is not null;
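
With that in mind, the original requirement — counting only the calls that actually parsed — becomes something like:

select count(*) from test where requestUrl is not null;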

OK, the table is ready; all that is missing is data. Start WriteLog from the flumedemo project to write some log data into the test table's directory, then open the Hive client and run select * from test; — every field is parsed correctly. Done.
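
If you also want the call volume per interface rather than just the overall total (a small extension of the original requirement, using the same NULL filter as above), a query along these lines works:

select requestUrl, count(*) as calls
from test
where requestUrl is not null
group by requestUrl;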

flume.conf is as follows:

tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1

tier1.sources.source1.type=avro
tier1.sources.source1.bind=0.0.0.0
tier1.sources.source1.port=44444
tier1.sources.source1.channels=channel1

tier1.sources.source1.interceptors=i1 i2
tier1.sources.source1.interceptors.i1.type=regex_filter
tier1.sources.source1.interceptors.i1.regex=\\{.*\\}
tier1.sources.source1.interceptors.i2.type=timestamp

tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30

tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60

besttone.db is a database I created in Hive; if you know Hive, this part should pose no problem.
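
For completeness, that database was created beforehand with an ordinary statement along these lines (a sketch):

create database if not exists besttone;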

OK, with this article the whole pipeline is covered: log4j producing the logs, Flume collecting them, and Hive analyzing them offline.