
Data Interchange Between Hive and Elasticsearch

1. Environment:

Hadoop cluster: hadoop-2.6.0, 3-node cluster

HBase cluster: hbase-1.1.2, 3-node cluster

Hive: hive-1.2.1, single test node

Elasticsearch: elasticsearch-1.7.1, single test node

2. Download the plugin that bridges Hive and ES.

Note: if your Elasticsearch version is 2.1.0, you must use elasticsearch-hadoop-2.2.0; for ES versions below 2.1.0, elasticsearch-hadoop-2.1.2 works. Since this experiment runs elasticsearch-1.7.1, I chose elasticsearch-hadoop-2.1.2.

3. Register elasticsearch-hadoop-2.1.2 in Hive (alternatively, add the jar to hive.aux.jars.path so it is loaded for every session):

hive> add jar file:///home/hadoop/xuguokun/elasticsearch-hadoop-2.1.2.jar;
4. Create the index on the ES side. The following Java program recursively reads the local test files and indexes each CSV line as a document into hive2es/info:
package com.test.file2es;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ReadFile2Es1 {

	public static void main(String[] args) {

		String filePath = "E:\\大資料相關資料\\TestData\\ESData"; // replace with the root directory of the test data on server 205
		
		@SuppressWarnings("resource")
		Client client = new TransportClient().addTransportAddresses(new InetSocketTransportAddress("192.168.174.130", 9300));
		
		try{
			
			bianli(filePath, client);
			System.out.println("Traversal finished");
			
		}catch(Exception e){
		
			e.printStackTrace();
			System.out.println("error");
		}		
	}

	// Recursively walk the directory tree, indexing every non-empty line.
	public static void bianli(String path, Client client) {

		File file = new File(path);
		
		if (file.exists()) {
			
			File[] files = file.listFiles();
			
			if (files.length == 0) {
				
				System.out.println("資料夾是空的!");
				return;
			
			} else {
				
				for (File file2 : files) {
					
					if (file2.isDirectory()) {
						
						System.out.println("資料夾:" + file2.getAbsolutePath());
						bianli(file2.getAbsolutePath(),client);
					
					} else {
						
						FileReader fr;
						
						try {
							
							fr = new FileReader(file2.getAbsolutePath());
							BufferedReader br = new BufferedReader(fr);
							String line = "";		
							
							IndexResponse indexResponse = null;
							
							while ((line = br.readLine()) != null) {								
								
								if (line != null && !line.equals("")) {

									String[] str = line.split(",");

									// Index one CSV line as a document in hive2es/info.
									indexResponse = client.prepareIndex("hive2es", "info")
											.setSource(jsonBuilder()
													.startObject()
													.field("area", str[0])
													.field("media_view_tags", str[1])
													.field("interest", str[2])
													.endObject())
											.execute()
											.actionGet();
								}
							}
						
							br.close();
							fr.close();
						
						} catch (FileNotFoundException e) {
						
							e.printStackTrace();
							
						} catch (IOException e) {
						
							e.printStackTrace();			
						}
						//System.out.println("File: " + file2.getAbsolutePath());
					}
				}
			}
		} else {
			System.out.println("檔案不存在!");
		}
	}
	
	// Helper for 38-field records: strips the surrounding quote characters from
	// each field and rejoins the fields with commas (not used by the main flow above).
	public static String getRealData(String[] str) {

		if (str.length == 38) {

			String realData = "";

			for (int i = 0; i < 38; i++) {

				// Drop the first and last character of the field; map empty
				// or "null" fields to the literal string "null".
				if ((str[i].substring(1, str[i].length() - 1).equals("null"))
						|| (str[i].substring(1, str[i].length() - 1)).equals("")) {

					realData = realData + "null";

				} else {

					realData = realData + str[i].substring(1, str[i].length() - 1);
				}

				if (i != 37) {

					realData = realData + ",";
				}
			}

			return realData;

		} else {

			return "invalid format";
		}
	}
}

Note: the local test data read in this step looks like this:

beijing,diannaokeji,kejiwang
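Before wiring up Hive, it is worth confirming that the document actually landed in ES. The following is a minimal sketch, assuming the same node address as above (192.168.174.130:9300) and the hive2es/info index written by the program in step 4; the class name VerifyEsIndex is ours. It runs a match-all search and prints every hit:

package com.test.file2es;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class VerifyEsIndex {

	public static void main(String[] args) {

		// Same ES 1.x transport client setup as in ReadFile2Es1.
		Client client = new TransportClient()
				.addTransportAddresses(new InetSocketTransportAddress("192.168.174.130", 9300));

		// Match-all search over the index/type written in step 4.
		SearchResponse response = client.prepareSearch("hive2es")
				.setTypes("info")
				.setQuery(QueryBuilders.matchAllQuery())
				.execute()
				.actionGet();

		for (SearchHit hit : response.getHits().getHits()) {
			System.out.println(hit.getId() + " => " + hit.getSourceAsString());
		}

		client.close();
	}
}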

5. Create the table hive_es on the Hive side:

CREATE EXTERNAL TABLE hive_es (
  cookieid string,
  area string,
  media_view_tags string,
  interest string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
  'es.nodes' = '192.168.174.130:9200',
  'es.index.auto.create' = 'false',
  'es.resource' = 'hive2es/info',
  'es.read.metadata' = 'true',
  'es.mapping.names' = 'cookieid:_metadata._id, area:area, media_view_tags:media_view_tags, interest:interest'
);

Here 'es.resource' points at the hive2es/info index from step 4, 'es.read.metadata' exposes the document metadata on reads, and 'es.mapping.names' maps the Hive column cookieid onto the document id (_metadata._id).
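Because 'es.index.auto.create' is set to 'false', the hive2es index must already exist before Hive touches it; in this walkthrough the program from step 4 created it implicitly when it indexed the first document (ES creates missing indices on first write by default). If you would rather create it explicitly with a mapping, a sketch with the same 1.x client could look like this (the class name and the mapping body are assumptions matching the three fields used above):

package com.test.file2es;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class CreateEsIndex {

	public static void main(String[] args) throws Exception {

		Client client = new TransportClient()
				.addTransportAddresses(new InetSocketTransportAddress("192.168.174.130", 9300));

		// Explicitly create the hive2es index with a mapping for type "info".
		// "string" is the ES 1.x field type.
		client.admin().indices().prepareCreate("hive2es")
				.addMapping("info", jsonBuilder()
						.startObject()
							.startObject("info")
								.startObject("properties")
									.startObject("area").field("type", "string").endObject()
									.startObject("media_view_tags").field("type", "string").endObject()
									.startObject("interest").field("type", "string").endObject()
								.endObject()
							.endObject()
						.endObject())
				.execute()
				.actionGet();

		client.close();
	}
}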
6. Query the data on the Hive side:
hive> select * from hive_es;
OK
AVOwmuAVAOB0VDYE1GoM	beijing	diannaokeji	kejiwang
Time taken: 0.126 seconds, Fetched: 1 row(s)
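The same query can also be issued programmatically over Hive JDBC. A minimal sketch, assuming HiveServer2 is running on the Hive node (the host, port 10000, database name, and credentials below are placeholders for your own setup, and hive-jdbc must be on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class QueryHiveEs {

	public static void main(String[] args) throws Exception {

		// HiveServer2 JDBC driver.
		Class.forName("org.apache.hive.jdbc.HiveDriver");

		Connection conn = DriverManager.getConnection(
				"jdbc:hive2://192.168.174.130:10000/mydatabase", "hadoop", "");
		Statement stmt = conn.createStatement();
		ResultSet rs = stmt.executeQuery("select * from hive_es");

		// Print the four columns of each row, tab-separated.
		while (rs.next()) {
			System.out.println(rs.getString(1) + "\t" + rs.getString(2)
					+ "\t" + rs.getString(3) + "\t" + rs.getString(4));
		}

		rs.close();
		stmt.close();
		conn.close();
	}
}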
7. The steps above covered Hive reading data from ES; next, we write data into ES from the Hive side.

The test data for this step is as follows:

1,shanghai,diannaokeji,kejiwang

8. Create a local staging table in Hive:

CREATE EXTERNAL TABLE hive_es_native (
  cookieid string,
  area string,
  media_view_tags string,
  interest string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
9. Load the test data into the local table hive_es_native:
hive> load data local inpath '/home/hadoop/xuguokun/test.txt' overwrite into table hive_es_native;
Loading data to table mydatabase.hive_es_native
Table mydatabase.hive_es_native stats: [numFiles=1, numRows=0, totalSize=32, rawDataSize=0]
OK
Time taken: 0.51 seconds
10. Write the data into ES through Hive, then check the final contents of hive_es:
hive> insert overwrite table hive_es select * from hive_es_native;
Query ID = hadoop_20160325192049_caab7883-e8ee-4fc7-9d01-cf34d2ee6473
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1458955914553_0002, Tracking URL = http://Master:8088/proxy/application_1458955914553_0002/
Kill Command = /usr/local/hadoop/bin/hadoop job  -kill job_1458955914553_0002
Hadoop job information for Stage-0: number of mappers: 1; number of reducers: 0
2016-03-25 19:21:09,038 Stage-0 map = 0%,  reduce = 0%
2016-03-25 19:21:22,042 Stage-0 map = 100%,  reduce = 0%, Cumulative CPU 2.63 sec
MapReduce Total cumulative CPU time: 2 seconds 630 msec
Ended Job = job_1458955914553_0002
MapReduce Jobs Launched: 
Stage-Stage-0: Map: 1   Cumulative CPU: 2.63 sec   HDFS Read: 4160 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 630 msec
OK
Time taken: 36.247 seconds
hive> select * from hive_es;
OK
AVOwucV0AOB0VDYE1GoX	shanghai	diannaokeji	kejiwang
AVOwmuAVAOB0VDYE1GoM	beijing	diannaokeji	kejiwang
Time taken: 0.126 seconds, Fetched: 2 row(s)
Note that ES generated a new document id (AVOwucV0AOB0VDYE1GoX) for the inserted row: the _metadata._id mapping via 'es.read.metadata' only applies on reads, so the cookieid value "1" from the source file is not used as the document id on writes.

11. This concludes the experiment.