1. 程式人生 > >R語言並行化基礎與提高

R語言並行化基礎與提高

本文將介紹R中的平行計算,並給出了一些常見的陷進以及避免它們的小技巧。
使用平行計算的原因就是因為程式執行時間太長。大部分程式都是可以並行化的,它們大部分都是Embarrassingly parallel。這裡介紹幾種可以並行化的方法:

  • Bootstrapping
  • 交叉驗證(Cross-validation)
  • (Multivariate Imputation by Chained Equations ,MICE)相關介紹:R語言中的缺失值處理
  • 擬合多元迴歸方程

學習lapply是關鍵

沒有早點學習lapply是我的遺憾之一。這函式即優美又簡單:它只需要一個引數(一個vector或list),和一個以該引數為輸入的函式,最後返回一個列表。

> lapply(1:3, function(x) c(x, x^2, x^3))
[[1]]
 [1] 1 1 1

[[2]]
 [1] 2 4 8

[[3]]
 [1] 3 9 27

你還可以新增額外的引數:

> lapply(1:3/3, round, digits=3)
[[1]]
[1] 0.333

[[2]]
[1] 0.667

[[3]]
[1] 1

當每個元素都是獨立地計算時,這個任務就是 Embarrassingly parallel的。當你學習完使用lapply之後,你會發現並行化你的程式碼就像喝水一樣簡單。

parallel

使用 parallel

包,首先要初始化一個叢集,這個叢集的數量最好是你CPU核數-1。如果一臺8核的電腦建立了數量為8的叢集,那你的CPU就幹不了其他事情了。所以可以這樣啟動一個叢集:

library(parallel)

# Calculate the number of cores
no_cores <- detectCores() - 1

# Initiate cluster
cl <- makeCluster(no_cores)

現在只需要使用並行化版本的lapply,parLapply就可以了

parLapply(cl, 2:4,
          function(exponent)
            2
^exponent) [[1]] [1] 4 [[2]] [1] 8 [[3]] [1] 16

當我們結束後,要記得關閉叢集,否則你電腦的記憶體會始終被R佔用

stopCluster(cl)

變數作用域

在Mac/Linux中你可以使用 makeCluster(no_core, type="FORK")這一選項從而當你並行執行的時候可以包含所有環境變數。
在Windows中由於使用的是Parallel Socket Cluster (PSOCK),所以每個叢集只會載入base包,所以你執行時要指定載入特定的包或變數:

cl<-makeCluster(no_cores)

base <- 2
clusterExport(cl, "base")
parLapply(cl, 
          2:4, 
          function(exponent) 
            base^exponent)

stopCluster(cl)

[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16

注意到你需要用clusterExport(cl, "base")把base這一個變數載入到叢集當中。如果你在函式中使用了一些其他的包就要使用clusterEvalQ載入進去,比如說,使用rms包,那麼就用clusterEvalQ(cl, library(rms))。要注意的是,在clusterExport 載入某些變數後,這些變數的任何變化都會被忽略:

cl<-makeCluster(no_cores)
clusterExport(cl, "base")
base <- 4
# Run
parLapply(cl, 
          2:4, 
          function(exponent) 
            base^exponent)

# Finish
stopCluster(cl)
[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16

使用parSapply

如果你想程式返回一個向量或者矩陣。而不是一個列表,那麼就應該使用sapply,他同樣也有並行版本parSapply:

> parSapply(cl, 2:4, 
          function(exponent) 
            base^exponent)
[1]  4  8 16

輸出矩陣並顯示行名和列名(因此才需要使用as.character

> parSapply(cl, as.character(2:4), 
          function(exponent){
            x <- as.numeric(exponent)
            c(base = base^x, self = x^x)
          })
2  3   4
base 4  8  16
self 4 27 256

foreach

設計foreach包的思想可能想要建立一個lapply和for迴圈的標準,初始化的過程有些不同,你需要register註冊叢集:

library(foreach)
library(doParallel)

cl<-makeCluster(no_cores)
registerDoParallel(cl)

要記得最後要結束叢集(不是用stopCluster()):

stopImplicitCluster()

foreach函式可以使用引數.combine控制你彙總結果的方法:

> foreach(exponent = 2:4, 
        .combine = c)  %dopar%  
  base^exponent
  [1]  4  8 16
> foreach(exponent = 2:4, 
        .combine = rbind)  %dopar%  
  base^exponent
    [,1]
result.1    4
result.2    8
result.3   16
foreach(exponent = 2:4, 
        .combine = list,
        .multicombine = TRUE)  %dopar%  
  base^exponent
[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16

注意到最後list的combine方法是預設的。在這個例子中用到一個.multicombine引數,他可以幫助你避免巢狀列表。比如說list(list(result.1, result.2), result.3) :

> foreach(exponent = 2:4, 
        .combine = list)  %dopar%  
  base^exponent
[[1]]
[[1]][[1]]
[1] 4

[[1]][[2]]
[1] 8


[[2]]
[1] 16

變數作用域

在foreach中,變數作用域有些不同,它會自動載入本地的環境到函式中:

> base <- 2
> cl<-makeCluster(2)
> registerDoParallel(cl)
> foreach(exponent = 2:4, 
        .combine = c)  %dopar%  
  base^exponent
stopCluster(cl)
 [1]  4  8 16

但是,對於父環境的變數則不會載入,以下這個例子就會丟擲錯誤:

test <- function (exponent) {
  foreach(exponent = 2:4, 
          .combine = c)  %dopar%  
    base^exponent
}
test()

 Error in base^exponent : task 1 failed - "object 'base' not found" 

為解決這個問題你可以使用.export這個引數而不需要使用clusterExport。注意的是,他可以載入最終版本的變數,在函式執行前,變數都是可以改變的:

base <- 2
cl<-makeCluster(2)
registerDoParallel(cl)

base <- 4
test <- function (exponent) {
  foreach(exponent = 2:4, 
          .combine = c,
          .export = "base")  %dopar%  
    base^exponent
}
test()

stopCluster(cl)

 [1]  4  8 16

類似的你可以使用.packages引數來載入包,比如說:.packages = c("rms", "mice")

使用Fork還是sock?

我在windows上做了很多分析,也習慣了使用PSOCK系統。對於使用其他系統的人要意識到這兩個的區別:

  • FORK:”to divide in branches and go separate ways”
    系統:Unix/Mac (not Windows)
    環境: 所有
  • PSOCK:並行socket叢集
    系統: All (including Windows)
    環境: 空

記憶體控制

如果你不打算使用windows的話,建議你嘗試FORK模式,它可以實現記憶體共享,節省你的記憶體。
PSOCK:

library(pryr) # Used for memory analyses
cl<-makeCluster(no_cores)
clusterExport(cl, "a")
clusterEvalQ(cl, library(pryr))
parSapply(cl, X = 1:10, function(x) {address(a)}) == address(a)
[1] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE

FORK :

cl<-makeCluster(no_cores, type="FORK")
parSapply(cl, X = 1:10, function(x) address(a)) == address(a)
 [1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE

你不需要花費太多時間去配置你的環境,有趣的是,你不需要擔心變數衝突:

b <- 0
parSapply(cl, X = 1:10, function(x) {b <- b + 1; b})
# [1] 1 1 1 1 1 1 1 1 1 1
parSapply(cl, X = 1:10, function(x) {b <<- b + 1; b})
# [1] 1 2 3 4 5 1 2 3 4 5
b
# [1] 0

除錯

當你在並行環境中工作是,debug是很困難的,你不能使用browser/cat/print等函式來發現你的問題。

tryCatch-list方法

使用stop()函式這不是一個好方法,因為當你收到一個錯誤資訊時,很可能這個錯誤資訊你在很久之前寫的,都快忘掉了,但是當你的程式跑了1,2天后,突然彈出這個錯誤,就只因為這一個錯誤,你的程式終止了,並把你之前的做的計算全部扔掉了,這是很討厭的。為此,你可以嘗試使用tryCatch去捕捉那些錯誤,從而使得出現錯誤後程序還能繼續執行:

foreach(x=list(1, 2, "a"))  %dopar%  
{
  tryCatch({
    c(1/x, x, 2^x)
  }, error = function(e) return(paste0("The variable '", x, "'", 
                                      " caused the error: '", e, "'")))
}
[[1]]
[1] 1 1 2

[[2]]
[1] 0.5 2.0 4.0

[[3]]
[1] "The variable 'a' caused the error: 'Error in 1/x: non-numeric argument to binary operator\n'"

這也正是我喜歡list的原因,它可以方便的將所有相關的資料輸出,而不是隻輸出一個錯誤資訊。這裡有一個使用rbindlapply進行conbine的例子:

`out <- lapply(1:3, function(x) c(x, 2^x, x^x))
do.call(rbind, out)
 [,1] [,2] [,3]
[1,]    1    2    1
[2,]    2    4    4
[3,]    3    8   27

建立一個檔案輸出

當我們無法在控制檯觀測每個工作時,我們可以設定一個共享檔案,讓結果輸出到檔案當中,這是一個想當舒服的解決方案:

cl<-makeCluster(no_cores, outfile = "debug.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a"))  %dopar%  
{
  print(x)
}
stopCluster(cl)
starting worker pid=7392 on localhost:11411 at 00:11:21.077
starting worker pid=7276 on localhost:11411 at 00:11:21.319
starting worker pid=7576 on localhost:11411 at 00:11:21.762
[1] 2]

[1] "a"

建立一個結點專用檔案

一個或許更為有用的選擇是建立一個結點專用的檔案,如果你的資料集存在一些問題的時候,可以方便觀測:

cl<-makeCluster(no_cores, outfile = "debug.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a"))  %dopar%  
{
  cat(dput(x), file = paste0("debug_file_", x, ".txt"))
} 
stopCluster(cl)

partools

partools這個包有一個dbs()函式或許值得一看(使用非windows系統值得一看),他允許你聯合多個終端給每個程序進行debug。

Caching

當做一個大型計算時,我強烈推薦使用一些快取。這或許有多個原因你想要結束計算,但是要遺憾地浪費了計算的寶貴的時間。這裡有一個包可以做快取,R.cache,但是我發現自己寫個函式來實現更加簡單。你只需要嵌入digest包就可以。digest()函式是一個雜湊函式,把一個R物件輸入進去可以輸出一個md5值或sha1等從而得到一個唯一的key值,當你key匹配到你儲存的cache中的key時,你就可以繼續你的計算了,而不需要將演算法重新執行,以下是一個使用例子:

cacheParallel <- function(){
  vars <- 1:2
  tmp <- clusterEvalQ(cl, 
                      library(digest))

  parSapply(cl, vars, function(var){
    fn <- function(a) a^2
    dg <- digest(list(fn, var))
    cache_fn <- 
      sprintf("Cache_%s.Rdata", 
              dg)
    if (file.exists(cache_fn)){
      load(cache_fn)
    }else{
      var <- fn(var); 
      Sys.sleep(5)
      save(var, file = cache_fn)
    }
    return(var)
  })
}

這個例子很顯然在第二次執行的時候並沒有啟動Sys.sleep,而是檢測到了你的cache檔案,載入了上一次計算後的cache,你就不必再計算Sys.sleep了,因為在上一次已經計算過了。

system.time(out <- cacheParallel())
# user system elapsed
# 0.003 0.001 5.079
out
# [1] 1 4
system.time(out <- cacheParallel())
# user system elapsed
# 0.001 0.004 0.046
out
# [1] 1 4

# To clean up the files just do:
file.remove(list.files(pattern = "Cache.+\.Rdata"))

載入平衡

任務載入

需要注意的是,無論parLapply還是foreach都是一個包裝(wrapper)的函式。這意味著他們不是直接執行平行計算的程式碼,而是依賴於其他函式實現的。在parLapply中的定義如下:

parLapply <- function (cl = NULL, X, fun, ...) 
{
    cl <- defaultCluster(cl)
    do.call(c, clusterApply(cl, x = splitList(X, length(cl)), 
        fun = lapply, fun, ...), quote = TRUE)
}

注意到splitList(X, length(cl)) ,他會將任務分割成多個部分,然後將他們傳送到不同的叢集中。如果你有很多cache或者存在一個任務比其他worker中的任務都大,那麼在這個任務結束之前,其他提前結束的worker都會處於空閒狀態。為了避免這一情況,你需要將你的任務儘量平均分配給每個worker。舉個例子,你要計算優化神經網路的引數,這一過程你可以並行地以不同引數來訓練神經網路,你應該將如下程式碼:

# From the nnet example
parLapply(cl, c(10, 20, 30, 40, 50), function(neurons) 
  nnet(ir[samp,], targets[samp,],
       size = neurons))

改為:

# From the nnet example
parLapply(cl, c(10, 50, 30, 40, 20), function(neurons) 
  nnet(ir[samp,], targets[samp,],
       size = neurons))

記憶體載入

在大資料的情況下使用平行計算會很快的出現問題。因為使用平行計算會極大的消耗記憶體,你必須要注意不要讓你的R執行記憶體到達記憶體的上限,否則這將會導致崩潰或非常緩慢。使用Forks是一個控制記憶體上限的一個重要方法。Fork是通過記憶體共享來實現,而不需要額外的記憶體空間,這對效能的影響是很顯著的(我的系統時16G記憶體,8核心):

> rm(list=ls())
> library(pryr)
> library(magrittr)
> a <- matrix(1, ncol=10^4*2, nrow=10^4)
> object_size(a)
1.6 GB
> system.time(mean(a))
   user  system elapsed 
  0.338   0.000   0.337 
> system.time(mean(a + 1))
   user  system elapsed 
  0.490   0.084   0.574 
> library(parallel)
> cl <- makeCluster(4, type = "PSOCK")
> system.time(clusterExport(cl, "a"))
   user  system elapsed 
  5.253   0.544   7.289 
> system.time(parSapply(cl, 1:8, 
                        function(x) mean(a + 1)))
   user  system elapsed 
  0.008   0.008   3.365 
> stopCluster(cl); gc();
> cl <- makeCluster(4, type = "FORK")
> system.time(parSapply(cl, 1:8, 
                        function(x) mean(a + 1)))
   user  system elapsed 
  0.009   0.008   3.123 
> stopCluster(cl)

FORKs可以讓你並行化從而不用崩潰:

> cl <- makeCluster(8, type = "PSOCK")
> system.time(clusterExport(cl, "a"))
   user  system elapsed 
 10.576   1.263  15.877 
> system.time(parSapply(cl, 1:8, function(x) mean(a + 1)))
Error in checkForRemoteErrors(val) : 
  8 nodes produced errors; first error: cannot allocate vector of size 1.5 Gb
Timing stopped at: 0.004 0 0.389 
> stopCluster(cl)
> cl <- makeCluster(8, type = "FORK")
> system.time(parSapply(cl, 1:8, function(x) mean(a + 1)))
   user  system elapsed 
  0.014   0.016   3.735 
> stopCluster(cl)

當然,他並不能讓你完全解放,如你所見,當我們建立一箇中間變數時也是需要消耗記憶體的:

> a <- matrix(1, ncol=10^4*2.1, nrow=10^4)
> cl <- makeCluster(8, type = "FORK")
> parSapply(cl, 1:8, function(x) {
+   b <- a + 1
+   mean(b)
+   })
Error in unserialize(node$con) : error reading from connection

記憶體建議

  • 儘量使用rm()避免無用的變數
  • 儘量使用gc()釋放記憶體。即使這在R中是自動執行的,但是當它沒有及時執行,在一個平行計算的情況下,如果沒有及時釋放記憶體,那麼它將不會將記憶體返回給作業系統,從而影響了其他worker的執行。
  • 通常並行化在大規模運算下很有用,但是,考慮到R中的並行化存在記憶體的初始化成本,所以考慮到記憶體的情況下,顯然小規模的並行化可能會更有用。
  • 有時候在平行計算時,不斷做快取,當達到上限時,換回序列計算。
  • 你也可以手動的控制每個核所使用的記憶體數量,一個簡單的方法就是:memory.limit()/memory.size() = max cores

其他建議

  • 一個常用的CPU核數檢測函式:
max(1, detectCores() - 1)
  • 永遠不要使用set.seed(),使用clusterSetRNGStream()來代替設定種子,如果你想重現結果。
  • 如果你有Nvidia 顯示卡,你可以嘗試使用gputools 包進行GPU加速(警告:安裝可能會很困難)
  • 當使用mice並行化時記得使用ibind()來合併項。

作為分享主義者(sharism),本人所有網際網路釋出的圖文均遵從CC版權,轉載請保留作者資訊並註明作者a358463121專欄:http://blog.csdn.net/a358463121,如果涉及原始碼請註明GitHub地址:https://github.com/358463121/。商業使用請聯絡作者。