1.sparklyr包簡介
Rstudio公司發布的sparklyr包具有以下幾個功能:
- 實現R與Spark的連接;
- sparklyr包提供了一個完整的dplyr後端,可篩選並聚合Spark數據集,接著在R中實現分析與可視化;
- 利用Spark的MLlib機器學習庫在R中實現分布式機器學習算法;
- 可以創建一個擴展,用於調用Spark API,並為Spark的所有包集提供了一個接口。
2.RStudio Server安裝sparklyr包
linux版本:ubuntu 16.04 LTS 64bit
R版本:R3.3.1 64bit
RStudio Server版本:rstudio-server-0.99.902 64bit
通過devtools包實現sparklyr包的安裝:
install.packages("devtools") devtools::install_github("rstudio/sparklyr")
註意:此處安裝devtools時Ubuntu中可能會出現安裝不上的錯誤:
看錯誤信息可以知道找不到openssl,需要安裝libssl-dev(Ubuntu):
$ sudo apt-get install libssl-dev
然後安裝sparklyr因為網速等的原因可能需要進行多次安裝,多嘗試幾次就可以了。如果安裝中斷,很可能安裝包被lock,可以使用下面的方案解決(以reader包為例):
install.packages(“readr”,dependencies=TRUE,INSTALL_opts = c(‘—no-lock’))
3.在本地安裝Spark 1.6.1和Hadoop 2.6
library(sparklyr) spark_install(version = "1.6.1")
此處默認的是使用Spark 1.6.1和Hadoop 2.6
如果用的是RStudio IDE,還需下載最新的預覽版IDE。它包含有實現與Spark交互的若幹增強功能(詳情參考RStudio IDE)。
https://www.rstudio.com/products/rstudio/download/preview/
http://spark.rstudio.com/index.html#rstudio-ide
4.部署Spark
4.1本地部署
安裝好sparklyr包後,我們連接本地的Spark,也可以連接遠程的Spark集群。這裏,使用spark_connect函數來連接本地的Spark:
library(sparklyr) library(dplyr) sc <- spark_connect(master = "local")
返回的Spark連接(sc)為Spark集群提供了一個遠程的dplyr數據源。
4.2集群部署
使用sparklyr連接遠程Spark集群Cluster Deployment,需要直接將R session部署在集群節點中的一個machine或者靠近集群處(根據網絡性能)。在此種情況下,R不是直接在集群上運行,所以,必須要保證集群中的每個machine都有一個spark version並且具有相同的配置。
在集群節點中的一個machine或者靠近集群處運行R最直截了當的方式可以通過遠程SSH會話或Rstudio server。在集群節點中使用的spark version必須已經在節點中部署,並且spark的路徑需要添加到環境變量SPARK_HOME中,因此,在嘗試一個連接之前必須保證SPARK_HOME環境變量在server中正確定義。通常是在Renviron.site配置文件中完成的。示例:
SPARK_HOME=/opt/spark/spark-1.6.1-bin-hadoop2.6
然後,通過spark_connect函數和主節點的地址連接,例如:
library(sparklyr) sc <- spark_connect(master = "spark://local:7077")
如果在EC2上使用Spark EC2部署腳本,可以從/root/spark-ec2/cluster-url讀取master,例如:
library(sparklyr) cluster_url <- system('cat /root/spark-ec2/cluster-url', intern=TRUE) sc <- spark_connect(master = cluster_url)
連接工具
可以通過spark-web函數觀看Spark web UI,通過spark_log函數觀看Spark log(日誌)
spark_web(sc)
spark_log(sc)
使用spark_disconnect函數斷開spark的連接:
spark_disconnect(sc)
5.配置configuration
本部分描述sparklyr包和潛在的spark集群的行為的配置的各個選項。同時介紹創建多個配置文件(比如開發、測試、生產)。
5.1配置文件Config Files
通過spark_connect函數的config參數可以指定Spark連接的配置。通過使用config=spark_config()創建默認的配置。下面的代碼代表了默認的行為:
spark_connect(master = "local", config = spark_config())
通過config=spark_config()函數可以從本地當前工作目錄(如果不是位於工作目在父目錄錄)的路徑中讀取配置文件config.yml中的數據。這個文件不是必需的,只是用來提供重寫默認行為overriding default behavior。還可以指定另一個配置文件名稱 和/或 位置。config.yml文件依次處理使用配置包(using the config package),配置包可以支持多命名配置文件。
5.2Package選項
有許多可用的選項配置sparklyr包的行為:
選項
描述
sparklyr.defaultPackages
自動包括在會話中的Spark packages (defaults to com.databricks:spark-csv_2.11:1.3.0” and “com.amazonaws:aws-Java-sdk-pom:1.10.34”)
sparklyr.cores.local
當在本地運行時使用的內核數量 (defaults to parallel::detectCores)
sparklyr.shell.*
傳遞給spark-shell的命令行參數 (see the Spark documentation for details on supported options)
舉個例子:下面的配置文件設置了本地內核數為4並分配給Spark驅動2G內存:
default: sparklyr.cores.local: 4 sparklyr.shell.driver-memory: 4GB
註:多文件的default使用將在下面描述。
5.3Spark選項
可以使用config.yml指定任意的spark配置屬性:
選項
描述
spark.*
任意配置屬性 (通過創建一個SparkConf包含指定的屬性應用)。spark的配置文檔可以查看可用的屬性。http://spark.apache.org/docs/latest/configuration.html
spark.sql.*
Spark SQL的任意配置屬性 (applied using SET)。Spark SQL Programming Guide的配置文檔可以查看可用的屬性。http://spark.apache.org/docs/latest/sql-programming-guide.html
舉個例子:下面的配置文件為spark設置了一個當前的工作目錄,並指定當揉數據(joins or aggregations)時使用的分區數量為100。
default: spark.local.dir: /tmp/spark-scratch spark.sql.shuffle.partitions: 100
5.4多文件配置
config包允許為不同環境定義多命名配置文件(例如:default, test, production)。所有額環境默認繼承default環境,並且可以相互繼承。
舉個例子:您可能想使用一個不同的數據集來開發和測試或可能希望使用只適用於生產集群上運行的自定義Spark配置屬性。config.yml表示如下
default: dataset: "observations-dev.parquet" sample-size: 10000 production: spark.memory.fraction: 0.9 spark.rdd.compress: true dataset: "observations.parquet" sample-size: null
還可以使用這個特點來為不同的環境指定不同的Spark master:
default: spark.master: "local" production: spark.master: "spark://local:7077"
使用上面的配置,可以在使用spark_connect()的時候徹底省略master參數:
sc <- spark_connect()
註意:當前活動配置通過R_CONFIG_ACTIVE環境變量的值決定,可以通過config package documentation詳細的了解。https://github.com/rstudio/config
6.預覽版RStudio Server
Rstudio server提供了一個基於web的IDE遠程的R會話接口,使其spark集群可以供前端使用。本部分介紹一些對於RStudio Server非常有用的額外的配置選項。RStudio的最新預覽版集成支持Spark和sparklyr包。包含以下工具https://www.rstudio.com/products/rstudio/download/preview/:
- 創建和管理Spark連接
- 瀏覽表格數據和Spark DataFrames的所有列
- 可以預覽Spark DataFrames的前1000行
6.1連接選項
一旦成功安裝完sparklyr包,我們可以在IDE中可以看到一個新的Spark窗口。該窗口包含一個New Connection對話框,用於連接本地或者遠程的Spark。如下所示:
可以使用rstudio.spark.connections選項配置哪一個連接,默認的可能是local和cluster連接,可以選擇其中之一作為提供的連接,或者使用一個特殊的Spark master URL。一些常用的連接組合的選擇包括:
Value
描述
c("local", "cluster")
Default 提供了本地和cluster spark instance的連接
"local"
提供了本地spark instance連接
"cluster"
提供了cluster spark instance連接
"spark://local:7077"
提供了特殊cluster的連接
c("spark://local:7077", "cluster")
提供了特殊cluster和其他cluster的連接
這些選項應該在Rprofile.site中設置,例如:
options(rstudio.spark.connections = "spark://local:7077")
6.2Spark安裝
如果是在本地模式(相對於集群模式),需要預裝spark version(s)並共享給使用該服務器的所有使用者。你可以安裝spark version(s)在一個共享的目錄中(e.g. /opt/spark),然後標明它作為spark安裝目錄。
options(spark.install.dir = "/opt/spark")
7.Sparklyr包的使用
7.1連接spark
安裝好sparklyr包之後,我們連接本地的Spark,也可以連接遠程的Spark集群。這裏,我們使用spark_connect函數來連接本地的Spark:
library(sparklyr) library(dplyr) sc <- spark_connect(master = "local")
返回的Spark連接(sc)為Spark集群提供了一個遠程的dplyr數據源。
出現下面的問題:
要求在Ubuntu中安裝Java:
方法一:
Windows中下載
從Windows中復制到Ubuntu中:
打開新的控制臺,創建目標文件夾:
root@love:/home/wangchao# cd /usr/lib
root@love:/usr/lib# sudo mkdir java
在原來的控制臺中輸入如下命令,安裝JDK:
$ sudo tar zvxf jdk-7u67-linux-x64.gz -C /usr/lib/java
# 該命令的意思是解壓jdk-7u67-linux-x64.gz文件,並把它安裝到/usr/lib/java目錄下,也就是前面創建的文件夾。註意命令中的-C是大寫的字母C。
打開系統配置文件.bashrc
$ sudo gedit .bashrc
更多詳情見請繼續閱讀下一頁的精彩內容:
在其末端添加下面的代碼,註意不要修改其他任何代碼,添加環境變量:
export JAVA_HOME=/usr/lib/java/jdk1.7.0_67 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=${JAVA_HOME}/lib:${JRE_HOME}/lib: export PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin:$PATH
測試是否安裝成功:
解壓後的文件:
方法二:
直接使用系統安裝:
$ sudo apt-get install openjdk-8-jdk $ sudo R CMD javareconf $ sudo R
以上安裝Java後還是出現了相同的問題:
安裝好後發現還是不行,找不到Java,這下可以確定不是jdk的問題了,應該是R不能找到jdk,於是想到了一個打通R和Java的通道包rJava包,下載和安裝rJava包:
Install.packages(“rJava”)
這下解決了以上的問題。
7.2讀取數據
使用copy_to函數可以實現將R中的數據框導入到Spark。下面我將R自帶的iris數據集,nycflights13包的flights數據集,以及Lahman包的Batting數據集復制到Spark(請確保安裝了這兩個包)。
install.packages(“nycflights13”) install.packages(“Lahman”) iris_tbl <- copy_to(sc, iris) flights_tbl <- copy_to(sc, nycflights13::flights, "flights") batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
此處由於在安裝的時候出現了中斷,導致依賴包readr包被鎖(lock)如下所示:
當然,解決這個問題,就是刪除已經安裝好的了readr包然後重新在rstudio server中安裝readr包,或者修改readr的訪問權限:
install.packages(“readr”) or 使用root權限 cd <readr存在的文件夾> $ sudo chmod -R 777 readr
使用dplyr的src_tbls函數可以列出所有可用的表(包括預先加載在集群內的表)。
src_tbls(sc) [1] "batting" "flights" "iris"
7.3使用dplyr語法
利用dplyr語法來對集群內的所有表進行操作,下面是一個簡單的數據篩選案例:
# 篩選出飛機晚點兩分鐘的航班信息 flights_tbl %>% filter(dep_delay == 2)
Source: query [?? x 16] database: spark connection master=local app=sparklyr local=TRUE year month day dep_time dep_delay arr_time arr_delay carrier tailnum flight origin dest <int> <int> <int> <int> <dbl> <int> <dbl> <chr> <chr> <int> <chr> <chr> 1 2013 1 1 517 2 830 11 UA N14228 1545 EWR IAH 2 2013 1 1 542 2 923 33 AA N619AA 1141 JFK MIA 3 2013 1 1 702 2 1058 44 B6 N779JB 671 JFK LAX 4 2013 1 1 715 2 911 21 UA N841UA 544 EWR ORD 5 2013 1 1 752 2 1025 -4 UA N511UA 477 LGA DEN 6 2013 1 1 917 2 1206 -5 B6 N568JB 41 JFK MCO 7 2013 1 1 932 2 1219 -6 VX N641VA 251 JFK LAS 8 2013 1 1 1028 2 1350 11 UA N76508 1004 LGA IAH 9 2013 1 1 1042 2 1325 -1 B6 N529JB 31 JFK MCO 10 2013 1 1 1231 2 1523 -6 UA N402UA 428 EWR FLL .. ... ... ... ... ... ... ... ... ... ... ... ... Variables not shown: air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>.
dplyr導論提供了許多dplyr包中函數的使用案例。以下案例演示的是航班延誤信息的數據可視化:
dplyr導論https://cran.rstudio.com/web/packages/dplyr/vignettes/introduction.html
delay <- flights_tbl %>% group_by(tailnum) %>% summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>% filter(count > 20, dist < 2000, !is.na(delay)) %>% collect # 繪圖 library(ggplot2) ggplot(delay, aes(dist, delay)) + geom_point(aes(size = count), alpha = 1/2) + geom_smooth() + scale_size_area(max_size = 2)
7.4窗口函數
支持dplyr的窗口函數。如下所示:
batting_tbl %>% select(playerID, yearID, teamID, G, AB:H) %>% arrange(playerID, yearID, teamID) %>% group_by(playerID) %>% filter(min_rank(desc(H)) <= 2 & H > 0)
Source: query [?? x 7] Database: spark connection master=local app=sparklyr local=TRUE Groups: playerID playerID yearID teamID G AB R H <chr> <int> <chr> <int> <int> <int> <int> 1 anderal01 1941 PIT 70 223 32 48 2 anderal01 1942 PIT 54 166 24 45 3 balesco01 2008 WAS 15 15 1 3 4 balesco01 2009 WAS 7 8 0 1 5 bandoch01 1986 CLE 92 254 28 68 6 bandoch01 1984 CLE 75 220 38 64 7 bedelho01 1962 ML1 58 138 15 27 8 bedelho01 1968 PHI 9 7 0 1 9 biittla01 1977 CHN 138 493 74 147 10 biittla01 1975 MON 121 346 34 109 .. ... ... ... ... ... ... ...
更多dplyr在Spark中的用法參考http://spark.rstudio.com/dplyr.html
7.5調用MLlib
利用sparklyr包中的MLlib函數可以實現在Spark集群中調用機器學習算法。 這裏,我們使用ml_linear_regression函數來擬合一個線性回歸模型。數據為內置的mtcars數據集,我們想看看能否通過汽車的重量(wt)和發動機的氣缸數(cyl)來預測汽車的油耗(mpg)。我們假設mpg跟這兩個變量之間的關系是線性的。
# 將mtcar數據集復制到spark mtcars_tbl <- copy_to(sc, mtcars) # 先對數據做變換,然後將數據集分割為訓練集和測試集 partitions <- mtcars_tbl %>% filter(hp >= 100) %>% mutate(cyl8 = cyl == 8) %>% sdf_partition(training = 0.5, test = 0.5, seed = 1099) # 對訓練數據集做模型擬合 fit <- partitions$training %>% ml_linear_regression(response = "mpg", features = c("wt", "cyl")) Call: mpg ~ wt + cyl Coefficients: (Intercept) wt cyl 33.499452 -2.818463 -0.923187

對spark得到的線性回歸模型,使用summary()函數可以查看模型的擬合效果以及每個預測指標的統計意義。
summary(fit)
Call: mpg ~ wt + cyl Residuals: Min 1Q Median 3Q Max -1.752 -1.134 -0.499 1.296 2.282 Coefficients: Estimate Std. Error t value Pr(>|t|) (Intercept) 33.49945 3.62256 9.2475 0.0002485 *** wt -2.81846 0.96619 -2.9171 0.0331257 * cyl -0.92319 0.54639 -1.6896 0.1518998 --- Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1 R-Squared: 0.8274 Root Mean Squared Error: 1.422
參考:
Spark機器學習提供常用機器學習算法的實現和特征變換。更多信息請參考:
http://spark.rstudio.com/mllib.html
sparklyr包中的函數參考文檔:
http://spark.rstudio.com/reference/sparklyr/latest/index.html
創建sparklyr擴展:
http://spark.rstudio.com/extensions.html
官方網站:
http://spark.rstudio.com/index.html
本文永久更新鏈接地址:
Tags:
文章來源: