%dofuture% - a Better foreach() Parallelization Operator than %dopar%
doFuture 1.0.0 has been on CRAN since March 2023. It introduces a new foreach operator %dofuture%, which makes it even easier to use foreach() to parallelize via the future ecosystem. This new operator is designed to be an alternative to the existing %dopar% operator for foreach() - an alternative that works in similar ways but better. If you already use foreach() together with futures, or plan on doing so, I recommend using %dofuture% instead of %dopar%. I'll explain why I think so below.
Introduction
The traditional way to parallelize with foreach() is to use the %dopar% infix operator together with a registered foreach adaptor. The popular doParallel package provides %dopar% backends for parallelizing on the local machine. Here is an example that uses four local workers:
library(foreach)
workers <- parallel::makeCluster(4)
doParallel::registerDoParallel(cl = workers)
xs <- rnorm(1000)
y <- foreach(x = xs, .export = "slow_fcn") %dopar% {
  slow_fcn(x)
}
I highly recommend the Futureverse for parallelization because of its advantages, such as relaying of standard output, messages, warnings, and errors generated on the parallel workers to the main R process, support for near-live progress updates, and more descriptive backend error messages. Almost from the very beginning of the Futureverse, you have been able to use futures with foreach() and %dopar% via the doFuture package. For instance, we can rewrite the above example to use futures as:
library(foreach)
doFuture::registerDoFuture()
future::plan(multisession, workers = 4)
xs <- rnorm(1000)
y <- foreach(x = xs, .export = "slow_fcn") %dopar% {
  slow_fcn(x)
}
In this blog post, I am proposing to move to
library(foreach)
future::plan(multisession, workers = 4)
xs <- rnorm(1000)
y <- foreach(x = xs, .export = "slow_fcn") %dofuture% {
  slow_fcn(x)
}
instead. So, why is that better? It is because:
- %dofuture% removes the need to register a foreach backend, i.e. no more registerDoMC(), registerDoParallel(), registerDoFuture(), etc.
- %dofuture% is unaffected by any foreach backends that the end-user has registered.
- %dofuture% uses a consistent foreach() “options” argument, regardless of parallel backend used, instead of different ones for different backends, e.g. .options.multicore, .options.snow, and .options.mpi.
- %dofuture% is guaranteed to always parallelize via the Futureverse, using whatever plan() the end-user has specified. It also means that you, as a developer, have full control of the parallelization code.
- %dofuture% supports proper parallel random number generation (RNG). There is no longer a need to use %dorng% of the doRNG package.
- %dofuture% automatically identifies global variables and packages that are needed by the parallel workers.
- %dofuture% relays errors generated in parallel as-is such that they can be handled using standard R methods, e.g. tryCatch() (see the sketch right after this list).
- %dofuture% truly outputs standard output and messages, warnings, and other types of conditions generated in parallel as-is such that they can be handled using standard R methods, e.g. capture.output() and withCallingHandlers().
- %dofuture% supports near-live progress updates via the progressr package.
- %dofuture% gives more informative error messages, which helps troubleshooting, if a parallel worker crashes.
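To give a flavor of this, here is a minimal sketch (with toy computations) of how output and conditions produced on the workers can be handled with standard R tools:
library(doFuture)
plan(multisession, workers = 2)

## Messages and warnings generated on the workers are relayed to the
## main R session, just as if the code had run sequentially
y <- foreach(x = 1:3) %dofuture% {
  message("processing x = ", x)
  sqrt(x)
}

## Errors are relayed as-is, so standard error handling applies
res <- tryCatch({
  foreach(x = 1:3) %dofuture% {
    if (x == 2) stop("something went wrong for x = ", x)
    sqrt(x)
  }
}, error = function(e) conditionMessage(e))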
Below are the details.
Problems of %dopar% that %dofuture% addresses
Let me discuss a few of the unfortunate drawbacks that come with %dopar%. Most of these stem from a slightly too lax design. Although convenient, the flexible design prevents us from having full control and from writing code that can parallelize on any parallel backend.
Problem 1. %dopar% requires registering a foreach adaptor
If we write code that others will use, say, an R package, then we can never know what compute resources the user has, or will have in the future. Traditionally, this means that one user might want to use doParallel for parallelization, another doMC, and yet another, maybe, doRedis. Because of this, we must not have any calls to one of the many registerDoNnn() functions in our code. If we do, we lock users into a specific parallel backend. We could of course support a few different backends, but we are still locking users into a small set of parallel backends. If someone develops a new backend in the future, our code has to be updated before users can take advantage of the new backend.
One can argue that doFuture::registerDoFuture() somewhat addresses this problem. On one hand, when used, it does lock the user into the future framework. On the other hand, the user has many parallel backends to choose from in the Futureverse, including backends that will be developed in the future. In this sense, the lock-in is less severe, especially since we do not have to update our code for new backends to be supported. Also, to avoid destructive side effects, registerDoFuture() allows you to change the foreach backend used inside your functions temporarily, e.g.
## Temporarily use futures
oldDoPar <- registerDoFuture()
on.exit(with(oldDoPar, foreach::setDoPar(fun=fun, data=data, info=info)), add = TRUE)
This avoids changing the foreach backend that the user might already have set elsewhere.
That said, I never wanted to say that people should use registerDoFuture() whenever using %dopar%, because I think that would be against the philosophy behind the foreach framework. The foreach ecosystem is designed to separate the foreach() + %dopar% code, describing what to parallelize, from the registerDoNnn() call, describing how and where to parallelize.
Using %dofuture%, instead of %dopar% with a user-controlled foreach backend, avoids this dilemma. With %dofuture%, the developer is in full control of the parallelization code.
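As an illustration, here is a minimal sketch of what this looks like in practice; my_fcn() and slow_fcn() are made-up placeholder functions:
library(doFuture)

slow_fcn <- function(x) { Sys.sleep(0.1); sqrt(abs(x)) }  ## toy stand-in

## The developer writes the parallel code once, without any registerDoNnn() call
my_fcn <- function(xs) {
  foreach(x = xs) %dofuture% {
    slow_fcn(x)
  }
}

## The end-user decides how and where to parallelize
plan(multisession, workers = 4)  ## or plan(sequential), plan(cluster, ...), etc.
y <- my_fcn(rnorm(100))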
Problem 2. Chunking and load-balancing differ among foreach backends
When using parallel map-reduce functions such as mclapply() and parLapply() of the parallel package, or foreach() with %dopar%, the tasks are partitioned into subsets and distributed to the parallel workers for processing. This partitioning is often referred to as “chunking”, because we chunk up the elements into smaller chunks, and then each chunk is processed by one parallel worker. There are different strategies to chunk up the elements. One approach is to use uniformly sized chunks and have each worker process one chunk. Another approach is to use chunks with a single element, and have each worker process one or more chunks.
The chunks may be pre-assigned (“prescheduled”) to the parallel workers up-front, which is referred to as static load balancing. An alternative is to assign chunks to workers on-the-fly as the workers become available, which is referred to as dynamic load balancing.
If the processing time differs a lot between elements, it is beneficial to use dynamic load balancing together with small chunk sizes.
However, if we dig into the documentation and source code of the different foreach backends, we will find that they use different chunking and load-balancing strategies. For example, assume we are running on a Linux machine, which supports forked processing. Then, if we use
library(foreach)
doParallel::registerDoParallel(cores = 8)
y <- foreach(x = X, .export = "slow_fcn") %dopar% {
  slow_fcn(x)
}
the data will be processed by eight fork-based parallel workers using dynamic load balancing with single-element chunks. However, if we use PSOCK clusters:
library(foreach)
cl <- parallel::makeCluster(8)
doParallel::registerDoParallel(cl = cl)
y <- foreach(x = X, .export = "slow_fcn") %dopar% {
  slow_fcn(x)
}
the data will be processed by eight PSOCK-based parallel workers using static load balancing with uniformly sized chunks.
Which of these two chunking and load-balancing strategies is the most efficient one depends on how much the processing time of slow_fcn(x) varies with different values of x. For example, and without going into details, if the processing times differ a lot, dynamic load balancing often makes better use of the parallel workers and results in a shorter overall processing time.
Regardless of which is faster, the problem with different foreach backends using different strategies is that, as a developer with little control over the registered foreach backend, you have equally poor control over the chunking and load-balancing strategies.
Using %dofuture% avoids this problem. If you use %dofuture%, then dynamic load balancing will always be used for processing the data, regardless of which parallel future backend is in place, with the option to control the chunk size. As a side note, %dopar% with registerDoFuture() will also do this.
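For example, here is a sketch of how the chunk size can be tweaked; it assumes the chunk.size option of .options.future described in the doFuture documentation, and uses a toy slow_fcn():
library(doFuture)
plan(multisession, workers = 8)

slow_fcn <- function(x) { Sys.sleep(0.01); sqrt(abs(x)) }  ## toy stand-in
X <- rnorm(100)

## Dynamic load balancing with single-element chunks
y <- foreach(x = X, .options.future = list(chunk.size = 1)) %dofuture% {
  slow_fcn(x)
}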
Problem 3. Different foreach backends use different foreach() options
In the previous section, I did not mention that for some foreach backends it is indeed possible to control whether static or dynamic load balancing should be used, and what the chunk sizes should be. This can be controlled by special .options.* arguments for foreach(). However, each foreach backend has its own .options.* argument, e.g. you might find that some use .options.multicore, others .options.snow, or something else. Because they are different, we cannot write code that works with any type of foreach backend.
To give two examples, when using doParallel and registerDoParallel(cores = 8), we can replace the default dynamic load balancing with static load balancing as:
library(foreach)
doParallel::registerDoParallel(cores = 8)
y <- foreach(x = X, .export = "slow_fcn",
             .options.multicore = list(preschedule = TRUE)) %dopar% {
  slow_fcn(x)
}
This change will also switch from chunks with a single element to (eight) chunks with similar size.
If we instead use registerDoParallel(cl), which gives us the opposite situation, we can switch out the static load balancing for dynamic load balancing by using:
library(foreach)
cl <- parallel::makeCluster(8)
doParallel::registerDoParallel(cl = cl)
y <- foreach(x = X, .export = "slow_fcn",
             .options.snow = list(preschedule = FALSE)) %dopar% {
  slow_fcn(x)
}
This will also switch from uniformly sized chunks to single-element chunks.
As we can see, the fact that we have to use different foreach() “options” arguments (here .options.multicore and .options.snow) for different foreach backends prevents us from writing code that works with any foreach backend.
Of course, we could specify “options” arguments for known foreach backends and hope we haven’t missed any and that no new ones are showing up later, e.g.
library(foreach)
doParallel::registerDoParallel(cores = 8)
y <- foreach(x = X,
             .export = "slow_fcn",
             .options.multicore = list(preschedule = TRUE),
             .options.snow = list(preschedule = TRUE),
             .options.future = list(preschedule = TRUE),
             .options.mpi = list(chunkSize = 1)) %dopar% {
  slow_fcn(x)
}
Regardless, this still limits the end-user to a set of commonly used foreach backends, and our code can never be agile to foreach backends that are developed at a later time.
Using %dofuture% avoids these problems. It supports the argument .options.future in a consistent way across all future backends, which means that your code will be the same regardless of parallel backend. By the core design of the Futureverse, any new future backends developed later on will automatically work with your foreach code if you use %dofuture%.
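To illustrate, here is a sketch of how the backend-specific calls above collapse into a single one; it again assumes the chunk.size option of .options.future and a toy slow_fcn(), and the plan() choices are arbitrary examples:
library(doFuture)

slow_fcn <- function(x) { Sys.sleep(0.01); sqrt(abs(x)) }  ## toy stand-in
X <- rnorm(100)

## Only the plan() line changes between backends; the foreach() call does not
plan(multisession, workers = 8)
## plan(cluster, workers = c("n1", "n2"))  ## e.g. hypothetical remote workers
## plan(sequential)                        ## or no parallelization at all

y <- foreach(x = X, .options.future = list(chunk.size = 10)) %dofuture% {
  slow_fcn(x)
}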
Problem 4. Global variables are not always identified by foreach()
When parallelizing code, the parallel workers must have access to all functions and variables required to evaluate the parallel code. As we have seen in the above examples, you can use the .export argument to help foreach() export the necessary objects to each of the parallel workers.
However, a developer who uses doMC::registerDoMC(), or equivalently doParallel::registerDoParallel(cores), might forget to specify the .export argument. This can happen because the mechanics of forked processing make all objects available to the parallel workers. If they test their code using only these foreach backends, they will not notice that .export is not declared. The same may happen if the developer assumes doFuture::registerDoFuture() is used. However, without specifying .export, the code will not work on other types of foreach backends, e.g. doParallel::registerDoParallel(cl) and doMPI::registerDoMPI(). If an R package forgets to specify the .export argument, and is not comprehensively tested, then it will be the end-user, for instance on MS Windows, who runs into the bug.
When using %dofuture%, global variables and required packages are by default automatically identified and exported to the parallel workers by the future framework. This is done the same way regardless of parallel backend.
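Here is a minimal sketch (with toy objects) of the automatic identification of globals, together with the manual alternative via .options.future = list(globals = ...) that is mentioned in the migration notes below:
library(doFuture)
plan(multisession, workers = 2)

a <- 3.14
slow_fcn <- function(x) { Sys.sleep(0.1); a * x }  ## toy; uses the global 'a'
xs <- rnorm(10)

## 'slow_fcn' and 'a' are identified and exported automatically
y <- foreach(x = xs) %dofuture% {
  slow_fcn(x)
}

## Equivalent, with the globals declared explicitly
y <- foreach(x = xs,
             .options.future = list(globals = c("slow_fcn", "a"))) %dofuture% {
  slow_fcn(x)
}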
Problem 5. Easy to forget parallel random number generation
The foreach package and %dopar% do not have built-in support for parallel random number generation (RNG). Statistically sound parallel RNG is critical for many statistical analyses; if it is not used, the results may be biased and incorrect conclusions might be drawn. The doRNG package comes to the rescue when using %dopar%. It provides the operator %dorng%, which uses %dopar% internally while automatically setting up parallel RNG. Whenever you use %dopar% and find yourself needing parallel RNG, I recommend simply replacing %dopar% with %dorng%. The doRNG package also provides registerDoRNG(), which I do not recommend, because as a developer you do not have full control over whether that is registered or not.
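For completeness, here is a minimal sketch of that %dorng% drop-in (toy example, with doParallel chosen arbitrarily):
library(foreach)
library(doRNG)
doParallel::registerDoParallel(cores = 2)

## %dorng% uses %dopar% internally and sets up parallel RNG automatically
y <- foreach(ii = 1:4) %dorng% {
  runif(ii)
}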
Because foreach does not have built-in support for parallel RNG, it is easy to forget that it should be used. A developer who is aware of the importance of using proper parallel RNG will find out about doRNG and how best to use it, but a developer who is not aware of the problem can easily miss it and publish an R package that produces potentially incorrect results.
However, the future framework will detect it if we forget to use parallel RNG. When this happens, a warning will alert us to the problem and suggest how to fix it. This is the case if you use doFuture::registerDoFuture(), and it is also the case when using %dofuture%. For example,
library(doFuture)
plan(multisession, workers = 3)
y <- foreach(ii = 1:4) %dofuture% {
  runif(ii)
}
produces
Warning messages:
1: UNRELIABLE VALUE: Iteration 1 of the foreach() %dofuture% { ... },
part of chunk #1 ('doFuture2-1'), unexpectedly generated random numbers
without declaring so. There is a risk that those random numbers are not
statistically sound and the overall results might be invalid. To fix
this, specify foreach() argument '.options.future = list(seed = TRUE)'.
This ensures that proper, parallel-safe random numbers are produced via
the L'Ecuyer-CMRG method. To disable this check, set option
'doFuture.rng.onMisuse' to "ignore".
To fix this, we can specify the foreach() argument .options.future = list(seed = TRUE) to declare that we need to draw random numbers in parallel, i.e.
library(doFuture)
plan(multisession, workers = 3)
y <- foreach(ii = 1:4, .options.future = list(seed = TRUE)) %dofuture% {
  runif(ii)
}
This makes sure that statistically sound random numbers are generated.
Migrating from %dopar% to %dofuture% is straightforward
If you already have code that uses %dopar% and want to start using %dofuture% instead, it only takes a few changes, which are all straightforward and quick:
- Replace %dopar% with %dofuture%.
- Replace %dorng% with %dofuture% and set .options.future = list(seed = TRUE) (see the before-and-after sketch below).
- Replace .export = <character vector of global variables> with .options.future = list(globals = <character vector of global variables>).
- Drop any other registerDoNnn() calls inside your function, if you use them.
- Update your documentation to mention that the parallel backend should be set using future::plan() and no longer via different registerDoNnn() calls.
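As a concrete, hypothetical before-and-after sketch of such a migration (toy slow_fcn(); doParallel chosen arbitrarily for the “before” part):
library(foreach)
slow_fcn <- function(x) { Sys.sleep(0.1); sqrt(abs(x)) }  ## toy stand-in
xs <- rnorm(100)

## Before: %dorng% with a registered foreach backend
library(doRNG)
doParallel::registerDoParallel(cores = 4)
y1 <- foreach(x = xs, .export = "slow_fcn") %dorng% {
  slow_fcn(x)
}

## After: %dofuture%, with the backend chosen by the end-user via plan()
library(doFuture)
plan(multisession, workers = 4)
y2 <- foreach(x = xs, .options.future = list(seed = TRUE)) %dofuture% {
  slow_fcn(x)
}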
In brief, if you use %dofuture% instead of %dopar%, your life as a developer will be easier, and so will the end-user's.
If you have questions or comments on doFuture and %dofuture%, or the Futureverse in general, please use the Futureverse Discussion forum.
Happy futuring!
Henrik
Links
- doFuture package: CRAN, GitHub, pkgdown
- Futureverse: https://www.futureverse.org