1. 程式人生 > >Impala原始碼分析(2)-SQL解析與執行計劃生成

Impala原始碼分析(2)-SQL解析與執行計劃生成

Impala的SQL解析與執行計劃生成部分是由impala-frontend(Java)實現的,監聽埠是21000。使用者通過Beeswax介面BeeswaxService.query()提交一個請求,在impalad端的處理邏輯是由void ImpalaServer::query(QueryHandle& query_handle, const Query& query)這個函式(在impala-beeswax-server.cc中實現)完成的。

在impala中一條SQL語句先後經歷BeeswaxService.Query->TClientRequest->TExecRequest,最後把TExecRequest交由impala-coordinator分發給多個backend處理。本文主要講一條SQL語句是怎麼一步一步變成TExecRequest的。

本文以下內容都以這樣的一個SQL為例說明:

select jobinfo.dt,user,
max(taskinfo.finish_time-taskinfo.start_time),
max(jobinfo.finish_time-jobinfo.submit_time)
from taskinfo join jobinfo on jobinfo.jobid=taskinfo.jobid
where jobinfo.job_status='SUCCESS' and taskinfo.task_status='SUCCESS'
group by jobinfo.dt,user

通過呼叫Status ImpalaServer::GetExecRequest(const TClientRequest& request, TExecRequest* result) 函式把TClientRequest轉化成TExecRequest

在這個函式裡通過JNI介面呼叫frontend.createExecRequest()生成TExecRequest。首先呼叫AnalysisContext.analyze(String stmt)分析提交的SQL語句。

註釋:Analyzer物件是個存放這個SQL所涉及到的所有資訊(包含Table, conjunct, slot,slotRefMap, eqJoinConjuncts等)的知識庫,所有跟這個SQL有關的東西都會存到Analyzer物件裡面。

1,SQL的詞法分析,語法分析

AnalysisContext.analyze(String stmt)會呼叫SelectStmt.analyze()函式,這個函式就是對SQL的analyze和向中央知識庫Analyzer register各種資訊。

(1)處理這個SQL所涉及到的Table(即TableRefs),這些Table是在from從句中提取出來的(包含關鍵字from, join, on/using)。注意JOIN操作以及on/using條件是儲存在參與JOIN操作的右邊的表的TableRef中並分析的。依次analyze()每個TableRef,向Analyzer註冊registerBaseTableRef(填充TupleDescriptor)。如果對應的TableRef涉及到JOIN操作,還要analyzeJoin()。在analyzeJoin()時會向Analyzer registerConjunct()填充Analyzer的一些成員變數:conjuncts,tuplePredicates(TupleId與conjunct的對映),slotPredicates(SlotId與conjunct的對映),eqJoinConjuncts。本例中on從句是一種BinaryPredicate,然後onClause.analyze(analyzer)會遞迴analyze這個on從句裡的各種元件。

(2)處理select從句(包含關鍵字select, MAX(), AVG()等聚集函式):分析這個SQL都select了哪幾項,每一項都是個Expr型別的子類物件,把這幾項填入resultExprs陣列和colLabels。然後把resultExprs裡面的Expr都遞迴analyze一下,要分析到樹的最底層,向Analyzer註冊SlotRef等。

(3)分析where從句(關鍵字where),首先遞迴Analyze從句中Expr組成的樹,然後向Analyzer registerConjunct()填充Analyzer的一些成員變數(同1,此外還要填充whereClauseConjuncts) 。

(4)處理sort相關資訊(關鍵字order by)。先是解析aliases和ordinals,然後從order by後面的從句中提取Expr填入orderingExprs,接著遞迴Analyze從句中Expr組成的樹,最後建立SortInfo物件。

(5)處理aggregation相關資訊(關鍵字group by, having, avg, max等)。首先遞迴分析group by從句裡的Expr,然後如果有having從句就像where從句一樣,先是analyze having從句中Expr組成的樹,然後向Analyzer registerConjunct()等。

(6)處理InlineView。

關於SQL解析中所涉及到的各種資料結構表示如下:

至此詞法分析,語法分析結束,有點像一個小的編譯器。我們現在回到frontend.createExecRequest()函式中。呼叫完AnalysisContext.analyze()之後,就開始填充TExecRequest內的成員變數。

(1)如果是DDL命令(use, show tables, show databases, describe),那麼呼叫createDdlExecRequest();

(2)另外一種情況就是Query或者DML命令,那麼就得建立和填充TQueryExecRequest了。

2,根據SQL語法樹生成執行計劃(PlanNode和PlanFragment的生成)

下面就是用Planner把SQL解析出的語法樹轉換成Plan fragments,後者能在各個backend被執行。

Planner planner = new Planner();

ArrayListfragments =

planner.createPlanFragments(analysisResult, request.queryOptions);

這個createPlanFragments()函式是frontend最重要的函式:根據SQL解析的結果和client傳入的query options,生成執行計劃。執行計劃是用PlanFragment的陣列表示的,最後會序列化到TQueryExecRequest.fragments然後傳給backend的coordinator去排程執行。

下面進入Planner.createPlanFragments()函式看看執行計劃是怎麼生成的:

首先要搞清楚兩個概念:PlanNode和PlanFragment。

PlanNode是SQL解析出來的邏輯功能節點;PlanFragment是真正的執行計劃節點。

2.1,建立PlanNode

PlanNode singleNodePlan =

createQueryPlan(queryStmt, analyzer, queryOptions.getDefault_order_by_limit());

(1)這個函式首先根據from從句中的第一個TableRef建立一個PlanNode,一般為ScanNode(HdfsScanNode或者HBaseScanNode)。這個ScanNode關聯一個ValueRange的陣列(由多個cluster column取值區間組成)表示要讀取的Table的範圍,還關聯一個conjunct(where從句)。

(2)這個SQL語句中TableRef中剩下的其他Table就需要建立HashJoinNode了。進入Planner.createHashJoinNode()函式:首先為這個Table建立ScanNode(同上),然後呼叫getHashLookupJoinConjuncts()獲取兩表或者多表JOIN的eqJoinConjuncts和eqJoinPredicates,利用這兩個條件建立HashJoinNode。每個HashJoinNode也是樹狀的,會有孩子節點,對於我們舉例的兩表JOIN,孩子節點分別是兩個表對應的ScanNode。(注意目前impala只支援一大一小兩個表的JOIN,預設是左大右小,是通過把右邊的小表分發到每個節點的記憶體中分別於左邊大表的一個區間進行JOIN過濾實現的。)

(3)如果有group by從句,建立AggregationNode,並把剛才的HashJoinNode設為它的孩子。這裡暫時不考慮DISTINCT aggregation function。

(4)如果有order by… limit從句,建立SortNode。

這樣createQueryPlan()函式執行完畢,PlanNode組成的execution tree形成如下:

2.2,建立PlanFragment

接下來就看impala backend節點數目有多少,如果只有一個節點,那麼整棵執行樹都在同一個impalad上執行;否則呼叫createPlanFragments(singleNodePlan, isPartitioned, false, fragments)把PlanNode組成的執行樹轉換成PlanFragment組成的執行計劃。

下面進入createPlanFragments()這個函式:

這是一個遞迴函式,沿著PlanNode組成的執行樹遞迴下去,分別建立對應的Fragment。

(1)如果是ScanNode,建立一個PlanFragment(這個PlanFragment的root node是這個ScanNode,而且這個PlanFragment只包含一個PlanNode)。

(2)如果是HashJoinNode,並不是建立一個新的PlanFragment,而是修改leftChildFragment(是一個ScanNode)為以HashJoinNode作為root node的PlanFragment。因為對於HashJoinNode一般有兩個ScanNode孩子,在處理HashJoinNode之前已經把這兩個ScanNode變成了對應的PlanFragment。那麼此時要得到HashJoinNode作為root node的PlanFragment是通過Planner.createHashJoinFragment()函式完成的:首先把當前HashJoinNode作為HashJoinFragment的root node;然後把leftChildFragment中的root PlanNode(也就是參與JOIN的兩個表中左邊的那個表對應的ScanNode)作為HashJoinNode的左孩子;通過呼叫Planner.connectChildFragment()函式把HashJoinNode的右孩子設定為一個ExchangeNode(這個ExchangeNode表示一個1:n的資料流的receiver);同時把rightChildFragment(ScanNode作為root node)的destination設定為這個ExchangeNode。

(3)如果是AggregationNode,聚集操作很複雜了。以我們的例子來說明:如果這個AggregationNode不是DISTINCT aggregation的2nd phase(因為本例中的AggregationNode的孩子是HashJoinNode而不是另外一個AggregationNode),首先把剛才生成的HashJoinNode作為root node對應的PlanFragment的root node設定為該AggregationNode,並把原來的root node(即HashJoinNode)設為新root node的孩子。然後通過Planner.createParentFragment()建立一個包含ExchangeNode作為root node的新的PlanFragment。並把孩子PlanFragment的destination設定為這個ExchangeNode。然後在這個新的PlanFragment中建立一個新的AggregationNode作為新的root node並把剛才的ExchangeNode作為其孩子節點。

至此,createPlanFragments()呼叫完成,生成的三個PlanFragment如下:

通過createPlanFragments(singleNodePlan, isPartitioned, false, fragments)獲取了所以執行計劃PlanFragment組成的陣列fragments,這個陣列的最後一個元素就是根節點PlanFragment。然後就是呼叫PlanFragment.finalize()把這個執行計劃finalize(遞迴finalize每個PlanNode)同時為每個PlanFragment指定 DataStreamSink。

然後回到frontend.createExecRequest()函式中。執行完Planner.createPlanFragments()返回的ArrayList就是完整的執行計劃了。然後就是一次呼叫PlanFragment.toThrift()把它序列化到TQueryExecRequest。填充TQueryExecRequest的相關變數:dest_fragment_idx,per_node_scan_ranges,query_globals,result_set_metadata等。最後返回TExecRequest型的物件給backend執行。

Impala-backend(C++程式碼)拿到這個TExecRequest物件,有coordinator在各個backend之間分發執行,這是下一篇文章的內容了。

吐槽:從中還是能夠看到MapReduce的影子的。。。對於每個PlanFragment有個DataStreamSink,會指向其他PlanFragment中的ExchangeNode,是個1對N的關係。。。所以分散式系統的瓶頸還是Data Shuffle,不管是MapReduce模型還是impala。這也說明其實Tez/Stinger Initiative 對Hive的優化還是很值得期待的。