Florian Privé

R(cpp) enthusiast

A guide to parallelism in R

Written on September 5, 2017

In this post, I talk about parallelism in R. This post is likely biased towards the solutions I use. For example, I never use mcapply nor clusterApply; I prefer to always use foreach. In this post, we will focus on how to parallelize R code on your computer with package {foreach}.

In this post, I use mainly silly examples just to show one point at a time.

Basics of foreach

You can install R package {foreach} with install.packages("foreach").

library(foreach)

foreach(i = 1:3) %do% {
  sqrt(i)
}
## [[1]]
## [1] 1
## 
## [[2]]
## [1] 1.414214
## 
## [[3]]
## [1] 1.732051

In the example above, you iterate on i and apply the expression sqrt(i). Function foreach returns a list by default. A common mistake is to think that foreach is like a for-loop. Actually, foreach is more like lapply.

lapply(1:3, function(i) {
  sqrt(i)
})
## [[1]]
## [1] 1
## 
## [[2]]
## [1] 1.414214
## 
## [[3]]
## [1] 1.732051

Parameter .combine can be very useful. Yet, now, I usually prefer to combine the results afterwards (see do.call below).

foreach(i = 1:3, .combine = 'c') %do% {
  sqrt(i)
}
## [1] 1.000000 1.414214 1.732051

With lapply, we would do

res <- lapply(1:3, function(i) {
  sqrt(i)
})
do.call('c', res)
## [1] 1.000000 1.414214 1.732051

Parallelize with foreach

You need to do at least two things:

  • replace %do% by %dopar%. Basically, always use %dopar% because you can use registerDoSEQ() is you really want to run the foreach sequentially.

  • register a parallel backend using one of the packages that begin with do (such as doParallel, doMC, doMPI and more). I will list only the two main parallel backends because there are too many of them.

Using clusters

# Example registering clusters
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(i = 1:3, .combine = 'c') %dopar% {
  sqrt(i)
}
## [1] 1.000000 1.414214 1.732051
parallel::stopCluster(cl)

In this situation, all the data and packages used must be exported (copied) to the clusters, which can add some overhead. Yet, at least, you know what you do.

Using forking

cl <- parallel::makeForkCluster(2)
doParallel::registerDoParallel(cl)
foreach(i = 1:3, .combine = 'c') %dopar% {
  sqrt(i)
}
## [1] 1.000000 1.414214 1.732051
parallel::stopCluster(cl)

Forking just copy the R session in its current state. This is very fast because it copies objects only it they are modified. Moreover, you don’t need to export variables nor packages because they are already in the session. However, this can’t be used on Windows. This is why I use the clusters option in my packages.

Common problems/mistakes/questions

Exporting variables and packages

“object”xxx" not found" or “could not find function”xxx“”.

# Some data and function
library(dplyr)
dfs <- rep(list(iris), 3)
count(dfs[[1]], Species)
## # A tibble: 3 x 2
##   Species        n
##   <fct>      <int>
## 1 setosa        50
## 2 versicolor    50
## 3 virginica     50
# Sequential processing to apply to 
# all the data frames of the list 'dfs'
registerDoSEQ()
myFun <- function() {
  foreach(i = seq_along(dfs)) %dopar% {
    df <- dfs[[i]]
    count(df, Species)
  }
}
str(myFun())
## List of 3
##  $ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..$ n      : int [1:3] 50 50 50
##  $ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..$ n      : int [1:3] 50 50 50
##  $ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..$ n      : int [1:3] 50 50 50
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun(), error = function(e) print(e))
## <simpleError in {    df <- dfs[[i]]    count(df, Species)}: task 1 failed - "objet 'dfs' introuvable">
parallel::stopCluster(cl)

Why doesn’t this work anymore? foreach will export all the needed variables that are present in its environment (here, the environment of myFun) and dfs is not in this environment. Some will tell you to use option .export of foreach but I don’t think it’s good practice. You just have to pass dfs to myFun.

myFun2 <- function(dfs) {
  foreach(i = seq_along(dfs)) %dopar% {
    df <- dfs[[i]]
    count(df, Species)
  }
}
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun2(dfs), error = function(e) print(e))
## <simpleError in {    df <- dfs[[i]]    count(df, Species)}: task 1 failed - "impossible de trouver la fonction "count"">
parallel::stopCluster(cl)

This still doesn’t work. You also need to load packages. You could use option .packages of foreach but you could simply add dplyr:: before count. Moreover, it is clearer (like one does in packages).

myFun3 <- function(dfs) {
  foreach(i = seq_along(dfs)) %dopar% {
    df <- dfs[[i]]
    dplyr::count(df, Species)
  }
}
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun3(dfs), error = function(e) print(e))
## [[1]]
## # A tibble: 3 x 2
##   Species        n
##   <fct>      <int>
## 1 setosa        50
## 2 versicolor    50
## 3 virginica     50
## 
## [[2]]
## # A tibble: 3 x 2
##   Species        n
##   <fct>      <int>
## 1 setosa        50
## 2 versicolor    50
## 3 virginica     50
## 
## [[3]]
## # A tibble: 3 x 2
##   Species        n
##   <fct>      <int>
## 1 setosa        50
## 2 versicolor    50
## 3 virginica     50
parallel::stopCluster(cl)

Iterate over lots of elements.

cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
  foreach(i = seq_len(2e4), .combine = 'c') %dopar% {
    sqrt(i)
  }
)
##    user  system elapsed 
##   6.681   0.479   7.611
parallel::stopCluster(cl)

Iterating over multiple elements in R is bad for performance. Moreover, foreach is only combining results 100 by 100, which also slows computations.

If there are too many elements to loop over, the best is to split the computation in ncores blocks and to perform some optimized sequential work on each block. In package {bigstatsr}, I use the following function to split indices in nb groups because I often need to iterate over hundreds of thousands of elements (columns).

bigstatsr:::CutBySize
## function (m, block.size, nb = ceiling(m/block.size)) 
## {
##     if (nb > m) 
##         nb <- m
##     int <- m/nb
##     upper <- round(1:nb * int)
##     lower <- c(1, upper[-nb] + 1)
##     size <- c(upper[1], diff(upper))
##     cbind(lower, upper, size)
## }
## <bytecode: 0xad02c80>
## <environment: namespace:bigstatsr>
bigstatsr:::CutBySize(20, nb = 3)
##      lower upper size
## [1,]     1     7    7
## [2,]     8    13    6
## [3,]    14    20    7

Filling something in parallel

Using foreach loop in R returning NA

mat <- matrix(nrow = 5, ncol = 8)
registerDoSEQ()
# Nested foreach loop
tmp <- foreach(j = 1:8) %:% foreach(i = 1:5) %dopar% {
  mat[i, j] <- i + j
}
mat
##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]    2    3    4    5    6    7    8    9
## [2,]    3    4    5    6    7    8    9   10
## [3,]    4    5    6    7    8    9   10   11
## [4,]    5    6    7    8    9   10   11   12
## [5,]    6    7    8    9   10   11   12   13
# Try in parallel
mat2 <- matrix(nrow = 5, ncol = 8)
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tmp2 <- foreach(j = 1:8) %:% foreach(i = 1:5) %dopar% {
  mat2[i, j] <- i + j
}
parallel::stopCluster(cl)
mat2
##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]   NA   NA   NA   NA   NA   NA   NA   NA
## [2,]   NA   NA   NA   NA   NA   NA   NA   NA
## [3,]   NA   NA   NA   NA   NA   NA   NA   NA
## [4,]   NA   NA   NA   NA   NA   NA   NA   NA
## [5,]   NA   NA   NA   NA   NA   NA   NA   NA

There are two problems here:

  1. mat is filled in the sequential version but won’t be in the parallel version. This is because when using parallelism, mat is copied so that each core modifies a copy of the matrix, not the original one.

  2. foreach returns something (here a two-level list).

To overcome this problem, you could use shared-memory. For example, with my package {bigstatsr}.

library(bigstatsr)
mat3 <- FBM(5, 8)
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tmp3 <- foreach(j = 1:8, .combine = 'c') %:%
  foreach(i = 1:5, .combine = 'c') %dopar% {
    mat3[i, j] <- i + j
    NULL
  }
parallel::stopCluster(cl)
mat3[]
##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]    2    3    4    5    6    7    8    9
## [2,]    3    4    5    6    7    8    9   10
## [3,]    4    5    6    7    8    9   10   11
## [4,]    5    6    7    8    9   10   11   12
## [5,]    6    7    8    9   10   11   12   13
tmp3
## NULL

The original matrix is now modified. Note that I return NULL to save memory.

Parallelize over a large matrix

mat <- matrix(0, 1e4, 1e4); mat[] <- rnorm(length(mat))
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
  tmp <- foreach(k = 1:2, .combine = 'c') %dopar% {
    Sys.sleep(1)
    mat[1, 1]
  }
)
##    user  system elapsed 
##   1.931   0.323   4.517
parallel::stopCluster(cl)

If using clusters, copying mat to both clusters takes time (and memory!).

mat2 <- FBM(1e4, 1e4); mat2[] <- rnorm(length(mat2))
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
  tmp <- foreach(k = 1:2, .combine = 'c') %dopar% {
    Sys.sleep(1)
    mat2[1, 1]
  }
)
##    user  system elapsed 
##   0.015   0.010   2.998
parallel::stopCluster(cl)

This is faster because it’s using a matrix that is stored on disk (so shared between processes) so that it doesn’t need to be copied.

Advanced parallelism: synchronization

For example, you may need to write to the same data (maybe increment it). In this case, it is important to use some locks so that only one session writes to the data at the same time. For that, you could use package {flock}, which is really easy to use.

mat <- FBM(1, 1, init = 0)
mat[]
## [1] 0
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(k = 1:10, .combine = 'c') %dopar% {
  mat[1, 1] <- mat[1, 1] + k
  NULL
}
## NULL
parallel::stopCluster(cl)
mat[]
## [1] 34
sum(1:10)
## [1] 55
lock <- tempfile()
mat2 <- FBM(1, 1, init = 0)
mat2[]
## [1] 0
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(k = 1:10, .combine = 'c') %dopar% {
  locked <- flock::lock(lock)
  mat2[1, 1] <- mat2[1, 1] + k
  flock::unlock(locked)
  NULL
}
## NULL
parallel::stopCluster(cl)
mat2[]
## [1] 55

So each process uses some lock to perform its incrementation so that the data can’t be changed by some other process in the meantime.

Moreover, you may also need to use some message passing or some barriers. For that, you could learn to use MPI. For some basic use, I “reimplemented” this using only shared-memory matrices (FBMs). You can see this function is you’re interested.

Miscellenaous

  • Recall that you won’t gain much from parallelism. You’re likely to gain much more performance by simply optimizing your sequential code. Don’t reproduce the silly examples here as real code, they are quite bad.

  • How to print during parallel execution? Use option outfile in makeCluster (for example, using outfile = "" will redirect to the console).

  • Don’t try to parallelize huge matrix operations with loops. There are already (parallel) optimized linear algebra libraries that exist and which will be much faster. For example, you could use Microsoft R Open.

  • Some will tell you to use parallel::detectCores() - 1 cores. I use bigstatsr::nb_cores().

Conclusion

Hope this can help some.

Don’t hesitate to comment if you want to add/modify something to this post.