Hive 與 ElasticSearch 的資料互動
本文將詳細介紹利用 ES 與 Hive 直接的資料互動;通過 Hive 外部表的方式,可以快速將 ES 索引資料對映到 Hive 中,使用易於上手的 Hive SQL 實現對資料的進一步加工。
一、開發環境
1、元件版本
- CDH 叢集版本:6.0.1
- ES 版本:6.5.1
- Hive 版本:2.1.1
- ES-Hadoop 版本:6.5.1
2、Hive 簡介
Hive 在 Hadoop 生態系統中扮演著資料倉庫的角色,藉助 Hive 可以方便地進行資料彙總、即席查詢以及分析儲存在 Hadoop 檔案系統中的大型資料集。
Hive 通過類 SQL 語言(HSQL)對 Hadoop 上的資料進行抽象,這樣使用者可以通過 SQL 語句對資料進行定義、組織、操作和分析;在 Hive 中,資料集是通過表(定義了資料型別相關資訊)進行定義的,使用者可以通過內建運算子或使用者自定義函式(UDF)對資料進行載入、查詢和轉換。
3、Hive 安裝 ES-Hadoop
官方推薦的安裝方式:
使用 add jar
add jar /path/elasticsearch-hadoop.jar 複製程式碼
使用 hive.aux.jars.path
$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar 複製程式碼
修改配置( hive-site.xml
)
<property> <name>hive.aux.jars.path</name> <value>/path/elasticsearch-hadoop.jar</value> <description>A comma separated list (with no spaces) of the jar files</description> </property> 複製程式碼
CDH6.X 推薦的安裝方法
將 elasticsearch-hadoop.jar
複製到 Hive 的 auxlib 目錄中,然後重啟 Hive 即可。
cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/ 複製程式碼
二、Hive 與 ElasticSearch 的資料互動
1、資料型別對照表
請務必注意,ES 中的型別是 index/_mapping
中對應的資料型別,非 _source
裡面資料的型別。
Hive type | Elasticsearch type |
---|---|
void | null |
boolean | boolean |
tinyint | byte |
smallint | short |
int | int |
bigint | long |
double | double |
float | float |
string | string |
binary | binary |
timestamp | date |
struct | map |
map | map |
array | array |
union | not supported (yet) |
decimal | string |
date | date |
varchar | string |
char | string |
2、建立 Hive 外部表
CREATE EXTERNAL TABLE default.surface( water_type STRING, water_level STRING, monitor_time TIMESTAMP, sitecode STRING, p492 DOUBLE, p311 DOUBLE, status STRING ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES( 'es.resource'='ods_data_day_surface*/doc', 'es.query'='?q=status:001' 'es.nodes'='sky-01','es.port'='9200', 'es.net.http.auth.user'='sky', 'es.net.http.auth.pass'='jointsky', 'es.date.format'='yyyy-MM-dd HH:mm:ss', 'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader' 'es.mapping.names'='waterType:water_type,monitortime:monitor_time' ); 複製程式碼
3、配置項說明
es.resource
es.resource
用於設定 ES 資源的位置,預設該配置項同時設定了讀和寫的索引,當然也可以分別設定讀寫索引名稱:
es.resource.read es.resource.write
es.query
es.query
設定查詢過濾條件,目前支援 uri query
、 query dsl
、 external resource
三種設定方式。
# uri (or parameter) query es.query = ?q=costinl # query dsl es.query = { "query" : { "term" : { "user" : "costinl" } } } # external resource es.query = org/mypackage/myquery.json 複製程式碼
es.mapping.names
es.mapping.names
用於設定 Hive 與 ES 的欄位對映關係,如果不設定,則預設欄位名不發生變化(即為 data type 區域定義的欄位名);此外該部分還用於定義 Hive 到 ES 的資料對映型別。
'es.mapping.names' = 'date:@timestamp , url:url_123 ') 複製程式碼
其他通用欄位的說明請參考文章: 使用 ES-Hadoop 將 Spark Streaming 流資料寫入 ES
4、自定義日期型別解析
目前將 ES 的 date 型別對映到 Hive 的 TIMESTAMP 型別時,ES-Hadoop 元件只能識別時間戳格式或者標準的 XSD 格式的日期字串:
@Override protected Object parseDate(Long value, boolean richDate) { return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value)); } @Override protected Object parseDate(String value, boolean richDate) { return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value)); } 複製程式碼
關於 XSD(XML Schema Date/Time Datatypes)可用參考文章: www.w3schools.com/xml/schema_…
為了相容自定義的日期格式,需要編寫自定義的日期讀取類:
import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.elasticsearch.hadoop.cfg.Settings; import org.elasticsearch.hadoop.hive.HiveValueReader; import java.sql.Timestamp; import java.text.ParseException; import java.text.ParsePosition; import java.text.SimpleDateFormat; import java.util.Date; public class EsValueReader extends HiveValueReader { private String dateFormat; private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm"; private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH"; private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd"; @Override public void setSettings(Settings settings) { super.setSettings(settings); dateFormat = settings.getProperty("es.date.format"); } @Override protected Object parseDate(String value, boolean richDate) { if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) { if (richDate){ if (value.length() == 16){ return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime())); } if (value.length() == 13){ return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime())); } if (value.length() == 10){ return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime())); } return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime())); } return parseString(value); } return super.parseDate(value, richDate); } /** * 解析日期,根據指定的格式進行解析.<br> * 如果解析錯誤,則返回null * @param stringDate 日期字串 * @param format 日期格式 * @return 日期型別 */ private static Date parseDate(String stringDate, String format) { if (stringDate == null) { return null; } try { return parseDate(stringDate, new String[] { format }); } catch (ParseException e) { return null; } } public static Date parseDate(String str, String... parsePatterns) throws ParseException { return parseDateWithLeniency(str, parsePatterns, true); } private static Date parseDateWithLeniency( String str, String[] parsePatterns, boolean lenient) throws ParseException { if (str == null || parsePatterns == null) { throw new IllegalArgumentException("Date and Patterns must not be null"); } SimpleDateFormat parser = new SimpleDateFormat(); parser.setLenient(lenient); ParsePosition pos = new ParsePosition(0); for (String parsePattern : parsePatterns) { String pattern = parsePattern; if (parsePattern.endsWith("ZZ")) { pattern = pattern.substring(0, pattern.length() - 1); } parser.applyPattern(pattern); pos.setIndex(0); String str2 = str; if (parsePattern.endsWith("ZZ")) { str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2"); } Date date = parser.parse(str2, pos); if (date != null && pos.getIndex() == str2.length()) { return date; } } throw new ParseException("Unable to parse the date: " + str, -1); } } 複製程式碼
上述程式碼的 Maven 依賴
<dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>6.5.4</version> <scope>provided</scope> </dependency> </dependencies> 複製程式碼
自定義日期解析包的部署
程式碼編寫完成後,將程式碼進行打包,然後將打包好的 jar 包放置到 Hive 的 auxlib 目錄中,然後重啟 Hive 即可;該步驟與 ES-Hadoop 的安裝步驟一樣。
在編寫 Spark 程式從 Hive 中讀取資料的時候,需要新增對該包的依賴以及對 ES-Hadoop 的依賴。
三、總結
經過上述的步驟,Hive 與 ES 的對映已經不成問題,如果想從 ES 中匯出資料,可用藉助 HSQL insert into table XXX select * from XXXXX;
的方式從 ES 中讀取資料寫入到 HDFS;當然通過更為複雜的 HSQL 可以將資料進行處理,並將資料重新寫入到 ES 或者儲存到 HDFS。
充分利用 ES 的查詢、過濾和聚合,可以很好的去服務資料標準化、資料清洗、資料分佈情況等 ETL 流程。
Any Code,Code Any!
掃碼關注『AnyCode』,程式設計路上,一起前行。
