1. 程式人生 > >SparkR安裝部署及資料分析例項

SparkR安裝部署及資料分析例項

1. SparkR的安裝配置

1.1.       R與Rstudio的安裝

1.1.1.           R的安裝

我們的工作環境都是在Ubuntu下操作的,所以只介紹Ubuntu下安裝R的方法:

1)  在/etc/apt/sources.list新增源

deb http://mirror.bjtu.edu.cn/cran/bin/linux/ubuntu precise/,

然後更新源apt-get update;

2)  通過apt-get安裝:

sudo apt-get install r-base

1.1.2.           Rstudio的安裝

官網有詳細介紹:

sudo apt-get install gdebi-core

sudo apt-get install libapparmor1  # Required only for Ubuntu, not Debian

wget http://download2.rstudio.org/rstudio-server-0.97.551-amd64.deb

sudo gdebi rstudio-server-0.97.551-amd64.deb

1.2.       rJava安裝

1.2.1.           rJava介紹

    rJava是一個R語言和Java語言的通訊介面,通過底層JNI實現呼叫,允許在R中直接呼叫Java的物件和方法。

rJava還提供了Java呼叫R的功能,是通過JRI(Java/R Interface)實現的。JRI現在已經被嵌入到rJava的包中,我們也可以單獨試用這個功能。現在rJava包,已經成為很多基於Java開發R包的基礎功能元件。

正是由於rJava是底層介面,並使用JNI作為介面呼叫,所以效率非常高。在JRI的方案中,JVM通過記憶體直接載入RVM,呼叫過程效能幾乎無損耗,因此是非常高效連線通道,是R和Java通訊的首選開發包。

1.2.2.           rJava安裝

1)  配置rJava環境

執行R CMD javareconf

[email protected]:/home/payton# R CMD javareconf

2)  啟動R並安裝rJava

[email protected]:/home/payton# R

> install.packages("rJava")

1.3.       SparkR的安裝

1.3.1.           SparkR的程式碼下載

1.3.2.           SparkR的程式碼編譯

1)  解壓SparkR-pkg-master.zip,然後cd  SparkR-pkg-master/

2)  編譯的時候需要指明Hadoop版本和Spark版本

SPARK_HADOOP_VERSION=2.4.1 SPARK_VERSION=1.2.0 ./install-dev.sh

至此,單機版的SparkR已經安裝完成。

1.3.3.           分散式SparkR的部署配置

1)  編譯成功後,會生成一個lib資料夾,進入lib資料夾,打包SparkR為SparkR.tar.gz,這個是分散式SparkR部署的關鍵。

2)  由打包好的SparkR.tar.gz在各叢集節點上安裝SparkR

R CMD INSTALL SparkR.tar.gz

至此分散式SparkR搭建完成。

2. SparkR的執行

2.1.       SparkR的執行機制

SparkR是AMPLab釋出的一個R開發包,為Apache Spark提供了輕量的前端。SparkR提供了Spark中彈性分散式資料集(RDD)的API,使用者可以在叢集上通過R shell互動性的執行job。SparkR集合了Spark 和R的優勢,下面的這3幅圖很好的闡釋了SparkR的執行機制。

2.2.       用SparkR 進行資料分析

2.2.1.           SparkR基本操作

首先介紹下SparkR的基本操作:

第一步,載入SparkR包

library(SparkR)

第二步,初始化Spark context

sc <- sparkR.init(master=" spark://localhost:7077"

                  ,sparkEnvir=list(spark.executor.memory="1g",spark.cores.max="10"))

第三步,讀入資料,spark的核心是Resilient Distributed Dataset (RDD),RDDS可以從Hadoop的InputFormats來建立(例如,HDFS檔案)或通過轉化其它RDDS。例如直接從HDFS讀取資料為RDD的示例如下:

lines <- textFile(sc, "hdfs://sparkR_test.txt")

另外,也可以通過parallelize函式從向量或列表建立RDD,如:

rdd <- parallelize(sc, 1:10, 2)

到了這裡,那麼我們就可以運用RDD的動作(actions)和轉換(transformations)來對RDD進行操作併產生新的RDD;也可以很容易地呼叫R開發包,只需要在叢集上執行操作前用includePackage讀取R開發包就可以了(例:includePackage(sc, Matrix));當然還可以把RDD轉換為R語言格式的資料形式來對它進行操作。

具體可參見如下兩個連結:

那麼下面我們就通過兩個示例來看下 SparkR是如何執行的吧。

2.2.2.           SparkR使用舉例

1) Example1:word count

# 載入SparkR包
library(SparkR)
# 初始化 Spark context
sc <- sparkR.init(master="spark://叢集ip:7077"
                  ,sparkEnvir=list(spark.executor.memory="1g",spark.cores.max="10"))
# 從HDFS上讀取檔案
lines <- textFile(sc, "hdfs://叢集ip:8020/tmp/sparkR_test.txt")
# 按分隔符拆分每一行為多個元素,這裡返回一個序列
words<-flatMap(lines,function(line) {strsplit(line,"\\|")[[1]]})
# 使用 lapply 來定義對應每一個RDD元素的運算,這裡返回一個(K,V)對
wordCount <-lapply(words, function(word) { list(word, 1L) })
# 對(K,V)對進行聚合計算
counts<-reduceByKey(wordCount,"+",2L)
# 以陣列的形式,返回資料集的所有元素
output <- collect(counts)
# 按格式輸出結果
for (wordcount in output) {
  cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
}

2) Example2:logistic regression

# 載入SparkR包
library(SparkR)
# 初始化 Spark context
sc <- sparkR.init(master="叢集ip:7077",
                  appName='sparkr_logistic_regression',
                  sparkEnvir=list(spark.executor.memory='1g',
                                  spark.cores.max="10"))
# 從hdfs上讀取txt檔案,    該RDD由spark叢集的4個分割槽構成
input_rdd <- textFile(sc,
 "hdfs://叢集ip:8020/user/payton/german.data-numeric.txt",
minSplits=4)
# 解析每個RDD元素的文字(在每個分割槽上並行)
dataset_rdd <- lapplyPartition(input_rdd, function(part) {
  part <- lapply(part, function(x) unlist(strsplit(x, '\\s')))
  part <- lapply(part, function(x) as.numeric(x[x != '']))
  part
})
# 我們需要把資料集dataset_rdd分割為訓練集(train)和測試集(test)兩部分,這裡
# ptest為測試集的樣本比例,如取ptest=0.2,即取dataset_rdd的20%樣本數作為測試
# 集,80%的樣本數作為訓練集
split_dataset <- function(rdd, ptest) {
  #以輸入樣本數ptest比例建立測試集RDD
  data_test_rdd <- lapplyPartition(rdd, function(part) {
    part_test <- part[1:(length(part)*ptest)]
    part_test
  })
  # 用剩下的樣本數建立訓練集RDD
  data_train_rdd <- lapplyPartition(rdd, function(part) {
    part_train <- part[((length(part)*ptest)+1):length(part)]
    part_train
  })
  # 返回測試集RDD和訓練集RDD的列表
  list(data_test_rdd, data_train_rdd)
}
# 接下來我們需要轉化資料集為R語言的矩陣形式,並增加一列數字為1的截距項,
# 將輸出項y標準化為0/1的形式
get_matrix_rdd <- function(rdd) {
  matrix_rdd <- lapplyPartition(rdd, function(part) {
    m <- matrix(data=unlist(part, F, F), ncol=25, byrow=T)
    m <- cbind(1, m)
    m[,ncol(m)] <- m[,ncol(m)]-1
    m
  })
  matrix_rdd
}
# 由於該訓練集中y的值為1與0的樣本數比值為7:3,所以我們需要平衡1和0的樣本
# 數,使它們的樣本數一致
balance_matrix_rdd <- function(matrix_rdd) {
  balanced_matrix_rdd <- lapplyPartition(matrix_rdd, function(part) {
    y <- part[,26]
    index <- sample(which(y==0),length(which(y==1)))
    index <- c(index, which(y==1))
    part <- part[index,]
    part
  })
  balanced_matrix_rdd
}
# 分割資料集為訓練集和測試集
dataset <- split_dataset(dataset_rdd, 0.2)
# 建立測試集RDD
matrix_test_rdd <- get_matrix_rdd(dataset[[1]])
# 建立訓練集RDD
matrix_train_rdd <- balance_matrix_rdd(get_matrix_rdd(dataset[[2]]))
# 將訓練集RDD和測試集RDD放入spark分散式叢集記憶體中
cache(matrix_test_rdd)
cache(matrix_train_rdd)
# 初始化向量theta
theta<- runif(n=25, min = -1, max = 1)
# logistic函式
hypot <- function(z) {
  1/(1+exp(-z))
}
# 損失函式的梯度計算
gCost <- function(t,X,y) {
  1/nrow(X)*(t(X)%*%(hypot(X%*%t)-y))
# 定義訓練函式
train <- function(theta, rdd) {
  # 計算梯度
  gradient_rdd <- lapplyPartition(rdd, function(part) {
    X <- part[,1:25]
    y <- part[,26]
    p_gradient <- gCost(theta,X,y)
    list(list(1, p_gradient))
  })
  agg_gradient_rdd <- reduceByKey(gradient_rdd, '+', 1L)
  # 一次迭代聚合輸出
  collect(agg_gradient_rdd)[[1]][[2]]
}
# 由梯度下降演算法優化損失函式
# alpha :學習速率
# steps :迭代次數
# tol :收斂精度
alpha <- 0.1
tol <- 1e-4
step <- 1
while(T) {
  cat("step: ",step,"\n")
  p_gradient <- train(theta, matrix_train_rdd)
  theta <- theta-alpha*p_gradient
  gradient <- train(theta, matrix_train_rdd)
  if(abs(norm(gradient,type="F")-norm(p_gradient,type="F"))<=tol) break
  step <- step+1
}
# 用訓練好的模型預測測試集信貸評測結果(“good”或“bad”),並計算預測正確率
test <- lapplyPartition(matrix_test_rdd, function(part) {
    X <- part[,1:25]
    y <- part[,26]
    y_pred <- hypot(X%*%theta)
    result <- xor(as.vector(round(y_pred)),as.vector(y))
})
result<-unlist(collect(test))
corrects = length(result[result==F])
wrongs = length(result[result==T])
cat("\ncorrects: ",corrects,"\n")
cat("wrongs: ",wrongs,"\n")
cat("accuracy: ",corrects/length(y_pred),"\n")

相關推薦

SparkR安裝部署資料分析例項

1. SparkR的安裝配置 1.1.       R與Rstudio的安裝 1.1.1.           R的安裝 我們的工作環境都是在Ubuntu下操作的,所以只介紹Ubuntu下安裝R的方法: 1)  在/etc/apt/sources.list新增源

Spark環境安裝部署詞頻統計例項

Spark是一個高效能的分散式計算框架,由於是在記憶體中進行操作,效能比MapReduce要高出很多. 具體的我就不介紹了,直接開始安裝部署並進行例項測試 首先在官網下載http://spark.ap

安裝python資料分析相關安裝包小結

由於重灌系統以及64位電腦安裝了32位python導致資料量匯入過大時,出現memoryerror錯誤,乾脆總結安裝過程,省得每次安裝去找教程和資源。 Python安裝從官方網站下載python,各

Flink環境安裝部署、詞頻統計例項、WordCount原始碼分析

./start-cluster.sh 瀏覽器輸入http://localhost:8081可以看到UI介面 單詞統計例項: jar包所在位置(安裝包自帶) 依次輸入: ./flink run .

資料遷移工具 sqoop 安裝部署實戰

目錄 概述 工作機制 安裝部署 實戰 1.概述 sqoop是apache旗下一款“Hadoop和關係資料庫伺服器之間傳送資料”的工具。 匯入資料:MySQL,Oracle匯入資料到Hadoop的HDFS、HIVE、HBASE等資料儲存系統; 匯

centos7上mariadb10.3多例項安裝部署主從複製

    mariaDB是開源的資料庫,是mysql的衍生版。    Mariadb官方: https://mariadb.com/   os:centos7  mariadb:10.3 yum安裝實現。

zookeeper與kafka安裝部署java環境搭建

3.4 項目目錄 tin bytes result zxvf util ise cat 1. ZooKeeper安裝部署 本文在一臺機器上模擬3個zk server的集群安裝。 1.1. 創建目錄、解壓 cd /usr/ #創建項目目錄 mkdir zookeepe

ELK部署logstash安裝部署應用(二)

日誌 elk elkstack Logstash 安裝部署註意事項: Logstash基本概念:logstash收集日誌基本流程: input-->codec-->filter-->codec-->outputinput:從哪裏收集日誌。filter:發出去前進行過濾out

Wireshark安裝使用報文分析(圖文詳解)

p s 技術 cap cut .net 信息 display 過程 數據 Wireshark是世界上最流行的網絡分析工具。這個強大的工具可以捕捉網絡中的數據,並為用戶提供關於網絡和上層協議的各種信息。與很多其他網絡工具一樣,Wireshark也使用pcapnetwork l

Ansible安裝部署常用模塊詳解

就會 新用戶 特殊 packages add chdir epel change ima Ansible命令使用 Ansible語法使用ansible <pattern_goes_here> -m <module_name> -a <argum

redis-rdb-tools安裝部署使用

resp oot 用戶 all mov json格式 odin hash 出現 redis內存分析工具redis-rdb-tools安裝部署項目地址:https://github.com/sripathikrishnan/redis-rdb-tools 安裝部署 安裝Py

Wireshark安裝使用報文分析

clas aid ati wire wireshark ID shark static str 先看鏈接!!! Wireshark使用教程:https://jingyan.baidu.com/article/93f9803fe902f7e0e56f5553.html Wir

Zabbix服務安裝部署監控配置

sed master edi extension 客戶 取數據 鏈接 prefix HA Zabbix服務安裝部署及監控配置 1.1 Zabbix服務介紹 官方網站:http://www.zabbix.com/ The Enterprise-class Monitori

MySQL-5.6.38 安裝部署介紹

root 準備 term p s 查詢 ace p12 libaio mmu 第1章 MySQL簡介及部署1.1 介紹1.1.1 什麽是數據數據是指對客觀事件進行記錄並可以鑒別的符號,是對客觀事物的性質、狀態以及相互關系等進行記載的物理符號或這些物理符號的組合。它是可識別的

自動化運維工具---SaltStack安裝部署簡單案例

com 常用模塊 分類 fire thead 批量部署 配置 出現 nio SaltStack原理 SaltStack由Master(服務端)和Minion(客戶端)組成,Master和Minion之間通過ZeroMQ(消息隊列)進行通訊,Master和Minion分別監

Memcached安裝部署基本操作

ant 通過 telnet 防火 sql com memcache 1.8 ORC 何為Memcached? Memcached 是一套開源的高性能分布式內存對象緩存系統,它將所有的數據都存儲在內存中,因為在內存中會統一維護一張巨大的Hash表,所以支持任意存儲類型的數據

Zookeeper 安裝部署常用命令

服務管理 啟動ZK服務: zkServer.sh start 檢視ZK狀態: zkServer.sh status 停止ZK服務: zkServer.sh stop 重啟ZK服務: zkServer.sh restart 終端操作 使用 zkCli 可以簡單的對 Zo

Kettle-6.1安裝部署使用教程

一、Kettle概念 Kettle是一款國外開源的ETL工具,純java編寫,可以在Window、Linux、Unix上執行,綠色無需安裝,資料抽取高效穩定。 Kettle 中文名稱叫水壺,該專案的主程式設計師MATT 希望把各種資料放到一個壺裡,然後以一種指定的格式流出。 Kettle這個ETL工具集

Hadoop建設工具Ambari的安裝部署完整使用(五)——Ambari使用之叢集解除安裝

五.Ambari使用——解除安裝叢集 方式一: ambari本身並沒有提供基於web管理端的自動化解除安裝的功能。ambari web管理端的操作更多的是實現服務或是主機的擴充套件(無法完全清除服務和主機)。因此本人通過實踐,也整理了一份手動解除安裝指令碼: 注:因為此指令碼只是根

Hadoop建設工具Ambari的安裝部署完整使用(四)——Ambari使用之叢集建立

四.Ambari使用——建立叢集 登入並建立叢集 1) 以管理員登入ambari-server,使用者名稱和密碼預設為:admin/admin 2) 點選【Launch Install Wizard】開始安裝叢集   3) 給叢集命名