1. 程式人生 > >10.大資料學習之旅——hive2

10.大資料學習之旅——hive2

Hive解決資料傾斜問題


概述

什麼是資料傾斜以及資料傾斜是怎麼產生的?
簡單來說資料傾斜就是資料的key 的分化嚴重不均,造成一部分資料很多,一部分資料很少的局面。
舉個 word count 的入門例子,它的map 階段就是形成 (“aaa”,1)的形式,然後在reduce 階段進
行 value 相加,得出 “aaa” 出現的次數。若進行 word count 的文字有100G,其中 80G 全部是 “aaa” 剩
下 20G 是其餘單詞,那就會形成 80G 的資料量交給一個 reduce 進行相加,其餘 20G 根據 key 不同分散到不
同 reduce 進行相加的情況。如此就造成了資料傾斜,臨床反應就是 reduce 跑到 99%然後一直在原地等著 那
80G 的reduce 跑完。

如此一來 80G 的 aaa 將發往同一個 reducer ,由此就可以知道 reduce 最後 1% 的工作在等什麼了。

為什麼說資料傾斜與業務邏輯和資料量有關?

從另外角度看資料傾斜,其本質還是在單臺節點在執行那一部分資料reduce任務的時候,由於資料量大,跑不
動,造成任務卡住。若是這臺節點機器記憶體夠大,CPU、網路等資源充足,跑 80G 左右的資料量和跑10M 數
據量所耗時間不是很大差距,那麼也就不存在問題,傾斜就傾斜吧,反正機器跑的動。所以機器配置和資料量
存在一個合理的比例,一旦資料量遠超機器的極限,那麼不管每個key的資料如何分佈,總會有一個key的資料
量超出機器的能力,造成 reduce 緩慢甚至卡頓。

業務邏輯造成的資料傾斜會多很多,日常使用過程中,容易造成資料傾斜的原因可以歸納為幾點:
1)group by
2)distinct count(distinct xx)
3)join

如何處理group by的資料傾斜問題

1、調優引數
set hive.groupby.skewindata=true;
hive.groupby.skewindata=true:資料傾斜時負載均衡,當選項設定為true,生成的查詢計劃會有兩個
MRJob。第一個MRJob 中,Map的輸出結果集合會隨機分佈到Reduce中,每個Reduce做部分聚合操作,並
輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不同的Reduce中,從而達到負載均衡的目
的;第二個MRJob再根據預處理的資料結果按照GroupBy Key分佈到Reduce中(這個過程可以保證相同的
GroupBy Key被分佈到同一個Reduce中),最後完成最終的聚合操作。
由上面可以看出起到至關重要的作用的其實是第二個引數的設定,它使計算變成了兩個mapreduce,先在第一
箇中在 shuffle 過程 partition 時隨機給 key 打標記,使每個key 隨機均勻分佈到各個 reduce 上計算,但是這
樣只能完成部分計算,因為相同key沒有分配到相同reduce上,所以需要第二次的mapreduce,這次就回歸正
常 shuffle,但是資料分佈不均勻的問題在第一次mapreduce已經有了很大的改善,因此基本解決資料傾斜。

Hive優化

1)map side join
mapJoin的主要意思就是,當連結的兩個表是一個比較小的表和一個特別大的表的時
候,我們把比較小的table直接放到記憶體中去,然後再對比較大的表格進行map操
作。join就發生在map操作的時候,每當掃描一個大的table中的資料,就要去去檢視
小表的資料,哪條與之相符,繼而進行連線。這裡的join並不會涉及reduce操
作。map端join的優勢就是在於沒有shuffle,在實際的應用中,我們這樣設定:
set hive.auto.convert.join=true;
此外,hive有一個引數:hive.mapjoin.smalltable.filesize,預設值是25mb(其中一
個表大小小於25mb時,自動啟用mapjoin)
要求:在hive做join時,要求小表在前(左)
2)join語句優化
優化前
select m.cid,u.id form order m join customer u on m.cid=u.id where
m.dt=’20160801’;
優化後
select m.cid,u.id from (select cid from order where dt=’20160801’)m
join customer u on m.cid = u.id
注意:Hive在做join時,小表寫在前(左邊)。
3)group by 優化
hive.groupby.skewindata=true
如果group by過程出現傾斜,應該設定為true
4)count distinct 優化
優化前

select count(distinct id )from tablename

注意:count操作是全域性計數,在底層轉換成MRjob時,用於計數的分割槽(reduceTask)
只能有一個。

優化後

select count(*) from (select distinct id from tablename)tmp;

此外,再設定一下reduce的任務數量。
注意:count這種全域性計數的操作,Hive只會用一個Reduce來實現

日常統計場景中,我們經常會對一段時期內的欄位進行消重並統計數量,SQL語句類似

SELECT COUNT( DISTINCT id ) FROM TABLE_NAME WHERE …;
這條語句是從一個表的符合WHERE條件的記錄中統計不重複的id的總數。
該語句轉化為MapReduce作業後執行示意圖如下,圖中還列出了我們實驗作業中
Reduce階段的資料規模:
在這裡插入圖片描述

由於引入了DISTINCT,因此在Map階段無法利用combine對輸出結果消重,必須將id
作為Key輸出,在Reduce階段再對來自於不同Map Task、相同Key的結果進行消重,
計入最終統計值。
我們看到作業執行時的Reduce Task個數為1,對於統計大資料量時,這會導致最終
Map的全部輸出由單個的ReduceTask處理。這唯一的Reduce Task需要Shuffle大量的
資料,並且進行排序聚合等處理,這使得它成為整個作業的IO和運算瓶頸。
經過上述分析後,我們嘗試顯式地增大Reduce Task個數來提高Reduce階段的併發,
使每一個Reduce Task的資料處理量控制在2G左右。具體設定如下:
set mapred.reduce.tasks=100
調整後我們發現這一引數並沒有影響實際Reduce Task個數,Hive執行時輸出
“Number of reduce tasks determined at compile time: 1”。
原因是Hive在處理COUNT這種“全聚合(full aggregates)”計算時,它會忽略使用者指
定的Reduce Task數,而強制使用1。

所以我們只能採用變通的方法來繞過這一限制。我們利用Hive對巢狀語句的支援,將原
來一個MapReduce作業轉換為兩個作業,在第一階段選出全部的非重複id,在第二階
段再對這些已消重的id進行計數。這樣在第一階段我們可以通過增大Reduce的併發
數,併發處理Map輸出。在第二階段,由於id已經消重,因此COUNT(*)操作在Map階
段不需要輸出原id資料,只輸出一個合併後的計數即可。這樣即使第二階段Hive強制指
定一個Reduce Task,極少量的Map輸出資料也不會使單一的Reduce Task成為瓶頸。
改進後的SQL語句如下:

SELECT COUNT(*) FROM (SELECT DISTINCT id FROM TABLE_NAME WHERE)
t;

這一優化使得在同樣的執行環境下,優化後的語句執行只需要原語句20%左右的時間。
優化後的MapReduce作業流如下:
在這裡插入圖片描述

5)調整切片數(map任務數)
Hive底層自動對小檔案做了優化,用了CombineTextInputFormat,將做個小檔案切片合
成一個切片。
合成完之後的切片大小,如果>mapred.max.split.size 的大小,就會生成一個新的切
片。
mapred.max.split.size 預設是128MB
set mapred.max.split.size=134217728(128MB)
對於切片數(MapTask)數量的調整,要根據實際業務來定,比如一個100MB的檔案
假設有1千萬條資料,此時可以調成10個MapTask,則每個MapTask處理1百萬條數
據。
6)JVM重利用
set mapred.job.reuse.jvm.num.tasks=20(預設是1個)
JVM重用是hadoop調優引數的內容,對hive的效能具有非常大的影響,特別是對於很
難避免小檔案的場景或者task特別多的場景,這類場景大多數執行時間都很短。這時
JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含有成千上萬個task任
務的情況。
JVM重用可以使得一個JVM程序在同一個JOB中重新使用N次後才會銷燬。
7)啟用嚴格模式
在hive裡面可以通過嚴格模式防止使用者執行那些可能產生意想不到的不好的效果的查詢,
從而保護hive的叢集。
使用者可以通過 set hive.mapred.mode=strict 來設定嚴格模式,改成unstrict則為非嚴
格模式。
在嚴格模式下,使用者在執行如下query的時候會報錯:
①分割槽表的查詢沒有使用分割槽欄位來限制
②使用了order by 但沒有使用limit語句。(如果不使用limit,會對查詢結果進行全域性
排序,消耗時間長)
③產生了笛卡爾積
當用戶寫程式碼將表的別名寫錯的時候會引起笛卡爾積,例如

SELECT *
FROM origindb.promotion__campaign c
JOIN origindb.promotion__campaignex ce
ON c.id = c.id
limit 1000

在這裡插入圖片描述
8)關閉推測執行機制
因為在測試環境下我們都把應用程式跑通了,如果還加上推測執行,如果有一個數據分
片本來就會發生資料傾斜,執行執行時間就是比其他的時間長,那麼hive就會把這個執
行時間長的job當作執行失敗,繼而又產生一個相同的job去執行,後果可想而知。可通
過如下設定關閉推測執行:
set mapreduce.map.speculative=false
set mapreduce.reduce.speculative=false
set hive.mapred.reduce.tasks.speculative.execution=false

Hive的分桶表


如何使用分桶表

1.建立帶桶的 table :

create table teacher(name string) clustered by (name) into 3 buckets row format delimited
fields terminated by ' ';

2.開啟分桶機制:
set hive.enforce.bucketing=true;
3.往表中插入資料:

insert overwrite table teacher select * from tmp;//需要提前準備好temp,從temp查詢資料寫入到
teacher

注:teacher是一個分桶表,對於分桶表,不允許以外部檔案方式匯入資料,只能從另外一張表資料導
入。分通表只能是內部表。
temp檔案資料樣例:

java zhang
web wang
java zhao
java qin
web liu
web zheng
ios li
linux chen
ios yang
ios duan
linux ma
linux xu
java wen
web wu

作用及原理

分桶的原理是根據指定的列的計算hash值模餘分桶數量後將資料分開存放。方便資料抽樣
select * from teacher tablesample(bucket 1 out of 3 on name);
注:分桶語法—TABLESAMPLE(BUCKET x OUT OF y)
y必須是table總bucket數的倍數或者因子。hive根據y的大小,決定抽樣的比例。
例如:table總共分了3份,當y=3時,抽取(3/3=)1個bucket的資料,當y=6時,抽取(3/6=)1/2個
bucket的資料。
x表示從哪個bucket開始抽取。
例如:table總bucket數為3,tablesample(bucket 3 out of 3),表示總共抽取(3/3=)1個bucket的
資料,抽取第3個bucket的資料。
再例如:table總bucket數為32,tablesample(bucket 3 out of 16),表示總共抽取(32/16=)2個
bucket的資料,分別為第3個bucket和第(3+16=)19個bucket的資料。
查詢第一個桶裡資料,並返回一半的資料:

select * from bucketed_user tablesample(bucket 1 out of 6 on id);

Hive的mysql安裝配置


實現步驟:

  1. 刪除hdfs中的/user/hive
    執行:
hadoop fs -rmr /user/hive
  1. 將mysql驅動包上傳到hive安裝目錄的lib目錄下
  2. 編輯新的配置檔案,名字為:hive-site.xml
  3. 配置相關資訊:
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop01:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
</property>
</configuration>
  1. 進入hive ,進入bin目錄,執行:sh hive
    如果出現:
    Access denied for user ‘root’@‘hadoop01’ (using password: YES)這個錯誤,指的是當前用
    戶操作mysql資料庫的許可權不夠。
  2. 進入到mysql資料庫,進行許可權分配
    執行:
    grant all privileges on . to ‘root’@‘hadoop01’ identified by ‘root’ with grant option;

    grant all on . to ‘root’@’%’ identified by ‘root’;
    然後執行:
    flush privileges;
  3. 如果不事先在mysql裡建立hive資料庫,在進入hive時,mysql會自動建立hive資料庫。但是注意,
    因為我們之前配置過mysql的字符集為utf-8,所以這個自動建立的hive資料庫的字符集是utf-8的。
    但是hive要求儲存元資料的字符集必須是iso8859-1。如果不是的話,hive會在建立表的時候報錯
    (先是卡一會,然後報錯)。
    解決辦法:在mysql資料裡,手動建立hive資料庫,並指定字符集為iso8859-1;
    進入mysql資料庫,
    然後執行:create database hive character set latin1;
  4. 以上步驟都做完後,再次進入mysql的hive資料,發現有如下的表:
    在這裡插入圖片描述
  5. 可以通過navicat來連線資料庫。在這裡插入圖片描述
  6. 可以通過DBS 、TBLS、COLUMNS_V2這三張表來檢視元資料資訊。
    DBS 存放的資料庫的元資料資訊
    在這裡插入圖片描述

TBLS存放的tables表資訊
在這裡插入圖片描述

COLUMNS表存放的是列欄位資訊
在這裡插入圖片描述

此外,可以通過檢視SDS表來查詢HDFS裡的位置資訊
在這裡插入圖片描述

Hive JDBC


Hive的jdbc程式設計

hive實現了jdbc介面,所以可以通過java程式碼操作。但是實際應用中用的不多,一般都是在HDFS儲
存的檔案基礎上建立外部表來進行查詢處理。所以jdbc瞭解一下即可。

實現步驟:

  1. 在伺服器端開啟HiveServer服務
./hive --service hiveserver2 & (以後臺執行緒啟動)
  1. 建立本地工程,匯入jar包
    匯入hive\lib目錄下的hive-jdbc-1.2.0-standalone.jar
    匯入hadoop-2.7.1\share\hadoop\common下的hadoop-common-2.7.1.jar
  2. 編寫jdbc程式碼執行
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.junit.Test;

public class TestDemo {
	@Test
	public void testConnectAndQuery() throws Exception {
		//註冊資料庫驅動,用的hive的jdbc,驅動名固定寫死
		Class.forName("org.apache.hive.jdbc.HiveDriver");
		//如果用的是hive2服務,則寫jdbc:hive2,後面跟上hive伺服器的ip以及埠號,埠號預設是10000
		Connection conn = DriverManager.getConnection("jdbc:hive2://192.168.60.132:10000/park01","root","root");
		Statement stat = conn.createStatement();
		ResultSet rs = stat.executeQuery("select * from tb1");
		while(rs.next()){
			String name = rs.getString("name");
			System.out.println(name);
		}
		
		stat.close();
		conn.close();
	}
	
	@Test
	public void testInsert() throws Exception{
		Class.forName("org.apache.hive.jdbc.HiveDriver");
		Connection conn = DriverManager.getConnection("jdbc:hive2://192.168.60.132:10000/park01","root","root");
		Statement stat = conn.createStatement();
		//executeUpdate可用於:建立表,向表中插入資料以及刪除表
		//stat.executeUpdate("insert into tb1 values(2,'tom')");
		//stat.executeUpdate("create table stu2(id int,name string) row format delimited fields terminated by ' '");
		stat.executeUpdate("drop table stu2");
		
		
		stat.close();
		stat.close();
	}


}

Hive體系結構


在這裡插入圖片描述

使用者介面主要有三個:CLI,JDBC 和 WUI
1.CLI,最常用的模式。實際上在>hive 命令列下操作時,就是利用CLI使用者介面。
2.JDBC,通過java程式碼操作,需要啟動hiveserver,然後連線操作。

Metastore

Hive將元資料儲存在資料庫中,如mysql、derby。Hive中的元資料包括表的名字,
表的列和分割槽及其屬性,表的屬性(是否為外部表等),表的資料所在目錄等。
直譯器(complier)、優化器(optimizer)、執行器(executor)元件
這三個元件用於:HQL語句從詞法分析、語法分析、編譯、優化以及查詢計劃的生成。生成的查詢計劃儲存在
HDFS中,並在隨後有MapReduce呼叫執行。
Hadoop
Hive的資料儲存在HDFS中,大部分的查詢、計算由MapReduce完成

Hive工作流程


在這裡插入圖片描述

  1. 通過客戶端提交一條Hql語句
  2. 通過complier(編譯元件)對Hql進行詞法分析、語法分析。在這一步,編譯器要知道此hql
    語句到底要操作哪張表
  3. 去元資料庫找表資訊
  4. 得到資訊
  5. complier編譯器提交Hql語句分析方案。
  6. 1 executor 執行器收到方案後,執行方案(DDL過程)。在這裡注意,執行器在執行方案
    時,會判斷
    如果當前方案不涉及到MR元件,比如為表新增分割槽資訊、比如字串操作等,比如簡單的查詢
    操作等,此時就會直接和元資料庫互動,然後去HDFS上去找具體資料。
    如果方案需要轉換成MR job,則會將job 提交給Hadoop的JobTracker。
  7. 2 MR job完成,並且將執行結果寫入到HDFS上。
  8. 3 執行器和HDFS互動,獲取結果檔案資訊。
  9. 如果客戶端提交Hql語句是帶有查詢結果性的,則會發生:7-8-9步,完成結果的查詢。

Hive特點

針對海量資料的高效能查詢和分析系統

由於 Hive 的查詢是通過 MapReduce 框架實現的,而 MapReduce 本身就是為實現針對海量數
據的高效能處理而設計的。所以 Hive 天然就能高效的處理海量資料。
與此同時,Hive 針對 HiveQL 到 MapReduce的翻譯進行了大量的優化,從而保證了生成的
MapReduce 任務是高效的。在實際應用中,Hive 可以高效的對 TB 甚至 PB級的資料進行處
理。

類SQL的查詢語言

HiveQL 和 SQL 非常類似,所以一個熟悉SQL 的使用者基本不需要培訓就可以非常容易的使用
Hive 進行很複雜的查詢。

HiveQL 靈活的可擴充套件性(Extendibility)

除了 HiveQL 自身提供的能力,使用者還可以自定義其使用的資料型別、也可以用任何語言自定
義 mapper 和 reducer 指令碼,還可以自定義函式(普通函式、聚集函式)等。這就賦予了
HiveQL 極大的可擴充套件性。使用者可以利用這種可擴充套件性實現非常複雜的查詢。

高擴充套件性(Scalability)和容錯性

Hive本身並沒有執行機制,使用者查詢的執行是通過 MapReduce 框架實現的。由於MapReduce
框架本身具有高度可擴充套件(計算能力隨 Hadoop 機群中機器的數量增加而線性增加)和高容錯的
特點,所以 Hive也相應具有這些特點。

與 Hadoop 其他產品完全相容

Hive 自身並不儲存使用者資料,而是通過介面訪問使用者資料。這就使得 Hive支援各種資料來源和
資料格式。例如,它支援處理 HDFS 上的多種檔案格式(TextFile、SequenceFile 等),還支
持處理 HBase 資料庫。使用者也完全可以實現自己的驅動來增加新的資料來源和資料格式。一種
理想的應用模型是將資料儲存在 HBase 中實現實時訪問,而用Hive對HBase 中的資料進行批
量分析。

Sqoop安裝及指令


Sqoop介紹

sqoop是Apache 提供的工具
用於hdfs和關係型資料庫之間資料的匯入和匯入
可以從hdfs匯出資料到關係型資料庫,也可以從關係型資料庫匯入資料到hdfs。
實現步驟:

  1. 準備sqoop安裝包,官網地址:http://sqoop.apache.org
  2. 配置jdk環境變數和Hadoop的環境變數。因為sqoop在使用是會去找環境變數對應的路徑,從而完整工作。
  3. sqoop解壓即可使用(前提是環境變數都配好了)
  4. 需要將要連線的資料庫的驅動包加入sqoop的lib目錄下(本例中用的是mysql資料庫)
  5. 利用指令操作sqoop

Sqoop基礎指令(在Sqoop的bin目錄下執行下列指令)
在這裡插入圖片描述
在這裡插入圖片描述

上一篇 9.大資料學習之旅——hive