A guide to parallelism in R
In this post, I talk about parallelism in R. This post is likely biased towards the solutions I use. For example, I never use mcapply
nor clusterApply
; I prefer to always use foreach
. In this post, we will focus on how to parallelize R code on your computer with package {foreach}.
In this post, I use mainly silly examples just to show one point at a time.
Basics of foreach
You can install R package {foreach} with install.packages("foreach")
.
library(foreach)
foreach(i = 1:3) %do% {
sqrt(i)
}
## [[1]]
## [1] 1
##
## [[2]]
## [1] 1.414214
##
## [[3]]
## [1] 1.732051
In the example above, you iterate on i
and apply the expression sqrt(i)
. Function foreach
returns a list by default. A common mistake is to think that foreach
is like a for-loop. Actually, foreach
is more like lapply
.
lapply(1:3, function(i) {
sqrt(i)
})
## [[1]]
## [1] 1
##
## [[2]]
## [1] 1.414214
##
## [[3]]
## [1] 1.732051
Parameter .combine
can be very useful. Yet, now, I usually prefer to combine the results afterwards (see do.call
below).
foreach(i = 1:3, .combine = 'c') %do% {
sqrt(i)
}
## [1] 1.000000 1.414214 1.732051
With lapply
, we would do
res <- lapply(1:3, function(i) {
sqrt(i)
})
do.call('c', res)
## [1] 1.000000 1.414214 1.732051
Parallelize with foreach
You need to do at least two things:
replace
%do%
by%dopar%
. Basically, always use%dopar%
because you can useregisterDoSEQ()
is you really want to run theforeach
sequentially.register a parallel backend using one of the packages that begin with do (such as
doParallel
,doMC
,doMPI
and more). I will list only the two main parallel backends because there are too many of them.
Using clusters
# Example registering clusters
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(i = 1:3, .combine = 'c') %dopar% {
sqrt(i)
}
## [1] 1.000000 1.414214 1.732051
parallel::stopCluster(cl)
In this situation, all the data and packages used must be exported (copied) to the clusters, which can add some overhead. Yet, at least, you know what you do.
Using forking
cl <- parallel::makeForkCluster(2)
doParallel::registerDoParallel(cl)
foreach(i = 1:3, .combine = 'c') %dopar% {
sqrt(i)
}
## [1] 1.000000 1.414214 1.732051
parallel::stopCluster(cl)
Forking just copy the R session in its current state. This is very fast because it copies objects only it they are modified. Moreover, you don’t need to export variables nor packages because they are already in the session. However, this can’t be used on Windows. This is why I use the clusters option in my packages.
Common problems/mistakes/questions
Exporting variables and packages
“object”xxx" not found" or “could not find function”xxx“”.
# Some data and function
library(dplyr)
dfs <- rep(list(iris), 3)
count(dfs[[1]], Species)
## # A tibble: 3 x 2
## Species n
## <fct> <int>
## 1 setosa 50
## 2 versicolor 50
## 3 virginica 50
# Sequential processing to apply to
# all the data frames of the list 'dfs'
registerDoSEQ()
myFun <- function() {
foreach(i = seq_along(dfs)) %dopar% {
df <- dfs[[i]]
count(df, Species)
}
}
str(myFun())
## List of 3
## $ :Classes 'tbl_df', 'tbl' and 'data.frame': 3 obs. of 2 variables:
## ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
## ..$ n : int [1:3] 50 50 50
## $ :Classes 'tbl_df', 'tbl' and 'data.frame': 3 obs. of 2 variables:
## ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
## ..$ n : int [1:3] 50 50 50
## $ :Classes 'tbl_df', 'tbl' and 'data.frame': 3 obs. of 2 variables:
## ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
## ..$ n : int [1:3] 50 50 50
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun(), error = function(e) print(e))
## <simpleError in { df <- dfs[[i]] count(df, Species)}: task 1 failed - "objet 'dfs' introuvable">
parallel::stopCluster(cl)
Why doesn’t this work anymore? foreach
will export all the needed variables that are present in its environment (here, the environment of myFun
) and dfs
is not in this environment. Some will tell you to use option .export
of foreach
but I don’t think it’s good practice. You just have to pass dfs
to myFun
.
myFun2 <- function(dfs) {
foreach(i = seq_along(dfs)) %dopar% {
df <- dfs[[i]]
count(df, Species)
}
}
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun2(dfs), error = function(e) print(e))
## <simpleError in { df <- dfs[[i]] count(df, Species)}: task 1 failed - "impossible de trouver la fonction "count"">
parallel::stopCluster(cl)
This still doesn’t work. You also need to load packages. You could use option .packages
of foreach
but you could simply add dplyr::
before count
. Moreover, it is clearer (like one does in packages).
myFun3 <- function(dfs) {
foreach(i = seq_along(dfs)) %dopar% {
df <- dfs[[i]]
dplyr::count(df, Species)
}
}
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun3(dfs), error = function(e) print(e))
## [[1]]
## # A tibble: 3 x 2
## Species n
## <fct> <int>
## 1 setosa 50
## 2 versicolor 50
## 3 virginica 50
##
## [[2]]
## # A tibble: 3 x 2
## Species n
## <fct> <int>
## 1 setosa 50
## 2 versicolor 50
## 3 virginica 50
##
## [[3]]
## # A tibble: 3 x 2
## Species n
## <fct> <int>
## 1 setosa 50
## 2 versicolor 50
## 3 virginica 50
parallel::stopCluster(cl)
Iterate over lots of elements.
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
foreach(i = seq_len(2e4), .combine = 'c') %dopar% {
sqrt(i)
}
)
## user system elapsed
## 6.681 0.479 7.611
parallel::stopCluster(cl)
Iterating over multiple elements in R is bad for performance. Moreover, foreach
is only combining results 100 by 100, which also slows computations.
If there are too many elements to loop over, the best is to split the computation in ncores blocks and to perform some optimized sequential work on each block. In package {bigstatsr}, I use the following function to split indices in nb
groups because I often need to iterate over hundreds of thousands of elements (columns).
bigstatsr:::CutBySize
## function (m, block.size, nb = ceiling(m/block.size))
## {
## if (nb > m)
## nb <- m
## int <- m/nb
## upper <- round(1:nb * int)
## lower <- c(1, upper[-nb] + 1)
## size <- c(upper[1], diff(upper))
## cbind(lower, upper, size)
## }
## <bytecode: 0xad02c80>
## <environment: namespace:bigstatsr>
bigstatsr:::CutBySize(20, nb = 3)
## lower upper size
## [1,] 1 7 7
## [2,] 8 13 6
## [3,] 14 20 7
Filling something in parallel
Using foreach loop in R returning NA
mat <- matrix(nrow = 5, ncol = 8)
registerDoSEQ()
# Nested foreach loop
tmp <- foreach(j = 1:8) %:% foreach(i = 1:5) %dopar% {
mat[i, j] <- i + j
}
mat
## [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,] 2 3 4 5 6 7 8 9
## [2,] 3 4 5 6 7 8 9 10
## [3,] 4 5 6 7 8 9 10 11
## [4,] 5 6 7 8 9 10 11 12
## [5,] 6 7 8 9 10 11 12 13
# Try in parallel
mat2 <- matrix(nrow = 5, ncol = 8)
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tmp2 <- foreach(j = 1:8) %:% foreach(i = 1:5) %dopar% {
mat2[i, j] <- i + j
}
parallel::stopCluster(cl)
mat2
## [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,] NA NA NA NA NA NA NA NA
## [2,] NA NA NA NA NA NA NA NA
## [3,] NA NA NA NA NA NA NA NA
## [4,] NA NA NA NA NA NA NA NA
## [5,] NA NA NA NA NA NA NA NA
There are two problems here:
mat
is filled in the sequential version but won’t be in the parallel version. This is because when using parallelism,mat
is copied so that each core modifies a copy of the matrix, not the original one.foreach
returns something (here a two-level list).
To overcome this problem, you could use shared-memory. For example, with my package {bigstatsr}.
library(bigstatsr)
mat3 <- FBM(5, 8)
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tmp3 <- foreach(j = 1:8, .combine = 'c') %:%
foreach(i = 1:5, .combine = 'c') %dopar% {
mat3[i, j] <- i + j
NULL
}
parallel::stopCluster(cl)
mat3[]
## [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,] 2 3 4 5 6 7 8 9
## [2,] 3 4 5 6 7 8 9 10
## [3,] 4 5 6 7 8 9 10 11
## [4,] 5 6 7 8 9 10 11 12
## [5,] 6 7 8 9 10 11 12 13
tmp3
## NULL
The original matrix is now modified. Note that I return NULL
to save memory.
Parallelize over a large matrix
mat <- matrix(0, 1e4, 1e4); mat[] <- rnorm(length(mat))
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
tmp <- foreach(k = 1:2, .combine = 'c') %dopar% {
Sys.sleep(1)
mat[1, 1]
}
)
## user system elapsed
## 1.931 0.323 4.517
parallel::stopCluster(cl)
If using clusters, copying mat
to both clusters takes time (and memory!).
mat2 <- FBM(1e4, 1e4); mat2[] <- rnorm(length(mat2))
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
tmp <- foreach(k = 1:2, .combine = 'c') %dopar% {
Sys.sleep(1)
mat2[1, 1]
}
)
## user system elapsed
## 0.015 0.010 2.998
parallel::stopCluster(cl)
This is faster because it’s using a matrix that is stored on disk (so shared between processes) so that it doesn’t need to be copied.
Advanced parallelism: synchronization
For example, you may need to write to the same data (maybe increment it). In this case, it is important to use some locks so that only one session writes to the data at the same time. For that, you could use package {flock}, which is really easy to use.
mat <- FBM(1, 1, init = 0)
mat[]
## [1] 0
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(k = 1:10, .combine = 'c') %dopar% {
mat[1, 1] <- mat[1, 1] + k
NULL
}
## NULL
parallel::stopCluster(cl)
mat[]
## [1] 34
sum(1:10)
## [1] 55
lock <- tempfile()
mat2 <- FBM(1, 1, init = 0)
mat2[]
## [1] 0
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(k = 1:10, .combine = 'c') %dopar% {
locked <- flock::lock(lock)
mat2[1, 1] <- mat2[1, 1] + k
flock::unlock(locked)
NULL
}
## NULL
parallel::stopCluster(cl)
mat2[]
## [1] 55
So each process uses some lock to perform its incrementation so that the data can’t be changed by some other process in the meantime.
Moreover, you may also need to use some message passing or some barriers. For that, you could learn to use MPI. For some basic use, I “reimplemented” this using only shared-memory matrices (FBMs). You can see this function is you’re interested.
Miscellenaous
Recall that you won’t gain much from parallelism. You’re likely to gain much more performance by simply optimizing your sequential code. Don’t reproduce the silly examples here as real code, they are quite bad.
How to print during parallel execution? Use option
outfile
inmakeCluster
(for example, usingoutfile = ""
will redirect to the console).Don’t try to parallelize huge matrix operations with loops. There are already (parallel) optimized linear algebra libraries that exist and which will be much faster. For example, you could use Microsoft R Open.
Some will tell you to use
parallel::detectCores() - 1
cores. I usebigstatsr::nb_cores()
.
Conclusion
Hope this can help some.
Don’t hesitate to comment if you want to add/modify something to this post.