1. 程式人生 > >Spark SQL相容Hive及擴充套件

Spark SQL相容Hive及擴充套件

前言

     相比於Shark對Hive的過渡依賴,Spark SQL在Hive相容層面僅依賴HQL Parser、Hive Metastore和Hive SerDes。也就是說,從HQL被解析成抽象語法樹(AST)起,就全部由Spark SQL接管了,執行計劃生成和優化都由Catalyst負責。本文接下來對於Spark SQL在相容Hive過程中對於Catalog,SqlParser,Analyzer等一系列的具體相容方式進行具體解析。

一、基礎類解析

1.1 Catalog

    Spark中的DataSet和Dataframe API支援結構化分析。結構化分析的一個重要的方面是管理元資料。這些元資料可能是一些臨時元資料(比如臨時表)、SQLContext上註冊的UDF以及持久化的元資料(比如Hivemeta store或者HCatalog)。

 Spark的早期版本是沒有標準的API來訪問這些元資料的。使用者通常使用查詢語句(比如show tables)來查詢這些元資料。這些查詢通常需要操作原始的字串,而且不同元資料型別的操作也是不一樣的。

    這種情況在Spark 2.0中得到改變。Spark 2.0中添加了標準的API(稱為catalog)來訪問Spark SQL中的元資料。這個API既可以操作Spark SQL,也可以操作Hive元資料。

    Catalog類中提供了多種元資料訪問API,Catalog是一個抽象類,它的實現類是CatalogImpl,CatalogImpl提供了面向使用者的內部實現。

    SparkSession中現在直接對CatalogImpl進行維護,如下所示。


1.1.1 CatalogImpl

    CatalogImpl是Catalog抽象類的唯一實現,它維護了一個SessionCatalog的引用,且是直接將SessionState中的SessionCatalog拿過來。



1.2 SessionCatalog

    SessionCatalog是一個Spark Session使用的內部catalog,這個內部catalog作為一個外部metastore的代理(eg. Hive MetaStore)提供服務,同時它還可以管理歸屬於一個SparkSession的臨時表和函式。

    注意,SessionCatalog和Catalog的實現類CatalogImpl中的內容是完全一致的,因為CatalogImpl中維護的sessionCatalog成員變數就是一個真正的SesssionCatalog。

    SessionCatalog本身是一個實現類,可以直接用於構造例項使用,同時它還有一個繼承類HiveSessionCatalog,專門用於管理Hive資料來源的MetaStore

1.2.1 SessionCatalog

    SessionCatalog的構造引數如下:ExternalCatalog,GlobalTempViewManager,FunctionRegistry,SQLConf,Configuration,ParserInterface,FunctionResourceLoader



除此之外,SessionCatalog還維護了以下幾個成員變數:tempTables,currentDb,tableRelationCache


 從SessionCatalog的成員變數可以瞭解,SessionCatalog的外部系統metastore主要是通過ExternalCatalog實現,它對於單SparkSession的臨時表管理主要通過它的一些內容可變的成員變數來維護。關於ExternalCatalog後面會有具體描述。

1.2.2 HiveSessionCatalog

    HiveSessionCatalog繼承了Spark的預設SessionCatalog,並調整了一些成員的型別,以及新增了一些成員變數。

    HiveSessionCatalog中,它的externalCatlog型別轉換為ExternalCatalog的子類HiveExternalCatalog,同時增加了一個成員變數HIveMetastoreCatalog(注意,這個成員之後會完整整合到HiveExternalCatalog中)。


1.2.2.1 HiveMetastoreCatalog

    HiveMetastoreCatalog是HiveSessionCatalog的一個成員變數,是之前的版本中和Hive metastore進行互動遺留下來的產物,之後會逐漸全部融合到HiveExternalCatalog中,目前它還承擔了一部分工作,例如建立資料來源中的表。這裡就不對它做具體描述。

1.3 ExternalCatalog

    ExternalCatalog是系統Catalog的介面,提供訪問function、partition和database的一系列方法。只作用為非臨時的item,它的實現類必須是執行緒安全的,因為它會同時被多個執行緒訪問。ExternalCatalog為Spark提供了和外部系統互動的能力。它的實現類在處理database不存在情況時,需要拋出NoSuchDatabaseException異常。和Catalog一樣,ExternalCatalog同樣提供了許多元資料訪問的方法。

    ExternalCatalog有三個實現類,去除測試類後實際上是兩個具體實現類:HiveExternalCatalog和InMemoryCatalog。

1.3.1 HiveExternalCatalog

    HiveExternalCatalog繼承了ExternalCatalog的所有元資料訪問方法,它為Spark提供了與Hive MetaStore的互動能力。具體實現方法是,HiveExternalCatalog會通過hadoop相關的配置檔案來例項化一個ClientForMetadata,所有的元資料訪問方法都是通過這個client來和hive互動。



1.3.2 InMemoryCatalog

    InMemoryCatalog現在是實驗性的功能,是一個不需要外部系統的偽實現類,在這裡就不過多描述了。



1.4 SharedState

    SharedState是SparkSession中定義的一個基於給定SQLContext來維護跨Session的所有狀態的一個類。它的類成員如下圖所示



1.5 SessionState

    SessionState是基於一個特定SparkSession維護所有單個session作用域的所有狀態,它的類成員如下圖所示。SessionState維護了SparkSQL中大部分的核心類,如SqlParser、Analyzer、Optimizer等等。這些具體類的實現型別根據當前Spark Application的模式會有所不同,具體請看SessionSateBuilder類。



1.6 BaseSessionStateBuilder

    BaseSessionStateBuilder,顧名思義,適用於構造SessionState的類。Spark對它的註釋如下:


 BaseSessionStateBuilder定義所有Session所需的狀態,並且在session的build方法呼叫時會真正去建立一個SessionState。開發人員可以用過提供自定義的元件版本或是使用Analyzer、Optimizer和Planner的hook函式來修改builder類。同時在構建新的SessionState時,BaseSessionStateBuilder可以接收一個parent session state來對其的成員進行整合。

    BaseSessionStateBuilder提供的成員變數和方法如下所示,我們可以與SessionState中的成員進行對比,基本上BaseSessionStateBuilder提供的方法都是在SessionState中存在的成員變數。



BaseSessionStateBuilder的非測試實現類有兩種,HiveSessionStateBuilder和SessionStateBuilder,對於它們的例項化時的判斷請參考2.1 初始化流程。

1.6.1 HiveSessionStateBuilder

    HiveSessionStateBuilder可以構建出一個能識別Hive資料來源的SessionState。HiveSessionStateBuilder類對它的父類BaseSessionStateBuilder中的一些方法和成員變數進行了覆蓋。



1.6.1.1 新增HiveExternalCatalog

    HiveSessionStateBuilder將SharedState中的externalCatalog提取出來,作為自身的一個型別為HiveExternalCatalog的externalCatalog成員變數存在



1.6.1.2 覆蓋resourceLoader成員變數

resourceLoader是一個用於載入使用者定義的函式對應的資源和jar包的成員變數



1.6.1.3 覆蓋catalog成員變數

    我們可以對HiveSessionStateBuilder的catalog和BaseSessionStateBuilder的catalog初始化方法進行比較

HiveSessionStateBuilder的catalog初始化:


BaseSessionStateBuilder的catalog初始化:


1.6.1.4 覆蓋analyzer方法

    analyzer方法的覆蓋,主要是為Analyzer中添加了一些針對Hive的解析規則。Analyzer的主要功能是解析Unresolved LogicalPlan。



1.6.1.5 覆蓋planner方法

    planner方法的覆蓋,主要是為SparkPlanner中新增一些針對Hive的解析規則,例如HiveTableScans等。SparkPlanner的主要功能是將Logical Plan解析成為Physical Plan。



1.6.1.6 覆蓋newBuilder方法

    返回一個新的HiveSessionStateBuilder



1.5.2 SessionStateBuilder

    SessionStateBuilder是BaseSessionStateBuilder的一個具體實現,沒有繫結任何特定外部資料來源,它所作的唯一覆蓋操作就是newBuilder方法,和HiveSessionStateBuilder不同,返回一個SessionStateBuilder。

    可以認為SessionStateBuilder是Spark的一個預設實現,HiveSessionStateBuilder是Spark針對Hive資料來源的特定實現。

二、實踐

2.1 初始化流程

2.1.1 初始化SparkSession

    在每一個Spark Application中,我們首先要新建一個SparkSession來建立對Spark叢集的唯一連線。SparkSession在建立過程中能獲取到所有的配置檔案,包括是否連線Hive,Hive叢集的對應配置。

    注意Spark中的SessionState和ShareState都是lazy變數,只有當第一次使用時才會進行初始化。



2.1.2 初始化SessionState

    SparkSession初始化時一定會呼叫getOrCreate方法,這個方法呼叫過程中會將所有的配置檔案寫入到SessionState的conf中。



呼叫SparkSession中的sessionState方法,該方法中通過SparkSession.instantiateSessionState來對SessionState進行初始化,注意此時引數為SparkSession.sessionStateClassName(sparkContext.conf)


在呼叫sessionStateClassName方法中,Spark會根據當前配置是使用Hive還是原生Spark SQL來選擇性的初始化SessionBuilder的類名到底是HiveSessionStateBuilder還是SessionStateBuilder


根據配置選擇性地獲取SessionStateBuilder的類名後,Spark通過反射初始化對應的BaseSEssionStateBuilder,並呼叫build方法來建立新的SessionState


build方法是HiveSessionStateBuilder和SessionStateBuilder公用的,具體邏輯實現在它們的父類BaseSessionStateBuilder中,差異在build方法中呼叫的lazy val和function,這一部分方法和成員變數的覆蓋在第一章的基礎類解析已經做了具體說明。


2.1.3 初始化ShareState

    在2.1.2的最後,初始化SessionState之前,首先要初始化SharedState。下列方法最主要的初始化方法是初始化externalCatalog,其他方法簡單介紹。

2.1.3.1 初始化warehousePath

    讀取配置資訊,若有hive配置資訊,則從hive配置中獲取warehousePath,若沒有,則在當前執行路徑下自動生成一個warehousePath。

2.1.3.2 初始化CacheManager

    CacheManager可以記住LogicalPlan的cache狀態,它的作用之後詳細介紹。

2.1.3.3 初始化SQLListener

    用於SparkUI監控。

2.1.3.4 初始化externalCatalog

    根據配置檔案建立一個新的ExternalCatalog,並驗證是否有default database,若沒有,則建立一個。這個方法的重點建立操作在SharedState.reflect中。



SharedState.reflect方法如下,根據配置檔案的名稱和實際值去初始化不同的ExternalCatalog,目前只有HiveExternalCatalog



2.1.3.5 初始化globalTempViewManager

    建立全域性臨時檢視管理



2.1.3.6 初始化jarClassLoader

    用於載入使用者定義的jar包



2.2 使用

    在2.1中,SparkSession建立完畢後,SparkSession.catalog其實和SparkSession.sessionState.catalog是同樣的引用。Spark對於catalog使用的地方很多,這裡無法一一列舉,很多地方也沒有學習到。這裡主要可以看一下catalog在Analyzer中的使用。

    Analyzer在BaseSessionSateBuilder中初始化時,會使用BaseSessionStateBuilder先初始化過的SessionCatalog作為構造引數。



在Analyzer實際的匹配過程中,部分函式使用到了catalog來進行匹配,例如,lookupTableFromCatalog等等


三、總結

    Spark SQL相容Hive過程中,可以看出Spark SQL的整體架構中,已經預留出了不同資料來源的擴充套件介面,不同資料來源在適配過程中需要針對ExternalCatalog、BaseSessionStateBuilder、Analyzer和SparkPlanner做獨立的資料來源適配即可。