F1 Query: Declarative Querying at Scale
距離 Google 的上一篇 F1 論文,也就是 F1: A Distributed SQL Database That Scales 已經 5 年過去了,Google 在今年的 VLDB 上終於釋出了 F1 的新版本 F1 Query: Declarative Querying at Scale ,我們今天就來看一下這篇論文。安利一下,今天上午在 PingCAP 的 paper party 上,黃東旭@Ed Huang 大神對這篇論文的講解非常精彩,文章中也部分引用了他的觀點,在此鳴謝。
2013 年的 F1 是基於 Spanner,主要提供 OLTP 服務,而新的 F1 則定位則是大一統:旨在處理 OLTP/OLAP/ETL 等多種不同的 workload。但是這篇新的 F1 論文對 OLTP 的討論則是少之又少,據八卦是 Spanner 開始原生支援之前 F1 的部分功能,導致 F1 對 OLTP 的領地被吞併了。下面看一下論文的具體內容,疏漏之處歡迎指正。
0. 摘要
F1 Query 是一個大一統的 SQL 查詢處理平臺,可以處理儲存在 Google 內部不同儲存介質(Bigtable, Spanner, Google Spreadsheet)上面的不同格式檔案。簡單來說,F1 Query 可以同時支援如下功能:OLTP 查詢,低延遲 OLAP 查詢,ETL 工作流。F1 Query 的特性包括:
1. 背景
在 Google 內部的資料處理和分析的 use case 非常複雜,對很多方面都有不同的要求,比如資料大小、延遲、資料來源以及業務邏輯支援。結果導致了許多資料處理系統都只 focus 在一個點上,比如事務式查詢、OLAP 查詢、ETL 工作流。這些不同的系統往往具有不同的特性,不管是使用還是開發上都會有極大的不便利。
F1 Query 就在這個背景下誕生了,用論文中的話說就是
we present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but instead because it aims to cover all corners of the requirements space for enterprise data processing and analysis.
F1 Query 旨在覆蓋資料處理和分析的所有方面。F1 Query 在內部已經應用到了多個產品線,比如 Advertising, Shopping, Analytics 和 Payment。
在 F1 Query 的系統設計過程中,下面幾點考量具有非常關鍵的作用。
- Data Fragmentation : Google 內部的資料由於本身的特性不同,會被儲存到不同的儲存系統中。這樣會導致一個應用程式依賴的資料可能橫跨多個數據儲存系統中,甚至以不同的檔案格式。對於這個問題,F1 Query 對於這些資料提供一個統一的資料檢視。
- Datacenter Architecture : F1 Query 的目標是多資料中心,這個和傳統的 shared nothing 架構的資料處理系統不同相同。傳統的模式為了降低延遲,往往需要考慮 locality,也就是資料和計算越近越好。由於 Google 內部的網路環境優勢,locality 的優勢顯得不是那麼重要。所以 F1 Query 更強調計算和儲存分離,這樣計算節點和儲存節點的擴充套件性(scalability)都會更好。畢竟 Google 內部的系統,scalability 才是第一法則。還有一點值得一提的是,由於使用了 GFS 的更強版本: Colossue File System,磁碟不會成為瓶頸。
- Scalability : 在 F1 Query 中,short query 會在單個節點上執行,larger query 會以分散式的模式執行,largest query 以批處理 MapReduce 模式執行。對於這些模式,F1 Query 可以通過增加運算的並行度來優化。
- Extensibility : 對於那些無法用 SQL 語義來表達的查詢需求,F1 通過提供 user-defined functions (UDF)、user-defined aggregate functions (UDAs) 和 table-valued functions (TVF) 來支援。
2. 架構
F1 的架構圖如下所示:

下面的方框裡面是每個 Datacenter 一套。關於各個元件的介紹如下:
- 使用者通過 client libary 和 F1 Server 互動
- F1 Master 負責 query 的狀態的執行時監控和其他元件的管理
- F1 Server 收到使用者請求,對於 short query 直接單機執行查詢;對於 larger query 轉發到多臺 worker 上並行執行查詢。最後再彙總結果返回給 client。
- F1 Worker 負責具體查詢執行
- F1 Server 和 Worker 都是無狀態的,方便擴充套件
2.1 query 執行
使用者通過 client libary 提交 query 到 F1 Server 上,F1 Server 首先解析和分析 SQL,然後將涉及到的資料來源提取出來,如果某些資料來源在當前 datacenter 不存在,則直接將 query 返回給 client 並告知哪些 F1 Server 距離哪些資料來源更近。這裡直接將請求返回給業務層,由業務層去 retry,設計的也是非常的簡單。儘管前面說到要將儲存和計算分離,但是這個地方的設計還是考慮到了 locality,datacenter 級別的 locality,畢竟 locality 對查詢延遲的影響還是巨大的。
F1 Server 將 query 解析並優化成 DAG,然後由執行層來執行,具體執行模式(interactive 還是 batch)由使用者指定。原文是: Based on a client- specified execution mode preference, F1 Query executes queries on F1 servers and workers in an interactive mode or in a batch mode.
對於互動式查詢模式(interactive mode)有單節點集中執行模式和多節點分散式執行模式,query 優化會根據啟發式的演算法來決定採用哪種模式。集中式下,F1 Server 解析分析 query,然後在當前節點上直接執行並接收查詢結果。分散式下,接收 query 的 F1 Server 充當一個 query coordinator 的角色,將 query 拆解並下發給 worker。互動式查詢在資料量不太大的情況下往往具有不錯的效能和高效的資源利用率。
除了互動式查詢還有一種模式是批處理模式(batch mode)。批處理模式使用 MapReduce 框架非同步提交執行執行,相比互動式這種 long-running 方式,批處理模式的可靠性(reliabitly)更高。
2.2 資料來源
資料查詢支援跨 datacenter。儲存計算分離模式使得多資料來源的支援更加簡單,比如 Spanner, Bigtable, CSV, columnar file 等。為了支援多資料來源,F1 Query 在他們之上抽象出了一層,讓資料看起來都是儲存在關係型表裡面。而各個資料來源的元資料就儲存在 catalog service 裡面。
對於沒有儲存到 catalog service 裡面的表資料,只要提供一個 DEFINE TABLE
即可查詢。
DEFINE TABLE People( format = ‘csv’, path = ‘/path/to/peoplefile’, columns = ‘name:STRING, DateOfBirth:DATE’); SELECT Name, DateOfBirth FROM People WHERE Name = ‘John Doe’;
論文中沒有提到的是單看這個 DEFINE TABLE 可以表現力不夠,所說這些資訊並不足以表現出資料的行為:
- 是否支援 partition?
- 是否支援 邏輯下推?
- 是否支援索引?
- 是否支援多種 掃描模式?
對於新資料來源的支援可以通過 Table-Valued Function (TVF) 的方式來支援。
2.3 Data Sink
query 的結果可以直接返回給 client,也可以插入到另外一個表裡面。
2.4 SQL
SQL 2011。之所以是 2011 是因為其他老的系統使用的是 2011。
3. 互動式查詢
互動式查詢模式是預設的查詢模式。如前所述,互動式查詢有集中式和分散式,具體使用哪種由優化器分析 client 的 query 然後決定。
3.1 Single Threaded Execution Kernel
集中式的查詢如下圖所示,是一種 pull-based 的單執行緒執行方式。

3.2 Distributed Execution
如前所述,由優化器分析完 query 決定是否採用分散式模式。在分散式這種模式下接收到 query 的 F1 Server 充當一個 coordinator 的角色,將執行 plan 推給 worker。worker 是多執行緒的,可以併發執行單個 query 的無依賴的 fragment。Fragment 是執行計劃切分出來的執行計劃片段,非常像 MR 或者 Spark 中的 stage。Fragment 之間通過 Exchange Operator (資料重分佈) 連線。
Fragment 的切分過程如下:優化器使用一種基於資料分佈依賴的 bottom-up 策略。具體來說每個運算元對於輸入資料的分佈都有要求,比如 hash 或者依賴其他欄位的分佈。典型的例子有 group by key 和 hash join。如果當前的資料分佈滿足前後兩個運算元的要求,則兩個運算元就被放到一個 Fragment 裡面,否則就被分到兩個 Fragment 裡面,然後通過 Exchange Operator 來連線。
下一步就是計算每個 Fragment 的並行度,Fragment 之間並行度互相獨立。葉子節點的 Fragment 的底層 table scan 決定最初的並行度,然後上層通過 width calculator 逐步計算。比如 hash-join 的底層兩個 Fragment 分別是 100-worker 和 50-worker,則 hash-join 這個 Fragment 會使用 100-worker 的並行度。下面是一個具體的例子。
SELECT Clicks.Region, COUNT(*) ClickCount FROM Ads JOIN Clicks USING (AdId) WHERE Ads.StartDate > ‘2018-05-14’ AND Clicks.OS = ‘Chrome OS’ GROUP BY Clicks.Region ORDER BY ClickCount DESC;
上面 SQL 對應的 Fragment 和一種可能 worker 並行度如下圖所示:

3.3 Partitioning Strategy
資料重分佈也就是 Fragment 之間的 Exchange Operator,對於每條資料,資料傳送者通過分割槽函式來計算資料的目的分割槽數,每個分割槽數對應一個 worker。Exchange Operator 通過 RPC 呼叫,擴充套件可以支援到每個 Fragment 千級的 partion 併發。要求再高就需要使用 batch mode。
查詢優化器將 scan 操作作為執行計劃的葉子節點和 N 個 worker 節點併發。為了併發執行 scan 操作,資料必須要被併發分佈,然後由所有 worker 一起產生輸出結果。有時候資料的 partition 會超過 N,而 scan 併發度為 N,多餘的 partition 就交由空閒的 worker 去處理,這樣可以避免資料傾斜。
3.4 Performance Considerations
F1 Query 的主要效能問題在於資料傾斜和訪問模式不佳。Hash join 對於 hot key 尤為敏感。當 hot key 被 worker 載入到記憶體的時候可能會因為資料量太大而寫入磁碟,從而導致效能下降。
論文中舉了一個 lookup join 的例子,這裡不打算詳述了。
對於這種資料傾斜的問題,F1 Query 的解決方案是 Dynamic Key Range,但是論文中對其描述還是不夠詳細。
F1 Query 對於互動式查詢採用存記憶體計算,而且沒有 check point。因為是記憶體計算,所以速度非常的快,但是由於沒有 checkpoint 等 failover 的機制,只能依賴於業務層的重試。
4. 批處理
像 ETL,都是通過 Batch Mode 來處理的。Google 以前都是通過 MapReduce 或者 FlumeJava 來開發的,開發成本一般比較高。相比 SQL 這種方式,不能有效複用 SQL 優化,所以 F1 Query 選擇使用 SQL 來做。
如前所述,互動式查詢不適合處理 worker failure,而 batch mode,也就是批處理這種模式特別適合處理 failover(每一個 stage 結果落盤)。批處理模式複用互動式 SQL query 的一些特性,比如 query 優化,執行計劃生成。互動式模式和批處理模式的核心區別在於排程方式不同:互動式模式是同步的,而批處理模式是非同步的。
4.1 Batch Execution Framework
批處理使用的框架是 MapReduce,Fragment 被抽象成 MapReduce 中的 stage,stage 的輸出結果被儲存到 Colossus file system (GFS 二代)。
在 Fragment 對映有一點值得注意的是嚴格來說,Fragment 的 DAG 對映到 mr 是 map-reduce-reduce,對這種模式做一個簡單的變通變成:map-reduce-map\<identity>-reduce,如下圖:

關於 MapReduce 的更詳細資訊可以參考 Google 03 年那篇論文。
4.2 Batch Service Framework
Framework 會對 batch mode query 的執行進行編排。具體包括:query 註冊,query 分發,排程已經監控 mr 作業的執行。當 F1 Server 接收到一個 batch mode query,它會先生成執行計劃並將 query 註冊到 Query Registry,全域性唯一的 Spanner db,用來 track batch mode query。Query Distributor 然後將 query 分發給 datacenter。Query Scheduler 會定期從 Registry 拿到 query,然後生成執行計劃並交給 Query Executor 來處理。
Service Framework 的健壯性非常好:Query Distributor 是選主(master-elect)模式;Query Scheduler 在每個 datacenter 有多個。query 的所有執行狀態都是儲存在 Query Registry,這就保證其他的元件是無狀態的。容錯處理:MapReduce 的 stage 會被重試,如果 datacenter 出問題,query 會被分配到新的 datacenter 上重新執行。
5. 優化器
SQL 優化器類似 Spark Catalyst,架構如下圖,不細說了。
6. EXTENSIBILITY
對於很多複雜業務邏輯無法用 SQL 來描述,F1 針對這種情況提供了一種使用者自定義函式的方法,包括 UDF (user-define functions),UDA (aggrega- tion functions) 和 TVF (table-valued functions)。自定義函式開發語言支援 SQL、LUA、SQL 和其它高階語言,實現在 UDF Server 上。
UDF Server 和 F1 Query 是 RPC 呼叫關係,有 client 單獨部署在同一個 datacenter。udf server 完全有 client 來控制,無狀態,基本可以無限擴充套件。
6.1 Scalar Functions
UDF 並不是新的概念,UDF Server 這種部署方式看上去還算新穎一點。但是 UDF Server 這種單獨部署模式一個可能的問題是延遲問題,這裡通過批量流水線的方式來減少延遲。下面是 UDF 的一個例子。
local function string2unixtime(value) local y,m,d = match("(%d+)%-(%d+)%-(%d+)") return os.time({year=y, month=m, day=d}) end
6.2 Aggregate Functions
UDA 是對多行輸入產生一個單一的輸出,要實現 UDA,使用者需要實現運算元 Initialize, Accumulate, and Finalize。另外如要要對多個 UDA 的子聚合結果進行再聚合,使用者可以實現 Reaccumulate。
6.4 Table-Valued Functions
TVF 的輸入是一個 table,輸出是另外一個 table。這種在機器學習的模型訓練場景下比較有用。下面是論文中的具體的一個例子:EventsFromPastDays 就是一個 TVF。
SELECT * FROM EventsFromPastDays( 3, TABLE Clicks);
當然 TVF 也支援用 SQL 來描述,如下。
CREATE TABLE FUNCTION EventsFromPastDays( num_days INT64, events ANY TABLE) AS SELECT * FROM events WHERE date >= DATE_SUB( CURRENT_DATE(), INTERVAL num_days DAY);
7. Production Metric
下面是 F1 Query 在 Production 環境下的幾個 metrics。


8. 總結
回過頭來看 F1 Query 最新的這篇論文給人最大的啟發就是大一統的思想,這個很有可能是行業發展趨勢。回想一下 MapReduce 論文由 Google 於 2003 年發表,開源實現 Hadoop 於 2005 問世。不妨期待了一下未來的 3 到 5 年的 F1 Query 的開源產品。
最後再感謝一下 PingCAP 以及@Ed Huang 。meetup 分享很贊。