1. 程式人生 > >Introducing DataFrames in Apache Spark for Large Scale Data Science(中英雙語)

Introducing DataFrames in Apache Spark for Large Scale Data Science(中英雙語)

文章標題

Introducing DataFrames in Apache Spark for Large Scale Data Science

一個用於大規模資料科學的API——DataFrame

作者介紹

文章正文

Today, we are excited to announce a new DataFrame API designed to make big data processing even easier for a wider audience.

今天,我們正式宣佈Spark新的API——DataFrame 。作為2014–2015年Spark最大的API改動,DataFrame能夠使得大資料更為簡單,從而擁有更廣泛的受眾群體。

When we first open sourced Apache Spark, we aimed to provide a simple API for distributed data processing in general-purpose programming languages (Java, Python, Scala). Spark enabled distributed data processing through functional transformations on distributed collections of data (RDDs). This was an incredibly powerful API: tasks that used to take thousands of lines of code to express could be reduced to dozens

.

我們最早在設計Spark的時候,其中一個很重要的目標就是給大資料生態圈提供基於通用程式語言的(Java、Scala、Python)簡單易用的API。Spark原本的RDD API通過函數語言程式設計的模式把分散式資料處理轉換成分散式資料集(distributed collections)。原本需要上千行用Hadoop MapReduce實現的程式碼,在Spark這個API上減少到了數十行。

  • dozens ['dʌznz] 幾十,許多;(一)打,十二個( dozen的名詞複數 )

As Spark continues to grow, we want to enable wider audiences beyond “Big Data” engineers to leverage

the power of distributed processing. The new DataFrames API was created with this goal in mind.  This API is inspired by data frames in R and Python (Pandas), but designed from the ground-up to support modern big data and data science applications. As an extension to the existing RDD API, DataFrames feature:

然後隨著Spark的不斷壯大,我們希望擁有更廣泛的受眾群體利用其進行分散式處理,不侷限於“大資料”工程師。這個新的DataFrame API在R和Python data frame的設計靈感之上,專門為了資料科學應用設計,具有以下功能特性:

  • leverage  [ˈli:vərɪdʒ] 槓桿作用;影響力;優勢,力量;舉債經營
  • ground-up 碾碎的;磨成粉的 重新
  • Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster
  • Support for a wide array of data formats and storage systems
  • State-of-the-art optimization and code generation through the Spark SQL Catalystoptimizer
  • Seamless integration with all big data tooling and infrastructure via Spark
  • APIs for Python, Java, Scala, and R (in development via SparkR)
  • 從KB到PB級的資料量支援;
  • 多種資料格式和多種儲存系統支援;
  • 通過Spark SQL的Catalyst優化器進行先進的優化,生成程式碼;
  • 通過Spark無縫整合所有大資料工具與基礎設施;
  • 為Python、Java、Scala和R語言(SparkR)API。

For new users familiar with data frames in other programming languages, this API should make them feel at home. For existing Spark users, this extended API will make Spark easier to program, and at the same time improve performance through intelligent optimizations and code-generation.

對於之前熟悉其他語言中data frames的新使用者來說,這個新的API可以讓Spark的初體驗變得更加友好。而對於那些已經在使用的使用者來說,這個API會讓基於Spark的程式設計更加容易,同時其智慧優化和程式碼生成也能幫助使用者獲得更好的效能。

1、What Are DataFrames?

In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

在Spark中,DataFrame是一個以命名列方式組織的分散式資料集,等同於關係型資料庫中的一個表,也相當於R/Python中的data frames(但是進行了更多的優化)。DataFrames可以由結構化資料檔案轉換而來,也可以從Hive中的表得來,以及可以轉換自外部資料庫或現有的RDD。

  • conceptually [kən'septʃʊrlɪ] 概念地 從概念上講
  • equivalent  [ɪˈkwɪvələnt] 相等的,相當的,等效的;等價的,等積的;[化學]當量的

The following example shows how to construct DataFrames in Python. A similar API is available in Scala and Java.

下面程式碼演示瞭如何使用Python構造DataFrames,而在Scala和Java中也有類似的API可以呼叫。

// Constructs a DataFrame from the users table in Hive.
users = context.table(“users”)

// from JSON files in S3
logs = context.load(“s3n://path/to/data.json”, “json”)

2、How Can One Use DataFrames?

Once built, DataFrames provide a domain-specific language for distributed data manipulation.  Here is an example of using DataFrames to manipulate the demographic data of a large population of users:

一經構建,DataFrames就會為分散式資料處理提供一個指定的DSL(domain-specific language )。

  • demographic  [ˌdemə'ɡræfɪk] 人口統計學的;人口統計的
// Create a new DataFrame that contains “young users” only
young = users.filter(users.age < 21)

// Alternatively, using Pandas-like syntax
young = users[users.age < 21]

// Increment everybody’s age by 1
young.select(young.name, young.age + 1)

// Count the number of young users by gender
young.groupBy(“gender”).count()

// Join young users with another DataFrame called logs
young.join(logs, logs.userId == users.userId, “left_outer”)

You can also incorporate SQL while working with DataFrames, using Spark SQL. This example counts the number of users in the young DataFrame.

通過Spark SQL,你還可以用SQL的方式操作DataFrames。下面這個例子統計了“young” DataFrame中的使用者數量。

young.registerTempTable(“young”)

context.sql(“SELECT count(*) FROM young”)

In Python, you can also convert freely between Pandas DataFrame and Spark DataFrame:

在Python中,Pandas DataFrame和Spark DataFrame還可以自由轉換。

// Convert Spark DataFrame to Pandas
pandas_df = young.toPandas()

// Create a Spark DataFrame from Pandas
spark_df = context.createDataFrame(pandas_df)

Similar to RDDs, DataFrames are evaluated lazily. That is to say, computation only happens when an action (e.g. display result, save output) is required. This allows their executions to be optimized, by applying techniques such as predicate push-downs and bytecode generation, as explained later in the section “Under the Hood: Intelligent Optimization and Code Generation”. All DataFrame operations are also automatically parallelized and distributed on clusters.

類似於RDD,DataFrame同樣使用了lazy的方式。也就是說,只有動作真正發生時(如顯示結果,儲存輸出),計算才會進行。從而,通過一些技術,比如predicate push-downs和bytecode generation,執行過程可以進行適當的優化(詳情見下文)。同時,所有的DataFrames也會自動的在叢集上並行和分佈執行。

3、Supported Data Formats and Sources

Modern applications often need to collect and analyze data from a variety of sources. Out of the box, DataFrame supports reading data from the most popular formats, including JSON files, Parquet files, Hive tables. It can read from local file systems, distributed file systems (HDFS), cloud storage (S3), and external relational database systems via JDBC. In addition, through Spark SQL’s external data sources API, DataFrames can be extended to support any third-party data formats or sources. Existing third-party extensions already include Avro, CSV, ElasticSearch, and Cassandra.

現代的應用程式通常需要收集和分析來自各種不同資料來源的資料,而DataFrame與生俱來就支援讀取最流行的格式,包括JSON檔案、Parquet檔案和Hive表格。DataFrame還支援從多種型別的檔案系統中讀取,比如本地檔案系統、分散式檔案系統(HDFS)以及雲端儲存(S3)。同時,配合JDBC,它還可以讀取外部關係型資料庫系統。此外,通過Spark SQL的外部資料來源(external data sources) API,DataFrames可以更廣泛地支援任何第三方資料格式和資料來源。值得一提的是,當下的第三方擴充套件已經包含Avro、CSV、ElasticSearch和Cassandra。

  • out of the box [aʊt ʌv ði bɑks]  (澳/紐西蘭,非正式)非常好

DataFrames’ support for data sources enables applications to easily combine data from disparate sources (known as federated query processing in database systems). For example, the following code snippet joins a site’s textual traffic log stored in S3 with a PostgreSQL database to count the number of times each user has visited the site.

DataFrames對資料來源的支援能力允許應用程式可以輕鬆地組合來自不同資料來源的資料。下面的程式碼片段則展示了儲存在S3上網站的一個文字流量日誌(textual traffic log)與一個PostgreSQL資料庫的join操作,目的是計算網站使用者訪問該網站的次數。

  • disparate ['dɪspərət] 完全不同的;從根本上種類有區分或不同的
users = context.jdbc(“jdbc:postgresql:production”, “users”)

logs = context.load(“/path/to/traffic.log”)

logs.join(users, logs.userId == users.userId, “left_outer”) \

.groupBy(“userId”).agg({“*”: “count”})

4、Application: Advanced Analytics and Machine Learning

Data scientists are employing increasingly sophisticated techniques that go beyond joins and aggregations. To support this, DataFrames can be used directly in MLlib’s machine learning pipeline API. In addition, programs can run arbitrarily complex user functions on DataFrames.

當下,資料科學家們使用的技術已日益複雜,超越了joins和aggregations。為了更好地支援他們的使用,DateFrames可以直接在MLlib的machine learning pipeline API中使用。此外,在DataFrames中,程式還可以執行任意複雜的使用者函式。

  • sophisticated [səˈfɪstɪˌketɪd]  複雜的;精緻的;富有經驗的;深奧微妙的
  • arbitrarily  [ˌɑrbəˈtrɛrəlɪ] 任意地;武斷地;反覆無常地;肆意地

Most common advanced analytics tasks can be specified using the new pipeline API in MLlib. For example, the following code creates a simple text classification pipeline consisting of a tokenizer, a hashing term frequency feature extractor, and logistic regression.

通過Spark,使用者可以使用MLlib中新的pipelineAPI來指定高階分析任務。例如,下面的程式碼建立了一個簡單的文字分類(text classification)管道。該管道由一個tokenizer,一個hashing term frequency feature extractor和logistic regression組成。

tokenizer = Tokenizer(inputCol=”text”, outputCol=”words”)

hashingTF = HashingTF(inputCol=”words”, outputCol=”features”)

lr = LogisticRegression(maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

Once the pipeline is setup, we can use it to train on a DataFrame directly:

一旦管道設定好,我們可以直接使用它在DataFrame上進行訓練。

df = context.load(“/path/to/data”)

model = pipeline.fit(df)

For more complicated tasks beyond what the machine learning pipeline API provides, applications can also apply arbitrarily complex functions on a DataFrame, which can also be manipulated using Spark’s existing RDD API. The following snippet performs a word count, the “hello world” of big data, on the “bio” column of a DataFrame.

對於那些複雜程度超出了machine learning pipeline API能力的任務,應用程式也可以通過DataFrames提供任意複雜的函式,當然這也可以通過Spark已有的RDD API來實現。下面程式碼段實現的是一個DataFrame“bio”列上的word count(大資料時代的Hello World)。

  • manipulate  [məˈnɪpjəˌlet] 操縱;操作,處理;巧妙地控制;[醫] 推拿,調整
  • snippet  [ˈsnɪpɪt] 小片,片段;不知天高地厚的年輕人
df = context.load(“/path/to/people.json”)
// RDD-style methods such as map, flatMap are available on DataFrames
// Split the bio text into multiple words.
words = df.select(“bio”).flatMap(lambda row: row.bio.split(” “))
// Create a new DataFrame to count the number of words words_df = words.map(lambda w: Row(word=w, cnt=1)).toDF() word_counts = words_df.groupBy(“word”).sum()

5、Under the Hood: Intelligent Optimization and Code Generation

Unlike the eagerly evaluated data frames in R and Python, DataFrames in Spark have their execution automatically optimized by a query optimizer. Before any computation on a DataFrame starts, the Catalyst optimizer compiles the operations that were used to build the DataFrame into a physical plan for execution. Because the optimizer understands the semantics of operations and structure of the data, it can make intelligent decisions to speed up computation.

與R/Python中data frame使用的eager方式不同,Spark中的DataFrames執行會被查詢優化器自動優化。在DataFrame上的計算開始之前,Catalyst優化器會編譯操作,這將把DataFrame構建成物理計劃來執行。因為優化器清楚操作的語義和資料的結構,所以它可以為計算加速制定智慧的決策。

  • eagerly [ˈiɡɚlɪ] 渴望地,熱切地

At a high level, there are two kinds of optimizations. First, Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data. In the case of Parquet files, entire blocks can be skipped and comparisons on strings can be turned into cheaper integer comparisons via dictionary encoding. In the case of relational databases, predicates are pushed down into the external databases to reduce the amount of data traffic.

在高等級上,這裡存在兩種型別的優化。首先,Catalyst提供了邏輯優化,比如謂詞下推(predicate pushdown)。優化器可以將謂詞過濾下推到資料來源,從而使物理執行跳過無關資料。在使用Parquet的情況下,更可能存在檔案被整塊跳過的情況,同時系統還通過字典編碼把字串對比轉換為開銷更小的整數對比。在關係型資料庫中,謂詞則被下推到外部資料庫用以減少資料傳輸。

  • irrelevant  [ɪˈrɛləvənt] 不相干的;不恰當;缺乏時代性的,落後於潮流的;牛頭不對馬嘴
  • traffic [ˈtræfɪk] 交通,運輸量;(非法的)交易;通訊量;交際 傳輸

Second, Catalyst compiles operations into physical plans for execution and generates JVM bytecode for those plans that is often more optimized than hand-written code. For example, it can choose intelligently between broadcast joins and shuffle joins to reduce network traffic. It can also perform lower level optimizations such as eliminating expensive object allocations and reducing virtual function calls. As a result, we expect performance improvements for existing Spark programs when they migrate to DataFrames.

第二,為了更好地執行,Catalyst將操作編譯為物理計劃,並生成JVM bytecode,這些通常會比人工編碼更加優化。例如,它可以智慧地選擇broadcast joins和shuffle joins來減少網路傳輸。其次,同樣存在一些較為低階的優化,如消除代價昂貴的物件分配及減少虛擬函式呼叫。因此,我們認為現有的Spark專案遷移到DataFrames後,效能會有所改觀。

  • hand-written code 人工編碼 手寫程式碼
  • network traffic 網路傳輸
  • perform [pərˈfɔ:rm] 執行;履行;表演;扮演

Since the optimizer generates JVM bytecode for execution, Python users will experience the same high performance as Scala and Java users.

同時,鑑於優化器為執行生成了JVM bytecode,Python使用者將擁有與Scala和Java使用者一樣的高效能體驗。

The above chart compares the runtime performance of running group-by-aggregation on 10 million integer pairs on a single machine (source code). Since both Scala and Python DataFrame operations are compiled into JVM bytecode for execution, there is little difference between the two languages, and both outperform the vanilla Python RDD variant by a factor of 5 and Scala RDD variant by a factor of 2.

上圖是在單個機器上對1000萬個整數進行分組聚合(group-by-aggregation)的執行時效能對比。在綠色部分,為了更好地執行,Scala和Python的DataFrame操作都被編譯成了JVM bytecode,導致這兩種語言在效能上基本有著同樣的表現。同時,兩者效能均優於普通Python RDD實現的4倍,也達到了Scala RDD實現的兩倍。

DataFrames were inspired by previous distributed data frame efforts, including Adatao’s DDF and Ayasdi’s BigDF. However, the main difference from these projects is that DataFrames go through the Catalyst optimizer, enabling optimized execution similar to that of Spark SQL queries. As we improve the Catalyst optimizer, the engine also becomes smarter, making applications faster with each new release of Spark.

不管選擇了哪種語言,Catalyst優化器都實現了DataFrame程式的優化執行。同時,隨著Catalyst優化器的不斷改善,引擎也會變得更智慧,從而對比已有版本,Spark的每一個新版本都會有效能上的提升。

Our data science team at Databricks has been using this new DataFrame API on our internal data pipelines. It has brought performance improvements to our Spark programs while making them more concise and easier to understand. We are very excited about it and believe it will make big data processing more accessible to a wider array of users.

在Databricks,資料科學家團隊已經將DataFrame API搭載在內部的資料管道上。Spark程式效能的改進已經在我們內部得到證實,而程式也更加的簡潔易懂。毫無疑問,這將大幅度地降低大資料使用門檻,讓大資料技術為更多人使用。

  • concise  [kənˈsaɪs]  簡約;簡明的,簡潔的;精煉

This API will be released as part of Spark 1.3 in early March. If you can’t wait, check out Spark from GitHub to try it out. If you are in the Bay Area at the Strata conference, please join us on Feb 17 in San Jose for a meetup on this topic.

這個API將在3月初作為Spark1.3版本的一部分發布。如果你迫不及待,可以關注Spark在GitHub上的進展。如果你在加州灣區參加Strata conference,2月17日聖何塞有一個關於這個主題的Meetup。

  • bay [be] 灣,海灣;犬吠聲;月桂樹;吊窗,凸窗
  • San Jose [sɑnhoˈze] 聖何塞(美國城市)

This effort would not have been possible without the prior data frame implementations, and thus we would like to thank the developers of R, Pandas, DDF and BigDF for their work.

參考文獻

  • https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
  • https://blog.csdn.net/mt0803/article/details/50464124
  • https://blog.csdn.net/yhao2014/article/details/44979041

相關推薦

Introducing DataFrames in Apache Spark for Large Scale Data Science雙語

文章標題 Introducing DataFrames in Apache Spark for Large Scale Data Science 一個用於大規模資料科學的API——DataFrame 作者介紹 文章正文 Today, we are excited to announce a ne

What’s new for Spark SQL in Apache Spark 1.3雙語

block htm park -h apache HA log -a -- 文章標題 What’s new for Spark SQL in Apache Spark 1.3 作者介紹 Michael Armbrust 文章正文 參考文獻

Introducing Apache Spark Datasets雙語

文章標題 Introducing Apache Spark Datasets 作者介紹 文章正文 Developers have always loved Apache Spark for providing APIs that are simple yet powerful, a combi

A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets雙語

文章標題 A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets 且談Apache Spark的API三劍客:RDD、DataFrame和Dataset When to use them and why 什麼時候用他們,為什麼

Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop雙語

文章標題 Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop Deep dive into the new Tungsten execution engine 作者介紹 文章正文 參考文獻

Deep Dive into Spark SQL’s Catalyst Optimizer雙語

文章標題 Deep Dive into Spark SQL’s Catalyst Optimizer 作者介紹 文章正文 參考文獻 https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-op

Spark 論文篇-Spark:工作組上的叢集計算的框架雙語

論文內容: 待整理 參考文獻: Spark: Cluster Computing with Working Sets. Matei Zaharia, Mosharaf Chowdhury, Michael J. Franklin, Scott Shenker, Ion Stoica. H

VERY DEEP CONVOLUTIONAL NETWORKS FOR LARGE-SCALE IMAGE RECOGNTION翻譯

而不是 lar 標準 類別 研究 src 架構 數量 分辨率 0 - ABSTRACT   在這個工作中,我們研究了卷積網絡的深度對於它在大規模圖像識別設置上的準確率的效果。我們的主要貢獻是對使用非常小的卷積核(3×3)來增加深度的網絡架構進行徹底評估,這說明了通過將深度增

論文閱讀筆記四十一:Very Deep Convolutional Networks For Large-Scale Image RecongnitionVGG ICLR2015

結合 等價 選擇 mac 不同的 works info 內存 enc 論文原址:https://arxiv.org/abs/1409.1556 代碼原址:https://github.com/machrisaa/tensorflow-vgg 摘要 本

AFLW:Annotated Facial Landmarks in the Wild: A large-scale, real-world database for facial landmark

簡單翻譯了一下AFLW的論文(解釋說明書)。 AFLW是一個人臉庫,一共有25993張人臉影象,它最突出的特點是在人臉關鍵點上定位了21個點,更容易被檢測。其次圖片質量比較高,不僅僅是室內,還有室外,側臉等難於檢測的情況都涵蓋在它的人臉庫中。 AFLW提供alw.sqlite,資料

VGGnet論文總結VERY DEEP CONVOLUTIONAL NETWORKS FOR LARGE-SCALE IMAGE RECOGNITION

lrn cli 共享 融合 loss sca 得到 同時 works VGGNet的主要貢獻:   1、增加了網絡結構的深度   2、使用了更小的filter(3*3) 1 introduction 這部分主要說明了,由於在所有的卷積網絡上使用了3*3的filter,所以使

Deep Mixture of Diverse Experts for Large-Scale Visual Recognition 閱讀及相關疑問

1. 背景 大規模視覺識別有三大方向:1)對網路結構改造,加深網路,增加每層網路的神經元數量。 2)做遷移學習:例如學習到的1000類分類器用在500類(大用在小)。 3)多個CNN結合:多個1000類分類器來識別10000類(小用在大)。——本文的方向 Deep Mixture :深度混合

Fully-Convolutional Point Networks for Large-Scale Point Clouds

Abstract        這項工作提出了一種通用的,完全卷積的網路架構,用於有效處理大規模3D資料。 我們的方法的一個顯著特點是它能夠處理無組織的3D表示,例如點雲作為輸入,然後將它們內部轉換為有序結構,以便能用3D卷積來進行處理。與從輸入到輸出維持無組織或有組織表示

【論文閱讀】Deep Mixture of Diverse Experts for Large-Scale Visual Recognition

導讀:   本文為論文《Deep Mixture of Diverse Experts for Large-Scale Visual Recognition》的閱讀總結。目的是做大規模影象分類(>1000類),方法是混合多個小深度網路實現更多類的分類。本文從以下五個方面來對論文做個簡要整理:   背

VGG學習筆記-Very Deep Convolutional Networks for Large-Scale Image Recognition

主要是針對論文,進行了自我解讀,抽絲而成,請大家多多指教。              摘要        在這項工作中,主要研究卷積網路Convolutional networks (ConvNets)深度在大規模的影象識別環境下對準確性的影響。主要貢獻是使用非常小的

論文筆記《Very Deep Convolutional Networks for Large-Scale Image Recognition》

VGGNet在2014年的ILSVRC競賽上,獲得了top-1 error的冠軍和top-5 error的第二名,錯誤率分別為24.7%和7.3%,top-5 error的第一名是GoogLeNet 6.7%。在圖片定位任務中,也獲得了冠軍。網路層數由之前的AlexNet 的8層提高到了最高19

#Apache Spark系列技術直播# 第六講【 What's New in Apache Spark 2.4? 】

Apache Spark系列技術直播第六講 【 What's New in Apache Spark 2.4? 】 Abstract(簡介): This talk will provide an overview of the major features and enhancements in Spar

Learning Hand-Eye Coordination for Robotic Grasping with Deep Learning and Large-Scale Data Collection

We describe a learning-based approach to hand-eye coordination for robotic grasping from monocular images. To learn hand-eye coordination fo

深度學習論文隨記---VGGNet模型解讀-2014年Very Deep Convolutional Networks for Large-Scale Image Recognition

深度學習論文隨記(二)---VGGNet模型解讀 Very Deep Convolutional Networks forLarge-Scale Image Recognition Author: K Simonyan , A Zisserman Year: 2014

人臉識別:coco loss-Rethinking Feature Discrimination and Polymerization for Large-scale Recognition

       nips的一篇做分類和識別的工作,其中在人臉識別任務上也做了實驗,Rethinking Feature Discrimination and Polymerization for Large-scale Recognition.Yu Liu, Hongyang