1. 程式人生 > >Survey Report on Data Skew in Big Data

Survey Report on Data Skew in Big Data

range 如何 長時間 變量 延遲 過濾 gas 而且 允許

1 Introduction

信息時代產生了大量的數據,運用和使用數據已經成為一個公司乃至一個國家核心實力的重要組成部分。當代大數據一般指的是:數據量巨大,需要運用新處理模式才能具有更強的決策力、洞察力和流程優化能力的海量、高增長和多樣化的信息資產。大數據的特征有四個層面:第一:數據量巨大,從TB級別,躍升到PB級別;第二,數據類型繁多,包括網絡日誌,視頻,圖片和地理信息等;第三,價值密度低,商業價值高,以視頻為例,在連續不間斷的監控過程中,可能有用的數據僅僅只有一兩秒;第四,處理速度快。也就是4V——Volumn、Varity、Value和Velocity[1]。
大數據對系統提出了很多極限要求,不論是存儲、傳輸還是計算,現有計算技術都難以滿足大數據的需求,因此整個IT架構的革命性重構勢在必行,存儲能力的增長遠遠趕不上數據的增長,設計合理的封層存儲架構已經成為了信息系統的關鍵[2]。分布式存儲架構不僅要求scale up式的可擴展性,也需要scale out式的可擴展性,因此大數據處理離不開雲計算技術。雲計算可為大數據提供彈性可擴展的基礎設施支撐環境以及數據服務的高效模式,大數據則為雲計算提供了新的商業價值[3]。
為了應對大數據帶來的困難和挑戰,以Google, Microsoft, Facebook為代表的互聯網或傳統軟件公司不斷推出新的技術和大數據處理系統。借助於巧妙設計的處理系統,機器學習,可視化技術等大數據分析技術近幾年也在不斷湧現。
雲計算的關鍵技術包括分布式並行計算、分布式存儲以及分布式數據管理技術,而Hadoop就是一個實現了Google雲計算系統的開源平臺[4],包括並行計算模型MapReduce[5],分布式文件系統HDFS[6]以及分布式數據庫Hbase,同時Hadoop的相關項目也很豐富,包括Zookeeper,Pig,Chukwa,Hive,Mahout等等。
然而,在計算的時候,由於數據的分散度不夠,導致大量的數據集中到了一臺或者幾臺機器上計算或者由於節點的性能等因素,這些數據的計算速度遠遠低於平均計算速度,從而導致整個計算過程過慢,這種現象稱之為數據偏斜[7]。數據偏斜對大數據計算框架性能的影響巨大,而且數據偏斜在現實世界中普遍存在,比如PageRank任務中存在的高訪問量網站,社交網絡中高熱度節點。因此,解決數據偏斜是降低大數據處理時間的關鍵之一。
由於雲計算能夠將作業自動並行處理為多個短任務,並能夠透明地處理在分布式環境中執行任務帶來的挑戰,雲計算技術帶來了廣泛的歡迎。一個基本挑戰是所有雲計算框架(如MapReduce,Dryad和Spark)面臨的落伍(stragging)任務。落伍任務(stragger)是指比其他任務運行速度慢得多的任務,並且由於任務必須在上遊任務完成時才執行,所以落伍任務會延遲整個工作的完成[8]。落伍任務特別影響小型工作,即僅僅由幾項任務組成的工作。這類工作通常可以一次完成所有任務。因此,即使單個任務很慢,或不穩定,整個工作也會顯著延遲。小型工作普遍存在,數據中心運營商的經驗表明,這些小型工作通常用於執行交互式和探索性分析。實現此類工作的低延遲對於數據分析師有效探索數據至關重要。為了獲得較低的延遲,分析師已經將他們的查詢限制在小而精的數據集中,這也就導致大多工作只包含一些短任務。這種探索性分析的趨勢從Facebook的Hadoop生產群集和Microsoft Bing的Dryad群集的分析中顯而易見。超過80%的Hadoop工作和超過60%的Dryad工作都很小,只有不到10個任務。實現這些小型互動作業的低延遲是數據中心運營商關註的首要問題。落伍任務(stagger)問題已經引起了相當大的關註,正在開發一系列偏斜緩解技術以解決該問題。這些技術大致可以分為兩類:黑名單和推測性執行。然而,實驗顯示,即使在應用最先進的黑名單和推測性執行技術之後,含有數據偏斜問題的任務比正常任務慢八倍。因此,落後任務仍然是小型工作的問題。
這兩種方法有其固有的局限性。黑名單標識機器運行狀況不佳(例如由於磁盤故障),並避免在其上安排任務[9]。事實上,Facebook和Bing集群大約有10%的機器黑名單。然而,由於IO爭用,定期維護操作和後臺服務的幹擾以及硬件行為等復雜原因,未列入黑名單的機器上會出現任務落伍現象。出於這個原因,投機性執行被用來處理落後節點。
推測性執行觀察任務的進度並啟動較慢任務的克隆[10]。然而,推測性執行技術在處理小型工作時有一個根本的限制:任何有意義的觀測都需要等待收集具有統計意義的任務進度樣本,這種等待限制了他們在處理小型工作中落伍節點時的敏捷性,因為他們經常同時開始他們的所有任務。如果某些任務在執行完成後開始出現混亂,則問題會加劇。在這一點上產生一個推測性副本可能為時太晚而無法提供幫助。

2 Background
2.1 大數據概況

在過去的20年中,數據在各個領域都有了大規模的增長。根據國際數據公司(IDC)的報告,2011年,全球創建和復制的數據量為1.8ZB(≈1021B),在五年內增加近9倍。這個數字在不久的將來至少每兩年翻一番。在全球數據爆炸性增長的情況下,大數據這個術語主要用來代表大數據集。與傳統數據集相比,大數據通常包括大量非結構化數據,需要更多的實時分析。此外,大數據也為發現新的價值提供了新的機會,有助於我們深入了解隱藏的價值觀,同時也帶來了新的挑戰,例如如何有效地組織和管理這些數據集。最近,行業開始關註大數據的巨大潛力,許多政府機構宣布了加速大數據研究和應用的主要計劃。此外,大數據方面的問題經常在“經濟學人”,紐約時報和國家公共廣播電臺等公共媒體中報道。兩大首屈一指的科學期刊Nature和Science也開設了專欄來討論大數據的挑戰和影響[11]。大數據時代已經不容置疑。例如,谷歌處理數百PB的數據,Facebook生成10PB以上的日誌數據,百度每天處理數十PB的數據,淘寶是阿裏巴巴的子公司,每天因為在線交易產生數十兆字節(TB)的數據。我們可以從廣泛分布的數據源中獲得了大量的數據,並將大量數據整合在一起。而雲計算和物聯網(物聯網)則進一步促進了數據的急劇增長。雲計算為數據資產提供了保護、訪問站點和通道。現代應用中,世界各地的傳感器都在收集和傳輸數據,以便在雲中存儲和處理數據。在這些數據將遠遠超過IT體系結構和現有企業的基礎設施的能力。越來越多的數據導致了龐大的異構數據集的存儲和分析問題[12]。如何可視化和預測的過程中有效地“挖掘”數據,從而揭示其內在屬性並改進決策過程,是一個具有挑戰性的任務。

2.2 大數據發展

20世紀70年代末,“數據庫”的概念出現了,這是一種專門用於存儲和分析數據的技術。隨著數據量的增加,單一主機計算機系統的存儲和處理能力不足。在20世紀80年代,人們提出了一個並行的數據庫系統,以滿足不斷增長的數據量的需求。沒有任何系統架構是基於集群的使用,而每臺機器都有自己的處理器、存儲和磁盤。Teradata系統是第一個成功的商業並行數據庫系統。這樣的數據庫最近變得非常流行。在1986年6月2日,當Teradata交付第一個並行數據庫系統時,一個裏程碑事件發生了,它的存儲容量為1 TB,以幫助北美的大型零售公司擴展其數據倉庫[13]。在20世紀90年代末,並行數據庫的優勢在數據庫領域得到了廣泛的認可。然而,在大數據上出現了許多挑戰。隨著網絡服務的發展,索引和查詢內容的快速增長。因此,搜索引擎公司不得不面對處理如此大數據的挑戰。谷歌提出了GFS和MapReduce編程模型,以應對數據管理和互聯網規模分析帶來的挑戰。此外,用戶、傳感器和其他無處不在的數據源所生成的內容也會影響到體量巨大的數據流,這需要對架構和大型數據處理機制進行基本的改變。2007年1月,數據庫軟件的先驅吉姆格雷將這種轉變稱為“第四範式”[14]。他還認為,應對這種模式的唯一方法是開發新一代的計算工具來管理、可視化和分析大量數據。2011年6月,又發生了另一個裏程碑事件:emc/idc發布了一份題為《從混沌中提取價值》的研究報告,該報告首次介紹了大數據的概念和潛力。這一研究報告引發了業界和學術界對大數據的極大興趣。在過去的幾年中,幾乎所有的大公司,包括EMC、甲骨文、IBM、微軟、谷歌、亞馬遜和Facebook等都已經開始了他們的大數據項目。以IBM為例,自2005年以來,IBM已經在30項與大數據相關的收購上投資了160億美元。在學術界,大數據也成為人們關註的焦點。2008年,自然雜誌發布了一個大數據特刊。2011年,科學還對大數據中“數據處理”的關鍵技術提出了一個特殊的問題。2012年,歐洲信息與數學研究聯盟(ERCIM)發布了一份關於bigdata的特殊問題。在2012年初,“大數據”——bigdata——達沃斯論壇在瑞士舉行,宣布大數據就像貨幣或黃金一樣已經成為一種新的經濟資產。許多國家政府,如美國,也非常關註大數據。2012年3月,奧巴馬政府宣布投資2億美元,啟動“大數據研究與發展計劃”,這是1993年“信息高速公路”倡議後的第二個重大科技發展倡議。2012年7月,日本內政部和通信部發布的“充滿活力的信息通信技術日本”項目表明,大數據開發應該是一項國家戰略,應用技術應該是重點。2012年7月,聯合國發布了大數據發展報告,該報告總結了政府如何利用大數據更好地服務和保護人民。

2.3 大數據的挑戰

大數據時代急劇增加的數據流給數據采集、存儲、管理和分析帶來了巨大的挑戰。傳統的數據管理和分析系統是基於關系數據庫管理系統(RDBMS)的。然而,這種RDBMS只適用於結構化數據,而不是半結構化或非結構化數據。此外,RDBMS越來越多地使用越來越昂貴的硬件。很明顯,傳統的RDBMS無法處理大數據的巨大容量和異構性。關於龐大數據流的存儲分析的研究成為迫在眉睫的任務,例如,雲計算被用來滿足大數據基礎設施的需求。對於大規模無序數據集的永久存儲和管理的解決方案,分布式文件系統和nosql數據庫都是不錯的選擇。此外,部署大數據分析系統並非易事。主要的挑戰如下:(1)數據表示:許多數據集具有一定程度的結構、語義、組織、粒度和可訪問性方面的不同。數據表示旨在使數據對計算機分析和用戶解釋更有意義。然而,不適當的數據表示將降低原始數據的價值,甚至可能阻礙有效的數據分析。有效的數據表示應能夠反映數據的結構和類型,以便在不同的數據集上實現高效的操作。(2)減少冗余和數據壓縮:一般來說,數據集有很高的冗余。減少冗余和數據壓縮對於減少整個系統的間接成本是有效的,前提是數據的潛在價值不受影響。例如,傳感器網絡生成的大多數數據都是高度冗余的,可以按數量級進行過濾和壓縮。(3)數據生命周期管理:與存儲系統相對緩慢的進展相比,無處不在的傳感和計算正在以前所未有的速度和規模生成數據。我們面臨著許多緊迫的挑戰,其中之一就是當前的存儲系統無法支持如此大規模的數據。一般來說,隱藏在大數據中的價值依賴於數據的時效性。因此,應開發與分析值相關的數據重要性原則,以決定應存儲哪些數據,哪些數據應被丟棄。大數據分析系統應在有限的時間內處理大量的異構數據[15]。然而,傳統的RDBMS是嚴格設計的,缺乏可伸縮性和可擴展性,無法滿足性能要求。非關系數據庫在處理非結構化數據方面顯示了它們的獨特優勢開始成為大數據分析的主流。即便如此,在它們的性能和特定的應用程序中仍然存在一些非關系數據庫的問題。我們將在關系型數據庫和非關系數據庫之間找到一個折衷的解決方案。一些企業使用了混合數據庫架構,集成了這兩種數據庫的優點(例如Facebook和淘寶)。(4)數據機密性:由於容量有限,目前大多數大型數據服務提供商或所有者無法有效地維護和分析如此龐大的數據集。他們必須依靠專業人員或工具來分析這些數據,這增加了潛在的安全風險。例如,事務數據集通常包括一組完整的操作數據來驅動關鍵業務流程。這些數據包含了最低粒度的細節和一些敏感信息,比如信用卡號。因此,只有當采取適當的預防措施來保護這些敏感數據,以確保其安全性時,對大數據的分析才可能被交付給第三方[16]。(5)能源管理:大型計算系統的能源消耗已經引起了人們對經濟和環境的關註。隨著數據量的增加和分析需求的增加,大數據的處理、存儲和傳輸將不可避免地消耗更多的電能。因此,在保證大數據的可擴展性和可訪問性的同時,需要建立系統級的電力消耗控制和管理機制。E.可擴展性和可伸縮性:大數據的分析系統必須支持當前和未來的數據集。分析算法必須能夠處理越來越多的擴展和更復雜的數據集。

3 The Existing Techniques

3.1 Hadoop

Hadoop是一種分布式基礎架構系統,是Google的雲計算基礎架構的開源實現。Google采集系統的核心組件有兩個:第一個是GFS(Google FileSystem),一個分布式文件系統,隱藏下層的負載均衡,冗余復制等細節,對上層程序提供一個統一的文件系統的API接口[17];第二個是MapReduce計算模型,Google發現大多數分布式計算均可以抽象為Map/Reduce操作。Map是把輸入Input分解為Key/Value對,Reduce是把Key/Value合成最終輸出Output[5]。這兩個函數都需要用戶提供給系統,下層的設施將Map和Reduce操作分配給集群的各個節點上運行,將最終結果存儲在GFS上。
而Hadoop是Google集群的開源實現,主要由HDFS和MapReduce組成,其中HDFS是GFS的開源實現,而MapReduce是Google MapReduce的開源實現。用戶只需要分別實現Map和Reduce,並註冊Job即可自動分布式運行,用戶使用十分方便快捷。
一般而言,狹義的Hadoop就是指HDFS和MapReduce,這是一種典型的Master-Slave框架,Master邏輯節點使用NameNode和JobTracker組成,NameNode是HDFS的Master,主要負責Hadoop分布式文件系統的元數據的管理工作,JobTracker是MapReduce的Master,主要職責是啟動,跟蹤,調度各個TaskTracker的任務執行,每一個Slave邏輯節點通常同時具有DataNode以及TaskTracker的功能。

MapReduce的一個主要優勢是它具有高效的容錯機制。如果一個節點崩潰,MapReduce會在另一臺機器上重新運行它的任務。同樣重要的是,如果一個節點可用但表現不佳,這種情況稱為失散/落伍(straggler),MapReduce會在另一臺機器上運行其任務的副本(也稱為“備份任務”),以更快地完成計算。如果沒有這種推測性執行機制,任務會因為最慢的節點而使整個任務執行緩慢,或者由於多種原因而導致失敗。谷歌註意到,推測執行可以將工作響應時間提高44%。
但是,推測執行是一個復雜的問題,原因有幾個。首先,投機任務不是免費的,它們與其他正在運行的任務競爭某些資源,例如網絡。其次,選擇節點運行推測性任務與選擇任務同等重要。第三,在異構環境中,可能難以區分比平均值和離散值稍慢的節點。最後,應盡早確定落後者,以縮短響應時間。因此在MapReduce中,為了應對數據偏斜帶來的影響,設計高效的推測執行機制是非常有必要的。

3.2 Spark

Spark 是UC Berkeley AMPLab於2009年發起的,然後被Apache軟件基金會接管的類Hadoop MapReduce通用性並行計算框架,是當前大數據領域最活躍的開源項目之一。Spark是基於MapReduce計算框架實現的分布式計算,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是,中間輸出和結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark更適用於數據挖掘與機器學習等需要叠代的算法。Spark由Scala語言實現的,Scala是一種基於JVM的函數式編程語言,提供了類似DryadLINQ的編程接口。而且Spark還提供修改的Scala語言解釋器,能方便地用於交互式編程,用戶可以定義變量、函數、類以及RDD[18]。
因此,Spark比Hadoop的傳統處理方式MapReduce有著很大的差別,效率至少提高100倍以上。Spark分為四大模塊:Spark SQL(類SQL數據管理),MLlib(機器學習),Graphx(圖計算),Spark Streaming(實時處理)。這四個部分的數據處理單元都是RDD。所以整個框架形成了大數據處理各種應用場景編程的一致性。同時,Spark是基於內存的編程模型,它可以把中間的叠代過程不放在磁盤中,直接數據不落地在內存中執行,極大地提高了它的執行速度。下面來介紹它的各個模塊。

(1)Spark SQL允許在Spark中使用SQL和HiveQL 的相關查詢表達式。這個組件的核心是一個新型的RDD,JavaSchemaRDD。JavaSchemaRDD是由Row對象和Schema描述行中每一列的數據類型。JavaSchemaRDD類似一個關系型數據庫中的表。一個JavaSchemaRDD可以通過已存在的RDD,Parquet文件,一個JSON數據集或者存儲在Apache Hive 通過HiveQL運行的數據來創建[19]。
(2)MLlib包含一些常用機器學習算法和工具,分類,回歸,聚類,協同過濾,降維,以及相關的優化原語[20]。許多標準的機器學習方法可以歸結為一個凸優化問題,例如一項任務,去找到一個凸面函數F的最小值,這個凸面函數依賴於可變的Vectorw向量,這個向量在一個Node 中叫做Weights權值。MLlib 包括的評價指標有:精確度,召回率,F值,ROC,曲線下面積(AUC)。
(3)GraphX是Spark用於圖表和圖形,並行計算的的API。在一個高層次上, GraphX 延伸了Spark RDD, 通過引入Resilient Distributed Property Graph (彈性分布式屬性圖):一個有向多重圖與附加到每個頂點和邊的屬性。為了支持圖形計算,,GraphX公開了一組基本的運算符(subgraph ,joinVertices,mapReduceTriplets) [21]。從社交網絡到語言建模,由於圖形數據的增長規模和重要性,帶動眾多新圖形並行系統(例如Giraph and GraphLab)的發展。通過限制可以被表示計算的類型和引入新技術來劃分和分配圖形,這些系統能夠有效地執行復雜的圖形算法命令並遠快於大多數普通的數據並行系統。
(4)Spark Streaming。如果要用一句話來概括Spark Streaming的處理思路的話,那就是“將連續的數據持久化,離散化,然後進行批量處理”。Spark Streaming是Spark核心的擴展API,允許使高通量、容錯實時數據流的流處理。數據可以從許多來源攝取,如Kafka,Flume,Twitter,ZeroMQ或普通TCP套接字。最後處理過的數據可以放到文件系統,數據庫和可視化儀表板上。事實上,你可以引用內置的Spark的機器學習算法,數據流圖處理算法。
像Hadoop和Spark這樣的大規模數據分析框架現在已經得到廣泛應用。因此,學術界和產業界都付出了巨大的努力來改善這些框架的性能。研究人員致力於解決三種限制大數據分析框架的關鍵問題:(1)網絡是一個瓶頸。這引起了一系列網絡優化的工作,包括跨多個路徑的負載平衡、聚合數據以減少流量、隔離等等。(2)磁盤是一個瓶頸。這就導致了眾多緩存策略的產生。(3)落後的任務大大延長了工作完成時間,這推動著推測運行和負載均衡的發展。研究人員已經定位到導致該問題的原因,如數據偏斜和流行度偏差。

3.3 Making Sense of Performance in Data Analytics Frameworks

阻塞時間分析,這是一種量化性能瓶頸的方法。由於分布式中普遍使用的並行性,識別數據,分析框架的瓶頸是一項巨大的挑戰:作業由許多並行任務組成,每個任務使用流水線來並行使用網絡,磁盤和CPU資源。一個任務可能會在不同的資源執行過程中受到瓶頸限制,並且在任何時候,同一個任務可能會受到不同資源的瓶頸。該方法提出的阻塞時間分析,是使用大量的白盒日誌記錄來度量每個任務在給定資源上被阻塞的時間。使用每項任務的測量結果,可以將緩慢任務與長時間阻塞時間關聯起來,以了解其中的原因[22]。也就是說,針對特定作業的任務度量,允許我們了解,如果磁盤或網絡能夠無限快速,任務完成需要多長時間,這為優化網絡或磁盤性能提供了上限。

3.4 SkewTune in Action: Mitigating Skew in MapReduce Applications

SkewTune將Hadoop作業作為輸入。為了避免偏斜,SkewTune將作業的拓撲和reduce階段視為單獨的UDO(user-defined operations, 用戶自定義操作)。在SkewTune中,與在Hadoop中一樣,UDO從前一個UDO的輸出中提取輸入,並將其輸入到本地。假定UDO一次處理輸入的一條記錄,而在各個輸入記錄之間不保留狀態。鍵值對(即mapper輸入)和鍵組(即reducer輸入)均被認為是記錄的特殊情況。每個UDO被並行化為任務,並且每個任務在集群中被分配一個槽(slot)。任務完成後,插槽即變為可用[23]。 SkewTune的偏斜緩解技術專為MapReduce類型的數據處理引擎而設計。這些引擎相對於傾斜處理的三個重要特征如下:(1)coordinator-worker體系結構,其中coordinator節點進行調度決策並且worker節點運行其分配的任務。完成一項任務後,worker節點向coordinator請求一項新任務。 (2)解耦執行:operator不會對上遊operator施壓。相反,他們彼此獨立執行。 (3)獨立記錄處理:任務正在執行一個UDO,它相互獨立地處理每個輸入記錄(可能是嵌套的)。(4)任務進度估計,估計每個任務的剩余時間。每個worker定期向coordinator報告這一估計。(5)按任務統計:每項任務都會跟蹤一些基本統計信息,例如未處理字節和記錄的總數。

3.5 Camdoop: Exploiting In-network Aggregation for Big Data Applications

CamCube上運行的類似MapReduce的系統,支持數據流的全路徑聚合。執行聚合投影時,根據服務器的中間數據集中共有多少鍵,利用中間數據的來源構建聚合樹。這可以減少網絡流量,因為在每跳處只有一小部分接收到的數據被轉發。所有轉發流量的服務器都將有效地參與reduce階段,分擔計算負載,這對於輸出大小通常遠小於中間數據大小的工作負載很重要。可以證明,當使用關聯和交換reduce函數時,可以有顯著的性能提升,最高達兩個數量級[24]。Camdoop同樣提供了容錯機制,以及我們如何確保中間數據中的每個條目在出現故障時只在最終輸出中包含一次。實驗還表明,即使reduce函數不具有關聯性和交換性,路徑上聚合仍然具有以下優點:Camdoop可以將負載分布到所有服務器上,並支持將reduce階段並行化,從而進一步縮短總工作時間。最後,Camdoop利用定制的傳輸層,通過利用應用程序級先驗允許基於內容的數據包優先級調度。即使沒有執行聚合,這也有助於確保reducer不會因為缺乏處理任務而停止。Camdoop支持在MapReduce 中使用相同功能集,它旨在成為Hadoop的插件,並且與現有的MapReduce作業兼容。由於MapReduce廣泛的適用性和普及性,我們選擇其作為編程模型。但是,該方法也可以擴展到其他使用分區/聚合模型的平臺,例如Dryad 或Storm。

3.6 Dolly系統,Effective Straggler Mitigation: Attack of the Clones

該系統提出一種不同的方法。不是等待並試圖預測落伍任務,而是將推測執行推向極端,並建議對每個任務啟動多個副本,並僅使用首先完成的副本的結果。這種技術既通用又強大,因為它避免了等待,推測以及尋找復雜的相關性。在處理小型互動工作時,這種積極的克隆將顯著提高緩解落後任務的敏捷性。克隆帶來兩個主要挑戰:第一個挑戰是額外的克隆可能會使用額外的資源。但是,對生產日誌的分析顯示,最小90%的工作消耗的資源少於6%,試圖改進的交互式作業都屬於這類小型工作。因此,我們可以通過使用少量額外資源來改善它們。第二個挑戰是額外克隆由於創建中間數據而產生的潛在問題,這可能會影響工作表現。有效的克隆需要我們克隆每個任務並使用首先完成的任務克隆的輸出。但是,這可能會導致在作業的不同階段(例如map, reduce, join)任務之間爭用中間數據;框架通常以下遊階段的任務(例如reduce)讀取上遊階段的任務(例如map)的輸出拓撲來組合作業。如果所有下遊克隆從首先完成的上遊克隆中讀取,則它們將爭用IO帶寬。避免這種爭用的替代方案是使每個下遊克隆僅從單個上遊克隆中讀取。但是這使下遊克隆的開始時間滯後。對爭用問題的解決方案是延遲分配。它基於直覺,除了少數落伍任務之外,大多數克隆幾乎同時完成。使用成本效益分析來克服這些克隆之間的這種差異,它會檢查克隆是否可以在將下遊克隆分配給上遊輸出的可用副本之前獲得獨占副本。成本優勢擬合分析是通用的,以說明階段之間的不同通信模式。因此構建了Dolly,這是一個在資源預算內運行時執行克隆以減輕落後任務影響的系統。使用來自Facebook和Bing的生產工作負載對150個節點群集進行評估表明,以LATE 和Mantri為基準,Dolly分別將小型作業的平均完成時間提高了34%至46%。通過挑選每個任務的最快克隆,Dolly有效地將最慢的任務從運行速度平均降低8倍提高到降低1.06倍,從而有效地消除任務延遲[25]。

3.7 Reining in the Outliers in Map-Reduce Clusters using Mantri

Mantri,這是一個監視任務並根據其原因篩選異常值的系統。它使用以下技術:(1)重新啟動觀測到的資源約束和工作不平衡的異常任務,(2)基於網絡感知的任務配置,(3)基於成本效益分析任務的輸出。Mantri采用的詳細分析和決策流程與Map-Reduce實施中異常值緩解的技術不同,前者不僅僅關註重復任務。MapReduce不能防止數據丟失導致的重新計算或者網絡擁塞導致的異常值。而Mantri根據數據源的位置以及當前網絡連接的使用情況來安排任務,在任務完成時,Mantri會復制其輸出。此外,Mantri執行異常值的智能重啟。長時間運行的任務,因為它有更多的工作要做,不會重新啟動;如果由於通過低帶寬網路讀取數據而滯後,則只有存在更有利的網絡位置可用時才會重新啟動任務。與當前只在某個階段結束時復制任務的方法不同,Mantri使用實時進度報告以決定任務的復制[26]。

3.8 Rock You like a Hurricane: Taming Skew in Large Scale Analytics

Hurricane是一個與Hadoop或Spark類似的大數據計算框架,創新之處在於,它基於節點在運行時觀察到的負載,自適應工作分區。超載節點在執行期間可以隨時產生任務的克隆,每個任務克隆處理一個原始數據的子集。該系統核心思想是任務克隆,其中在空閑節點上,高負荷節點可以克隆其任務,並讓每個克隆處理原始輸入的子集。這使得Hurricane能夠自適應地調整任務的並行,基於節點動態負載平衡[27]。對於表現不佳的任務在其執行期間可能會分裂到多個空閑節點中運行,這可以分擔一部分任務負載。相比於Hadoop和Spark等大數據處理框架能夠更好的處理數據偏斜。Hurricane通過聯合兩個新穎的手段實現任務克隆:細粒度的獨立數據訪問和支持合並的編程模型,這些技術可以幫助程序員寫出高性能,抗偏斜的應用。

4 Categorize and Compare

Hurricane是第一個基於任務執行期間worker觀察到的負載自適應分配工作的集群計算框架。通過多個worker之間細粒度的數據共享和程序員定義的合並程序,可以比較容易的解決數據偏斜帶來的問題。
相比於Dolly或者Mantri,已經提出了幾種技術將分析作業分成更小的任務,以便減輕傾斜並改善負載平衡。這些技術不但需要程序員的手動幹預,而且是特定於應用程序和輸入。例如,他們需要微調程序員定義的分割函數,利用交換性和關聯性來組合相同的鍵值,或跨多個分區分割相同鍵值的記錄。Hurricane通過在克隆任務時動態分割分區,以獨立於應用程序的方式緩解偏斜。可以表明,該方法可以有效地緩解偏斜,而無需調整應用程序,並且適用於任意操作,例如查找唯一值。
傳統的集群計算框架如Hadoop或者Spark,將數據分割成分區,並使用混洗和排序將它們合並回應用。這通常是以排序中間輸出為代價的,並且需要防止將具有相同key的記錄發送到多個reducer,這可能導致存在偏斜時的負載不平衡。更重要的是,這種方法限制了分區的大小,使得以平衡的方式重新分配分區變得更加困難。Hurricane通過賦予應用程序適時使用開發人員提供的自定義合並方法,以采取不同的方法。這種合並包含傳統的混洗和排序,但同時更靈活,因為它允許在執行過程中的任何節點創建的克隆的輸出以特定於應用程序的方式進行合並。盡管向現有框架添加合並過程相對簡單,但充分利用該過程需要重新設計和更改執行模型,以便可以即時重新分配任務,容錯機制也需要進行調整以解決可能存在多個輸出子集的問題。
SkewTune。SkewTune通過識別慢速任務並將其重新分區以在空閑節點上運行,來緩解MapReduce程序中的偏差。由於該系統旨在成為MapReduce的直接替代品,因此它受到類似的限制,即必須保留輸出順序並將數據放置在worker本地。雖然這種方法可以幫助緩解數據偏斜,但也會導致重大的數據移動,這可能會使已經超負荷的節點情況更加糟糕。SkewTune還可能會通過對即將完成的任務進行重新分區而無意中惡化了性能[27]。
Camdoop在MapReduce應用程序的混洗階段執行網絡內數據聚合,這可以通過減少移動的數據量和網絡上的整體負載來幫助緩解數據偏斜。不幸的是,這個解決方案需要目前不可用的特殊硬件。
對落伍節點(Straggler)的任務分析工作負載是一項挑戰。一種常用的處理落伍節點的方法是推測性執行,其中包括盡快偵測落伍任務,並在另一臺機器上重新啟動任務副本[10]。雖然這種方法有助於解決機器偏斜問題,但它不能解決數據或計算歪斜問題。Hurricane讓較慢的worker通過克隆來分配任務,避免了從頭開始重新開始任務的需要。

5 Summery

現實生活中,數據偏斜是十分普遍的,此外任務運行時間還可能受到機器偏差的影響,例如不同性能的機器,故障機器等,而數據偏斜對大數據執行框架的執行時間影響顯著。一般消除偏斜影響的技術大致有:推測性執行,黑名單機制以及近些年出現的自適用負載均衡。也出現大量為了解決這類問題而特殊設計的大數據計算框架,比如SkewTune,Camdoop,Spongefiles,Sparrow 和Hawk。而Hurricane通過細粒度的數據共享以及自主的任務克隆,可以有效減少偏斜帶來的任務延時,並且使用簡單,可以幫助用戶開發出高性能,抗偏斜的應用。

6 References

[1] Guojie L, Xueqi C. 大數據研究: 未來科技及經濟社會發展的重大戰略領域——大數據的研究現狀與科學思考[J]. 中國科學院院刊, 2012 (2012 年 06): 647-657.
[2] 鄔賀銓. 大數據時代的機遇與挑戰[J]. 唯實: 現代管理, 2013 (5): 33-34.
[3] 陳全, 鄧倩妮. 雲計算及其關鍵技術[J]. 計算機應用, 2009, 29(9): 2562-2567.
[4] Zikopoulos P, Eaton C. Understanding big data: Analytics for enterprise class hadoop and streaming data[M]. McGraw-Hill Osborne Media, 2011.
[5] Dean J, Ghemawat S. MapReduce: simplified data processing on large clusters[J]. Communications of the ACM, 2008, 51(1): 107-113.
[6] Shvachko K, Kuang H, Radia S, et al. The hadoop distributed file system[C]//Mass storage systems and technologies (MSST), 2010 IEEE 26th symposium on. Ieee, 2010: 1-10.
[7] Gasbarro J A, Horowitz M A, Barth R M, et al. Method and circuitry for minimizing clock-data skew in a bus system: U.S. Patent 5,432,823[P]. 1995-7-11.
[8] McCarthy P. Camus: The Stranger[M]. Cambridge University Press, 2004.
[9] Garduno E, Kavulya S, Tan J, et al. Theia: Visual Signatures for Problem Diagnosis in Large Hadoop Clusters[C]//LISA. 2012: 33-42.
[10] Chen Q, Liu C, Xiao Z. Improving MapReduce performance using smart speculative execution strategy[J]. IEEE Transactions on Computers, 2014, 63(4): 954-967.
[11] 王忠. 美國推動大數據技術發展的戰略價值及啟示[J]. 中國發展觀察, 2012 (6): 44-45.
[12] 李伯虎, 張霖, 任磊, 等. 再論雲制造[J]. 計算機集成制造系統, 2011, 17(3): 449-457.
[13] 孟小峰, 周龍驤, 王珊. 數據庫技術發展趨勢[J]. 軟件學報, 2004, 15(12):1822-1836.
[14] Tony Hey, Stewart Tansley, Kristin Tolle. 第四範式[M]. 科學出版社, 2012.
[15] 孟小峰, 慈祥. 大數據管理:概念、技術與挑戰[J]. 計算機研究與發展, 2013, 50(1):146-169.
[16] 馮登國, 張敏, 李昊. 大數據安全與隱私保護[J]. 計算機學報, 2014, 37(1):246-258.
[17] Ghemawat S, Gobioff H, Leung S T. The Google file system[M]. ACM, 2003.
[18] Zaharia M, Xin R S, Wendell P, et al. Apache spark: a unified engine for big data processing[J]. Communications of the ACM, 2016, 59(11): 56-65.
[19] Armbrust M, Xin R S, Lian C, et al. Spark sql: Relational data processing in spark[C]//Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data. ACM, 2015: 1383-1394.
[20] Meng X, Bradley J, Yavuz B, et al. Mllib: Machine learning in apache spark[J]. The Journal of Machine Learning Research, 2016, 17(1): 1235-1241.
[21] Xin R S, Gonzalez J E, Franklin M J, et al. Graphx: A resilient distributed graph system on spark[C]//First International Workshop on Graph Data Management Experiences and Systems. ACM, 2013: 2.
[22] Ousterhout K, Rasti R, Ratnasamy S, et al. Making Sense of Performance in Data Analytics Frameworks[C]//NSDI. 2015, 15: 293-307.
[23] Kwon Y C, Balazinska M, Howe B, et al. Skewtune in action: Mitigating skew in mapreduce applications[J]. Proceedings of the VLDB Endowment, 2012, 5(12): 1934-1937.
[24] Costa P, Donnelly A, Rowstron A, et al. Camdoop: Exploiting in-network aggregation for big data applications[C]//Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation. USENIX Association, 2012: 3-3.
[25] Ananthanarayanan G, Ghodsi A, Shenker S, et al. Effective Straggler Mitigation: Attack of the Clones[C]//NSDI. 2013, 13: 185-198.
[26] Ananthanarayanan G, Kandula S, Greenberg A G, et al. Reining in the Outliers in Map-Reduce Clusters using Mantri[C]//OSDI. 2010, 10(1): 24.
[27] Bindschaedler L, Malicevic J, Schiper N, et al. Rock you like a hurricane: taming skew in large scale analytics[R]. 2018.

Survey Report on Data Skew in Big Data