1. 程式人生 > >hadoop生態系統學習之路(六)hive的簡單使用

hadoop生態系統學習之路(六)hive的簡單使用

一、hive的基本概念與原理

Hive是基於Hadoop之上的資料倉庫,可以儲存、查詢和分析儲存在 Hadoop 中的大規模資料。Hive 定義了簡單的類 SQL 查詢語言,稱為 HQL,它允許熟悉 SQL 的使用者查詢資料,允許熟悉 MapReduce 開發者的開發自定義的 mapper 和 reducer 來處理內建的 mapper 和 reducer 無法完成的複雜的分析工作。Hive 沒有專門的資料格式。
hive的訪問方式:
這裡寫圖片描述
hive的執行原理:
這裡寫圖片描述

二、hive的常用命令

連線進入hive:hive
刪除資料庫:drop database if exists qyk_test cascade;如下圖:
這裡寫圖片描述


然後,我們使用create database qyk_test;建立一個qyk_test的資料庫,如下:
這裡寫圖片描述
接下來,我們執行create table user_info(id bigint, account string, name string, age int) row format delimited fields terminated by ‘\t’;建立一張表,如下:
這裡寫圖片描述
我們可以執行describe user_info;查看錶結構,如下:
這裡寫圖片描述
然後,我們使用create table user_info_tmp like user_info;建立一個和user_info一樣結構的臨時表,如下:
這裡寫圖片描述

然後我們準備一個檔案user_info.txt,以製表符分隔,如下

11  1200.0  qyk1    21
22  1301    qyk2    22
33  1400.0  qyk3    23
44  1500.0  qyk4    24
55  1210.0  qyk5    25
66  124 qyk6    26
77  1233    qyk7    27
88  15011   qyk8    28

接下來執行load data local inpath ‘/tmp/user_info.txt’ into table user_info;可看到如下:
這裡寫圖片描述
然後執行select * from user_info;可看到:
這裡寫圖片描述


然後,我們執行insert into table user_info_tmp select id, account, name, age from user_info;可以看到:
這裡寫圖片描述
這裡,hive將此語句的執行轉為MR,最後將資料入到user_info_tmp。
然後,我們執行select count(*) from user_info_tmp;可看到:
這裡寫圖片描述
同樣的是將sql轉為mr執行。
最後,執行insert overwrite table user_info select * from user_info where 1=0;清空表資料。
執行drop table user_info_tmp;便可刪除表,如下:
這裡寫圖片描述
好了,基本命令就講到這兒,關於外部表、分割槽、桶以及儲存格式相關的概念大家也可以去研究下。

三、編寫MR將資料直接入到hive

此MR只有Mapper,沒有reducer。直接在mapper輸出到hive表。
pom需新增依賴:

<!-- hcatalog相關jar -->
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-hcatalog-core</artifactId>
          <version>${hive.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-hcatalog-hbase-storage-handler</artifactId>
          <version>${hive.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-hcatalog-server-extensions</artifactId>
          <version>${hive.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-hcatalog-pig-adapter</artifactId>
          <version>${hive.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-webhcat-java-client</artifactId>
          <version>${hive.version}</version>
      </dependency>

Mapper類:

/**
 * Project Name:mr-demo
 * File Name:HiveStoreMapper.java
 * Package Name:org.qiyongkang.mr.hivestore
 * Date:2016年4月4日下午10:02:07
 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
 *
*/

package org.qiyongkang.mr.hivestore;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;

/**
 * ClassName:HiveStoreMapper <br/>
 * Function: Mapper類. <br/>
 * Date:     2016年4月4日 下午10:02:07 <br/>
 * @author   qiyongkang
 * @version  
 * @since    JDK 1.6
 * @see      
 */
public class HiveStoreMapper extends Mapper<Object, Text, WritableComparable<Object>, HCatRecord> {
    private HCatSchema schema = null;

    //每個mapper例項,執行一次
    @Override
    protected void setup(Mapper<Object, Text, WritableComparable<Object>, HCatRecord>.Context context)
            throws IOException, InterruptedException {
        schema = HCatOutputFormat.getTableSchema(context.getConfiguration());
    }

    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, WritableComparable<Object>, HCatRecord>.Context context)
            throws IOException, InterruptedException {
        //每行以製表符分隔   id, account, name, age
        String[] strs = value.toString().split("\t");

        HCatRecord record = new DefaultHCatRecord(4);
        //id,通過列下表
        record.set(0, Long.valueOf(strs[0]));

        //account
        record.set(1, strs[1]);

        //name
        record.set(2, strs[2]);

        //age,通過欄位名稱
        record.set("age", schema, Integer.valueOf(strs[3]));

        //寫入到hive
        context.write(null, record);
    }


    public static void main(String[] args) {
        String value = "1   1200    qyk 24";
        String[] strs = value.toString().split("\t");
        for (int i = 0; i < strs.length; i++) {
            System.out.println(strs[i]);
        }
    }
}

主類:

/**
 * Project Name:mr-demo
 * File Name:LoadDataToHiveMR.java
 * Package Name:org.qiyongkang.mr.hivestore
 * Date:2016年4月4日下午9:55:42
 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
 *
*/

package org.qiyongkang.mr.hivestore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

/**
 * ClassName:LoadDataToHiveMR <br/>
 * Function: MR將資料直接入到hive. <br/>
 * Date: 2016年4月4日 下午9:55:42 <br/>
 * 
 * @author qiyongkang
 * @version
 * @since JDK 1.6
 * @see
 */
public class LoadDataToHiveMR {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "hive store");
            job.setJarByClass(LoadDataToHiveMR.class);

            // 設定Mapper
            job.setMapperClass(HiveStoreMapper.class);

            // 由於沒有reducer,這裡設定為0
            job.setNumReduceTasks(0);

            // 設定輸入檔案路徑
            FileInputFormat.addInputPath(job, new Path("/qiyongkang/input"));

            // 指定Mapper的輸出
            job.setMapOutputKeyClass(WritableComparable.class); // map
            job.setMapOutputValueClass(DefaultHCatRecord.class);// map

            //設定要入到hive的資料庫和表
            HCatOutputFormat.setOutput(job, OutputJobInfo.create("qyk_test", "user_info", null));
            //這裡注意是使用job.getConfiguration(),不能直接使用conf
            HCatSchema hCatSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
            HCatOutputFormat.setSchema(job, hCatSchema);

            //設定輸出格式類
            job.setOutputFormatClass(HCatOutputFormat.class);

            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

然後,我們使用maven打個包,上傳到伺服器。
然後,我們準備一個user_info.txt,上傳至hdfs中的/qiyongkang/input下:

11  1200.0  qyk1    21
22  1301    qyk2    22
33  1400.0  qyk3    23
44  1500.0  qyk4    24
55  1210.0  qyk5    25
66  124 qyk6    26
77  1233    qyk7    27
88  15011   qyk8    28

注意以製表符\t分隔。
然後執行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,在jobhistory可以看到:
這裡寫圖片描述
其實,hive的元資料是放在hdfs上,執行hadoop fs -ls /user/hive/warehouse可以看到:
這裡寫圖片描述
然後,我們在hive命令列執行 select * from user_info;可以看到:
這裡寫圖片描述
說明資料從hdfs寫入到hive成功。

四、使用java jdbc連線Thrift Server查詢元資料

接下來,我們使用java編寫一個客戶端,來查詢剛才入到hive裡面的資料,程式碼如下:

package org.hive.demo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

public class HiveStoreClient {
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://172.31.25.8:10000/qyk_test";
    private static String user = "hive";
    private static String password = "";
    private static final Logger log = Logger.getLogger(HiveStoreClient.class);

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        ResultSet res = null;
        try {
            //載入驅動
            Class.forName(driverName);
            //獲取連線
            conn = DriverManager.getConnection(url, user, password);
            stmt = conn.createStatement();

            // select * query
            String sql = "select * from user_info";
            System.out.println("Running: " + sql);

            //執行查詢
            res = stmt.executeQuery(sql);

            //處理結果集
            List list = convertList(res);
            System.out.println("總記錄:" + list);

            //獲取總個數
            sql = "select count(1) from user_info";
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
              System.out.println("總個數:" + res.getString(1));
            }

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            log.error(driverName + " not found!", e);
            System.exit(1);
        } catch (SQLException e) {
            e.printStackTrace();
            log.error("Connection error!", e);
            System.exit(1);
        } finally {
            try {
                res.close();
                stmt.close();
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * 
     * convertList:將結果集轉換成map. <br/>
     *
     * @author qiyongkang
     * @param rs
     * @return
     * @throws SQLException
     * @since JDK 1.6
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static List convertList(ResultSet rs) throws SQLException {  
        List list = new ArrayList();  
        ResultSetMetaData md = rs.getMetaData();  
        int columnCount = md.getColumnCount(); //Map rowData; 


        while (rs.next()) { //rowData = new HashMap(columnCount);    
            Map rowData = new HashMap();  
            for (int i = 1; i <= columnCount; i++) {  
                rowData.put(md.getColumnName(i), rs.getObject(i));  
            }  
            list.add(rowData);  
        } 
        return list;  
    }
}

執行後,可以看到控制檯輸出如下:
這裡寫圖片描述
開始的異常可以忽略。可以看到資料,說明是成功的。

好了,hive就講到這兒了。其實,hive還可以同步hbase的資料,還可以將hive的表資料同步到impala,因為它們都是使用相同的元資料,這個在後面的博文中再進行介紹。