Florian Privé

R(cpp) enthusiast

A guide to parallelism in R

Written on September 5, 2017

In this post, I will talk about parallelism in R. This post will likely be biased towards the solutions I use. For example, I never use mcapply nor clusterApply. I prefer to always use foreach. In this post, we will focus on how to parallelize R code on your computer.

I will use mainly silly examples, just to show one point at a time.

Basics of foreach

You can install the foreach package with install.packages("foreach").

library(foreach)

foreach(i = 1:3) %do% {
  sqrt(i)
}
## [[1]]
## [1] 1
## 
## [[2]]
## [1] 1.414214
## 
## [[3]]
## [1] 1.732051

So, in the example above, you iterate on i and apply the expression sqrt(i). foreach returns a list by default. A common mistake is to think that foreach is like a for-loop. Actually, foreach is more like lapply.

lapply(1:3, function(i) {
  sqrt(i)
})
## [[1]]
## [1] 1
## 
## [[2]]
## [1] 1.414214
## 
## [[3]]
## [1] 1.732051

Parameter .combine is very useful.

foreach(i = 1:3, .combine = 'c') %do% {
  sqrt(i)
}
## [1] 1.000000 1.414214 1.732051

With lapply, we would do

res <- lapply(1:3, function(i) {
  sqrt(i)
})
do.call('c', res)
## [1] 1.000000 1.414214 1.732051

Parallelize with foreach

You need to do at least two things:

  • replace %do% by %dopar%. Basically, always use %dopar% because you can use registerDoSEQ() is you really want to run the foreach sequentially.
  • register a parallel backend using one of the packages that begin with do (such as doParallel, doMC, doMPI and more). I will list only the two main parallel backends because there are too many of them.

Using clusters

# Example registering clusters
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(i = 1:3, .combine = 'c') %dopar% {
  sqrt(i)
}
## [1] 1.000000 1.414214 1.732051
parallel::stopCluster(cl)

In this situation, all the data and packages used must be exported (copied) to the clusters, which can add some overhead. Yet, at least, you know what you do.

Using forking

cl <- parallel::makeForkCluster(2)
doParallel::registerDoParallel(cl)
foreach(i = 1:3, .combine = 'c') %dopar% {
  sqrt(i)
}
## [1] 1.000000 1.414214 1.732051
parallel::stopCluster(cl)

Forking just copy the R session at its current state. This is very fast because it copies objects only it they are modified. Moreover, you don’t need to export variables nor packages because they are already in the session. However, this can’t be used on Windows. This is why I use the clusters option in my packages. Also, I don’t trust forking to always work.

Common problems/mistakes/questions

Exporting variables and packages

object xxx not found or function xxx not found.

# Some data and function
library(dplyr)
dfs <- rep(list(iris), 3)
count(dfs[[1]], Species)
## # A tibble: 3 x 2
##      Species     n
##       <fctr> <int>
## 1     setosa    50
## 2 versicolor    50
## 3  virginica    50
# Sequential processing to apply to 
# all the data frames of the list 'dfs'
registerDoSEQ()
myFun <- function() {
  foreach(i = seq_along(dfs)) %dopar% {
    df <- dfs[[i]]
    count(df, Species)
  }
}
str(myFun())
## List of 3
##  $ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..$ n      : int [1:3] 50 50 50
##  $ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..$ n      : int [1:3] 50 50 50
##  $ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..$ n      : int [1:3] 50 50 50
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun(), error = function(e) print(e))
## <simpleError in {    df <- dfs[[i]]    count(df, Species)}: task 1 failed - "objet 'dfs' introuvable">
parallel::stopCluster(cl)

Hey, why doesn’t this work anymore? foreach will export all the needed variables that are present in its environment (here, the environment of myFun) and dfs is not in this environment. Some will tell you to use option .export of foreach but I don’t think it’s good practice. You just have to pass dfs to myFun.

myFun2 <- function(dfs) {
  foreach(i = seq_along(dfs)) %dopar% {
    df <- dfs[[i]]
    count(df, Species)
  }
}
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun2(dfs), error = function(e) print(e))
## <simpleError in {    df <- dfs[[i]]    count(df, Species)}: task 1 failed - "impossible de trouver la fonction "count"">
parallel::stopCluster(cl)

Hey, this still doesn’t work. You also need to load packages. You could use option .packages of foreach but you could simply add dplyr:: before count. It will be clearer (like one does in packages).

myFun3 <- function(dfs) {
  foreach(i = seq_along(dfs)) %dopar% {
    df <- dfs[[i]]
    dplyr::count(df, Species)
  }
}
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun3(dfs), error = function(e) print(e))
## [[1]]
## # A tibble: 3 x 2
##      Species     n
##       <fctr> <int>
## 1     setosa    50
## 2 versicolor    50
## 3  virginica    50
## 
## [[2]]
## # A tibble: 3 x 2
##      Species     n
##       <fctr> <int>
## 1     setosa    50
## 2 versicolor    50
## 3  virginica    50
## 
## [[3]]
## # A tibble: 3 x 2
##      Species     n
##       <fctr> <int>
## 1     setosa    50
## 2 versicolor    50
## 3  virginica    50
parallel::stopCluster(cl)

Iterate over lots of elements.

cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
  foreach(i = seq_len(2e4), .combine = 'c') %dopar% {
    sqrt(i)
  }
)
##    user  system elapsed 
##   6.144   0.329   6.646
parallel::stopCluster(cl)

Iterating over multiple elements in R is bad for performance. Moreover, foreach is only combining results 100 by 100, which also slows computations.

If there are too many elements to loop over, the best is to split the computation in ncores blocks and to perform some optimized sequential work on each block. In package bigstatsr, I use the following function to split indices in nb groups because I often need to iterate over hundreds of thousands of elements (columns).

bigstatsr:::CutBySize
## function (m, block.size, nb = ceiling(m/block.size)) 
## {
##     if (nb > m) 
##         nb <- m
##     int <- m/nb
##     upper <- round(1:nb * int)
##     lower <- c(1, upper[-nb] + 1)
##     size <- c(upper[1], diff(upper))
##     cbind(lower, upper, size)
## }
## <bytecode: 0xc66c820>
## <environment: namespace:bigstatsr>
bigstatsr:::CutBySize(20, nb = 3)
##      lower upper size
## [1,]     1     7    7
## [2,]     8    13    6
## [3,]    14    20    7

Filling something in parallel

Using foreach loop in r returning NA

mat <- matrix(0, 5, 8)
registerDoSEQ()
# Nested foreach loop
tmp <- foreach(j = 1:8) %:%
  foreach(i = 1:5) %dopar% {
    mat[i, j] <- i + j
  }
mat
##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]    2    3    4    5    6    7    8    9
## [2,]    3    4    5    6    7    8    9   10
## [3,]    4    5    6    7    8    9   10   11
## [4,]    5    6    7    8    9   10   11   12
## [5,]    6    7    8    9   10   11   12   13
# Try in parallel
mat2 <- matrix(0, 5, 8)
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tmp2 <- foreach(j = 1:8) %:%
  foreach(i = 1:5) %dopar% {
    mat2[i, j] <- i + j
  }
parallel::stopCluster(cl)
mat2
##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]    0    0    0    0    0    0    0    0
## [2,]    0    0    0    0    0    0    0    0
## [3,]    0    0    0    0    0    0    0    0
## [4,]    0    0    0    0    0    0    0    0
## [5,]    0    0    0    0    0    0    0    0

There are two problems here:

  1. mat is filled in the sequential version but won’t be in the parallel version. Why? This is because when using parallelism, mat is copied so that each core modifies a copy of the matrix, not the original one.

  2. foreach returns something (here a two-level list).

To overcome this problem, you could use shared-memory. For example, with my package bigstatsr.

library(bigstatsr)
mat3 <- FBM(5, 8)
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tmp3 <- foreach(j = 1:8, .combine = 'c') %:%
  foreach(i = 1:5, .combine = 'c') %dopar% {
    mat3[i, j] <- i + j
    NULL
  }
parallel::stopCluster(cl)
mat3[]
##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]    2    3    4    5    6    7    8    9
## [2,]    3    4    5    6    7    8    9   10
## [3,]    4    5    6    7    8    9   10   11
## [4,]    5    6    7    8    9   10   11   12
## [5,]    6    7    8    9   10   11   12   13
tmp3
## NULL

The original matrix is now modified. Note that I return NULL to save memory.

Parallelize over a large matrix

mat <- matrix(0, 1e4, 1e4); mat[] <- rnorm(length(mat))
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
  tmp <- foreach(k = 1:2, .combine = 'c') %dopar% {
    Sys.sleep(1)
    mat[1, 1]
  }
)
##    user  system elapsed 
##   1.834   0.262   3.806
parallel::stopCluster(cl)

Copying mat to both clusters takes time (and memory!).

mat2 <- FBM(1e4, 1e4); mat2[] <- rnorm(length(mat2))
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
  tmp <- foreach(k = 1:2, .combine = 'c') %dopar% {
    Sys.sleep(1)
    mat2[1, 1]
  }
)
##    user  system elapsed 
##   0.025   0.005   2.410
parallel::stopCluster(cl)

This is faster because it’s using a matrix that is stored on disk (so shared between processes) so that it doesn’t need to be copied.

Advanced parallelism: synchronization

For example, you may need to write to the same data (maybe increment it). In this case, it is important to use some locks so that only one session writes to the data at the same time. For that, you could use package flock, which is really easy to use.

mat <- FBM(1, 1, init = 0)
mat[]
## [1] 0
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(k = 1:10, .combine = 'c') %dopar% {
  mat[1, 1] <- mat[1, 1] + k
  NULL
}
## NULL
parallel::stopCluster(cl)
mat[]
## [1] 33
sum(1:10)
## [1] 55
lock <- tempfile()
mat2 <- FBM(1, 1, init = 0)
mat2[]
## [1] 0
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(k = 1:10, .combine = 'c') %dopar% {
  locked <- flock::lock(lock)
  mat2[1, 1] <- mat2[1, 1] + k
  flock::unlock(locked)
  NULL
}
## NULL
parallel::stopCluster(cl)
mat2[]
## [1] 55

So each process uses some lock to perform its incrementation so that the data can’t be changed by some other process in the meantime.

You may also need to use some message passing or some barriers. For that, you could learn to use MPI. For some basic use, I “reimplemented” this using only shared-memory matrices (FBMs). You can see this function is you’re interested.

Miscellenaous

  • Recall that you won’t gain much from parallelism. You’re likely to gain much more performance by simply optimizing your sequential code. Don’t reproduce the silly examples here as real code, they are quite bad.

  • How to print during parallel execution? Use option outfile in makeCluster (for example, using outfile = "" will redirect to the console).

  • Don’t try to parallelize huge matrix operations with loops. There are already (parallel) optimized linear algebra libraries that exist and which will be much faster. For example, you could use Microsoft R Open.

  • Some will tell you to use parallel::detectCores() - 1 cores.

Conclusion

Hope this can help some.

Don’t hesitate to comment if you want to add/modify something to this post.