sparklyr包--實現R與Spark接口

分類:IT技術 時間:2016-10-12

1.sparklyr包簡介

  Rstudio公司發布的sparklyr包具有以下幾個功能:

  • 實現R與Spark的連接;
  • sparklyr包提供了一個完整的dplyr後端,可篩選並聚合Spark數據集,接著在R中實現分析與可視化;
  • 利用Spark的MLlib機器學習庫在R中實現分布式機器學習算法;
  • 可以創建一個擴展,用於調用Spark API,並為Spark的所有包集提供了一個接口。

2.RStudio Server安裝sparklyr包

  linux版本:ubuntu 16.04 LTS 64bit

  R版本:R3.3.1 64bit

  RStudio Server版本:rstudio-server-0.99.902 64bit

  通過devtools包實現sparklyr包的安裝:

install.packages("devtools")
devtools::install_github("rstudio/sparklyr")

註意:此處安裝devtools時Ubuntu中可能會出現安裝不上的錯誤:

sparklyr包--實現R與Spark接口

看錯誤信息可以知道找不到openssl,需要安裝libssl-dev(Ubuntu):

$ sudo apt-get install libssl-dev

sparklyr包--實現R與Spark接口

然後安裝sparklyr因為網速等的原因可能需要進行多次安裝,多嘗試幾次就可以了。如果安裝中斷,很可能安裝包被lock,可以使用下面的方案解決(以reader包為例):

sparklyr包--實現R與Spark接口

install.packages(“readr”,dependencies=TRUE,INSTALL_opts = c(‘—no-lock’))

3.在本地安裝Spark 1.6.1和Hadoop 2.6

library(sparklyr)
spark_install(version = "1.6.1")

sparklyr包--實現R與Spark接口  

此處默認的是使用Spark 1.6.1和Hadoop 2.6

如果用的是RStudio IDE,還需下載最新的預覽版IDE。它包含有實現與Spark交互的若幹增強功能(詳情參考RStudio IDE)。

https://www.rstudio.com/products/rstudio/download/preview/

http://spark.rstudio.com/index.html#rstudio-ide

4.部署Spark

4.1本地部署

安裝好sparklyr包後,我們連接本地的Spark,也可以連接遠程的Spark集群。這裏,使用spark_connect函數來連接本地的Spark:

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")

返回的Spark連接(sc)為Spark集群提供了一個遠程的dplyr數據源。

4.2集群部署

使用sparklyr連接遠程Spark集群Cluster Deployment,需要直接將R session部署在集群節點中的一個machine或者靠近集群處(根據網絡性能)。在此種情況下,R不是直接在集群上運行,所以,必須要保證集群中的每個machine都有一個spark version並且具有相同的配置。

在集群節點中的一個machine或者靠近集群處運行R最直截了當的方式可以通過遠程SSH會話或Rstudio server。在集群節點中使用的spark version必須已經在節點中部署,並且spark的路徑需要添加到環境變量SPARK_HOME中,因此,在嘗試一個連接之前必須保證SPARK_HOME環境變量在server中正確定義。通常是在Renviron.site配置文件中完成的。示例:

SPARK_HOME=/opt/spark/spark-1.6.1-bin-hadoop2.6

然後,通過spark_connect函數和主節點的地址連接,例如:

library(sparklyr)
sc <- spark_connect(master = "spark://local:7077")

如果在EC2上使用Spark EC2部署腳本,可以從/root/spark-ec2/cluster-url讀取master,例如:

library(sparklyr)
cluster_url <- system('cat /root/spark-ec2/cluster-url', intern=TRUE)
sc <- spark_connect(master = cluster_url)

連接工具

可以通過spark-web函數觀看Spark web UI,通過spark_log函數觀看Spark log(日誌)

spark_web(sc)
spark_log(sc)

使用spark_disconnect函數斷開spark的連接:

spark_disconnect(sc)

5.配置configuration

本部分描述sparklyr包和潛在的spark集群的行為的配置的各個選項。同時介紹創建多個配置文件(比如開發、測試、生產)。

5.1配置文件Config Files

通過spark_connect函數的config參數可以指定Spark連接的配置。通過使用config=spark_config()創建默認的配置。下面的代碼代表了默認的行為:

spark_connect(master = "local", config = spark_config())

通過config=spark_config()函數可以從本地當前工作目錄(如果不是位於工作目在父目錄錄)的路徑中讀取配置文件config.yml中的數據。這個文件不是必需的,只是用來提供重寫默認行為overriding default behavior。還可以指定另一個配置文件名稱 和/或 位置。config.yml文件依次處理使用配置包(using the config package),配置包可以支持多命名配置文件。

5.2Package選項

有許多可用的選項配置sparklyr包的行為:

選項

描述

sparklyr.defaultPackages

自動包括在會話中的Spark packages (defaults to com.databricks:spark-csv_2.11:1.3.0” and “com.amazonaws:aws-Java-sdk-pom:1.10.34”)

sparklyr.cores.local

當在本地運行時使用的內核數量 (defaults to parallel::detectCores)

sparklyr.shell.*

傳遞給spark-shell的命令行參數 (see the Spark documentation for details on supported options)

舉個例子:下面的配置文件設置了本地內核數為4並分配給Spark驅動2G內存:

default:
sparklyr.cores.local: 4
sparklyr.shell.driver-memory: 4GB

註:多文件的default使用將在下面描述。

5.3Spark選項

可以使用config.yml指定任意的spark配置屬性:

選項

描述

spark.*

任意配置屬性 (通過創建一個SparkConf包含指定的屬性應用)。spark的配置文檔可以查看可用的屬性。http://spark.apache.org/docs/latest/configuration.html

sparklyr包--實現R與Spark接口 

spark.sql.*

Spark SQL的任意配置屬性 (applied using SET)。Spark SQL Programming Guide的配置文檔可以查看可用的屬性。http://spark.apache.org/docs/latest/sql-programming-guide.html

sparklyr包--實現R與Spark接口 

舉個例子:下面的配置文件為spark設置了一個當前的工作目錄,並指定當揉數據(joins or aggregations)時使用的分區數量為100。

default:
spark.local.dir: /tmp/spark-scratch
spark.sql.shuffle.partitions: 100

5.4多文件配置

config包允許為不同環境定義多命名配置文件(例如:default, test, production)。所有額環境默認繼承default環境,並且可以相互繼承。

舉個例子:您可能想使用一個不同的數據集來開發和測試或可能希望使用只適用於生產集群上運行的自定義Spark配置屬性。config.yml表示如下

default:
dataset: "observations-dev.parquet"
sample-size: 10000

production:
spark.memory.fraction: 0.9
spark.rdd.compress: true
dataset: "observations.parquet"
sample-size: null

還可以使用這個特點來為不同的環境指定不同的Spark master:

default:
spark.master: "local"

production:
spark.master: "spark://local:7077"

使用上面的配置,可以在使用spark_connect()的時候徹底省略master參數:

sc <- spark_connect()

註意:當前活動配置通過R_CONFIG_ACTIVE環境變量的值決定,可以通過config package documentation詳細的了解。https://github.com/rstudio/config

6.預覽版RStudio Server

Rstudio server提供了一個基於web的IDE遠程的R會話接口,使其spark集群可以供前端使用。本部分介紹一些對於RStudio Server非常有用的額外的配置選項。RStudio的最新預覽版集成支持Spark和sparklyr包。包含以下工具https://www.rstudio.com/products/rstudio/download/preview/:

  • 創建和管理Spark連接
  • 瀏覽表格數據和Spark DataFrames的所有列
  • 可以預覽Spark DataFrames的前1000行

6.1連接選項

一旦成功安裝完sparklyr包,我們可以在IDE中可以看到一個新的Spark窗口。該窗口包含一個New Connection對話框,用於連接本地或者遠程的Spark。如下所示:

 sparklyr包--實現R與Spark接口

sparklyr包--實現R與Spark接口

可以使用rstudio.spark.connections選項配置哪一個連接,默認的可能是local和cluster連接,可以選擇其中之一作為提供的連接,或者使用一個特殊的Spark master URL。一些常用的連接組合的選擇包括:

Value

描述

c("local", "cluster")

Default 提供了本地和cluster spark instance的連接

"local"

提供了本地spark instance連接

"cluster"

提供了cluster spark instance連接

"spark://local:7077"

提供了特殊cluster的連接

c("spark://local:7077", "cluster")

提供了特殊cluster和其他cluster的連接

這些選項應該在Rprofile.site中設置,例如:

options(rstudio.spark.connections = "spark://local:7077")

6.2Spark安裝

如果是在本地模式(相對於集群模式),需要預裝spark version(s)並共享給使用該服務器的所有使用者。你可以安裝spark version(s)在一個共享的目錄中(e.g. /opt/spark),然後標明它作為spark安裝目錄。

options(spark.install.dir = "/opt/spark")

7.Sparklyr包的使用

7.1連接spark

安裝好sparklyr包之後,我們連接本地的Spark,也可以連接遠程的Spark集群。這裏,我們使用spark_connect函數來連接本地的Spark:

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")

返回的Spark連接(sc)為Spark集群提供了一個遠程的dplyr數據源。
出現下面的問題:

sparklyr包--實現R與Spark接口

要求在Ubuntu中安裝Java:

方法一:

Windows中下載sparklyr包--實現R與Spark接口

從Windows中復制到Ubuntu中:

sparklyr包--實現R與Spark接口

打開新的控制臺,創建目標文件夾:

root@love:/home/wangchao# cd /usr/lib
root@love:/usr/lib# sudo mkdir java

在原來的控制臺中輸入如下命令,安裝JDK:

$ sudo tar zvxf jdk-7u67-linux-x64.gz -C /usr/lib/java
# 該命令的意思是解壓jdk-7u67-linux-x64.gz文件,並把它安裝到/usr/lib/java目錄下,也就是前面創建的文件夾。註意命令中的-C是大寫的字母C。

打開系統配置文件.bashrc

$ sudo gedit .bashrc

更多詳情見請繼續閱讀下一頁的精彩內容

在其末端添加下面的代碼,註意不要修改其他任何代碼,添加環境變量:

export JAVA_HOME=/usr/lib/java/jdk1.7.0_67
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=${JAVA_HOME}/lib:${JRE_HOME}/lib:
export PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin:$PATH

sparklyr包--實現R與Spark接口

測試是否安裝成功:

sparklyr包--實現R與Spark接口

sparklyr包--實現R與Spark接口

解壓後的文件:

sparklyr包--實現R與Spark接口

方法二:

直接使用系統安裝:

$ sudo apt-get install openjdk-8-jdk
$ sudo R CMD javareconf
$ sudo R

以上安裝Java後還是出現了相同的問題:

sparklyr包--實現R與Spark接口

安裝好後發現還是不行,找不到Java,這下可以確定不是jdk的問題了,應該是R不能找到jdk,於是想到了一個打通R和Java的通道包rJava包,下載和安裝rJava包:

Install.packages(“rJava”)

這下解決了以上的問題。

7.2讀取數據

使用copy_to函數可以實現將R中的數據框導入到Spark。下面我將R自帶的iris數據集,nycflights13包的flights數據集,以及Lahman包的Batting數據集復制到Spark(請確保安裝了這兩個包)。

install.packages(“nycflights13”)
install.packages(“Lahman”)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")

此處由於在安裝的時候出現了中斷,導致依賴包readr包被鎖(lock)如下所示:

sparklyr包--實現R與Spark接口

當然,解決這個問題,就是刪除已經安裝好的了readr包然後重新在rstudio server中安裝readr包,或者修改readr的訪問權限:

install.packages(“readr”)
or
使用root權限
cd <readr存在的文件夾>
$ sudo chmod -R 777 readr

使用dplyr的src_tbls函數可以列出所有可用的表(包括預先加載在集群內的表)。

src_tbls(sc)

[1] "batting" "flights" "iris" 

7.3使用dplyr語法

利用dplyr語法來對集群內的所有表進行操作,下面是一個簡單的數據篩選案例:

# 篩選出飛機晚點兩分鐘的航班信息
flights_tbl %>% filter(dep_delay == 2)
 
Source:   query [?? x 16]
database: spark connection master=local app=sparklyr local=TRUE

    year month   day dep_time dep_delay arr_time arr_delay carrier tailnum flight origin  dest
   <int> <int> <int>    <int>     <dbl>    <int>     <dbl>   <chr>   <chr>  <int>  <chr> <chr>
1   2013     1     1      517         2      830        11      UA  N14228   1545    EWR   IAH
2   2013     1     1      542         2      923        33      AA  N619AA   1141    JFK   MIA
3   2013     1     1      702         2     1058        44      B6  N779JB    671    JFK   LAX
4   2013     1     1      715         2      911        21      UA  N841UA    544    EWR   ORD
5   2013     1     1      752         2     1025        -4      UA  N511UA    477    LGA   DEN
6   2013     1     1      917         2     1206        -5      B6  N568JB     41    JFK   MCO
7   2013     1     1      932         2     1219        -6      VX  N641VA    251    JFK   LAS
8   2013     1     1     1028         2     1350        11      UA  N76508   1004    LGA   IAH
9   2013     1     1     1042         2     1325        -1      B6  N529JB     31    JFK   MCO
10  2013     1     1     1231         2     1523        -6      UA  N402UA    428    EWR   FLL
..   ...   ...   ...      ...       ...      ...       ...     ...     ...    ...    ...   ...
Variables not shown: air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>.
 

dplyr導論提供了許多dplyr包中函數的使用案例。以下案例演示的是航班延誤信息的數據可視化:

dplyr導論https://cran.rstudio.com/web/packages/dplyr/vignettes/introduction.html

delay <- flights_tbl %>%
group_by(tailnum) %>%
summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
filter(count > 20, dist < 2000, !is.na(delay)) %>%
collect
# 繪圖
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
geom_point(aes(size = count), alpha = 1/2) +
geom_smooth() +
scale_size_area(max_size = 2)
 

sparklyr包--實現R與Spark接口

7.4窗口函數

支持dplyr的窗口函數。如下所示:

batting_tbl %>%
  select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)
 
Source:   query [?? x 7]
Database: spark connection master=local app=sparklyr local=TRUE
Groups: playerID

    playerID yearID teamID     G    AB     R     H
       <chr>  <int>  <chr> <int> <int> <int> <int>
1  anderal01   1941    PIT    70   223    32    48
2  anderal01   1942    PIT    54   166    24    45
3  balesco01   2008    WAS    15    15     1     3
4  balesco01   2009    WAS     7     8     0     1
5  bandoch01   1986    CLE    92   254    28    68
6  bandoch01   1984    CLE    75   220    38    64
7  bedelho01   1962    ML1    58   138    15    27
8  bedelho01   1968    PHI     9     7     0     1
9  biittla01   1977    CHN   138   493    74   147
10 biittla01   1975    MON   121   346    34   109
..       ...    ...    ...   ...   ...   ...   ...

更多dplyr在Spark中的用法參考http://spark.rstudio.com/dplyr.html

7.5調用MLlib

利用sparklyr包中的MLlib函數可以實現在Spark集群中調用機器學習算法。 這裏,我們使用ml_linear_regression函數來擬合一個線性回歸模型。數據為內置的mtcars數據集,我們想看看能否通過汽車的重量(wt)和發動機的氣缸數(cyl)來預測汽車的油耗(mpg)。我們假設mpg跟這兩個變量之間的關系是線性的。

# 將mtcar數據集復制到spark
mtcars_tbl <- copy_to(sc, mtcars)

# 先對數據做變換,然後將數據集分割為訓練集和測試集
partitions <- mtcars_tbl %>%
filter(hp >= 100) %>%
mutate(cyl8 = cyl == 8) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# 對訓練數據集做模型擬合
fit <- partitions$training %>%
ml_linear_regression(response = "mpg", features = c("wt", "cyl"))

Call:
mpg ~ wt + cyl

Coefficients:
(Intercept) wt cyl
33.499452 -2.818463 -0.923187 
復制代碼

對spark得到的線性回歸模型,使用summary()函數可以查看模型的擬合效果以及每個預測指標的統計意義。

summary(fit)
 
Call:
mpg ~ wt + cyl

Residuals:
Min 1Q Median 3Q Max
-1.752 -1.134 -0.499 1.296 2.282
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) 33.49945 3.62256 9.2475 0.0002485 ***
wt -2.81846 0.96619 -2.9171 0.0331257 *
cyl -0.92319 0.54639 -1.6896 0.1518998
---
Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1

R-Squared: 0.8274
Root Mean Squared Error: 1.422 

參考:

Spark機器學習提供常用機器學習算法的實現和特征變換。更多信息請參考:

http://spark.rstudio.com/mllib.html

sparklyr包中的函數參考文檔:

http://spark.rstudio.com/reference/sparklyr/latest/index.html

創建sparklyr擴展:

http://spark.rstudio.com/extensions.html

官方網站:

http://spark.rstudio.com/index.html

本文永久更新鏈接地址


Tags:

文章來源:


ads
ads

相關文章
ads

相關文章

ad