Henrik Bengtsson
Parallel & distributed processing can be used to:
speed up processing (wall time)
lower memory footprint (per machine)
avoid data transfers (compute where data lives)
Other reasons, e.g. asynchronous UI/UX
X <- list(a = 1:50, b = 51:100, c = 101:150, d = 151:200)
y <- list()
y$a <- sum(X$a)
y$b <- sum(X$b)
y$c <- sum(X$c)
y$d <- sum(X$d)
y <- list()
for (name in names(X)) y[[name]] <- sum(X[[name]])
y <- lapply(X, FUN = sum)
X <- list(a = 1:50, b = 51:100, c = 101:150, d = 151:200)
y <- lapply(X, FUN = slow_sum)  # 4 minutes
This can be parallelized on Unix & macOS (on Windows it silently falls back to sequential processing) as:
y <- parallel::mclapply(X, FUN = slow_sum, mc.cores = 4) # 1 minute
To parallelize also on Windows, we can do:
library(parallel)
workers <- makeCluster(4)
clusterExport(workers, "slow_sum")
y <- parLapply(workers, X, fun = slow_sum)  # 1 minute
stopCluster(workers)  # shut down the background R sessions
"Which parallel API should I use?"
"What operating systems are users running?"
"It should work ... Oh, I forgot to test on macOS."
"Weird, others say it work for them but for me it doesn't!?"
"I wish this awesome package could parallelize on Windows :("
"I wish we could use a compute cluster in the cloud to speed this up"
#' @import parallel
my_fun <- function(X, ncores = 1) {
  if (ncores == 1) {
    y <- lapply(X, FUN = slow_sum)
  } else {
    if (.Platform$OS.type == "windows") {
      workers <- makeCluster(ncores)
      on.exit(stopCluster(workers))
      clusterExport(workers, "slow_sum")
      y <- parLapply(workers, X, fun = slow_sum)
    } else {
      y <- mclapply(X, FUN = slow_sum, mc.cores = ncores)
    }
  }
  y
}
library(foreach)
doMC::registerDoMC(4)  # User chooses how to parallelize

my_fun <- function(X) {
  foreach(x = X) %dopar% { slow_sum(x) }
}
workers <- parallel::makeCluster(4)  # Alternative parallel backend
doParallel::registerDoParallel(workers)
Error in { : task 1 failed - 'could not find function "slow_sum"'
Whoops, we forgot to export slow_sum() to the background sessions. The fix:
foreach(x = X, .export = "slow_sum") %dopar% { slow_sum(x) }
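Putting the pieces together, an end-to-end version of the fix might look like this (a sketch; slow_sum() is a stand-in for the real function):

library(foreach)

slow_sum <- function(x) { Sys.sleep(60); sum(x) }  # stand-in

X <- list(a = 1:50, b = 51:100, c = 101:150, d = 151:200)

workers <- parallel::makeCluster(4)
doParallel::registerDoParallel(workers)

## .export ships slow_sum() to each background R session
y <- foreach(x = X, .export = "slow_sum") %dopar% { slow_sum(x) }

parallel::stopCluster(workers)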
v <- expr
f <- future(expr)
v <- value(f)
> slow_sum(1:100)  # 2 minutes
[1] 5050
> a <- slow_sum(1:50)    # 1 minute
> b <- slow_sum(51:100)  # 1 minute
> a + b
[1] 5050
> library(future)
> plan(multiprocess)
> fa <- future( slow_sum( 1:50 ) )   # ~0 seconds
> fb <- future( slow_sum(51:100) )   # ~0 seconds
> 1:3
[1] 1 2 3
> value(fa)
[1] 1275
> value(fb)
[1] 3775
> value(fa) + value(fb)
[1] 5050
v <- expr
f <- future(expr)
v <- value(f)
v %<-% expr
> library(future)
> plan(multiprocess)
> a %<-% slow_sum( 1:50 )   # ~0 seconds
> b %<-% slow_sum(51:100)   # ~0 seconds
> 1:3
[1] 1 2 3
> a + b
[1] 5050
plan(sequential)
plan(multiprocess)
plan(cluster, workers = c("n1", "n2", "n3"))
plan(cluster, workers = c("remote1.org", "remote2.org"))
...
> a %<-% slow_sum( 1:50 )
> b %<-% slow_sum(51:100)
> a + b
[1] 5050
╔════════════════════════════════════════════════════════╗
║                     < Future API >                     ║
║                                                        ║
║             future(), value(), %<-%, ...               ║
╠════════════════════════════════════════════════════════╣
║                         future                         ║
╠════════════════════════════════╦═══════════╦═══════════╣
║            parallel            ║  globals  ║ (listenv) ║
╠══════════╦══════════╦══════════╬═══════════╬═══════════╝
║   snow   ║   Rmpi   ║   nws    ║ codetools ║
╚══════════╩══════════╩══════════╩═══════════╝
The Future API encapsulates heterogeneity
Motto: Developer decides what to parallelize - user decides how to
Provides atomic building blocks, f <- future(expr) and v <- value(f), ..., for richer parallel constructs, e.g. 'foreach', 'future.apply', ...
x <- rnorm(n = 100)
f <- future({ slow_sum(x) })

pryr::ast({ slow_sum(x) })
## \- `{
##    \- ()
##       \- `slow_sum
##       \- `x
Globals identified and exported to the background R worker:
slow_sum() - a function (also searched recursively)
x - a numeric vector of length 100
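The future package delegates this static-code inspection to the globals package; the identification can be previewed directly (a sketch, assuming slow_sum() and x are defined as above):

library(globals)

## Names of globals found by walking the expression's AST
findGlobals(quote({ slow_sum(x) }))

## Their values, i.e. what would be shipped to the worker
str(globalsOf(quote({ slow_sum(x) })))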
f <- future(expr)  # create future
r <- resolved(f)   # check if done
v <- value(f)      # wait & get result
#' @import future
parallel_lapply <- function(X, fun, ...) {
  ## Create futures
  fs <- lapply(X, function(x) {
    future(fun(x, ...))
  })
  ## Collect their values
  lapply(fs, value)
}
> plan(multiprocess)
> X <- list(a = 1:50, b = 51:100, c = 101:150, d = 151:200)
> y <- parallel_lapply(X, slow_sum)
> str(y)
List of 4
 $ a: int 1275
 $ b: int 3775
 $ c: int 6275
 $ d: int 8775
lapply(), vapply(), replicate(), ...

╔═══════════════════════════════════════════════════════════╗
║ future_lapply(), future_vapply(), future_replicate(), ... ║
╠═══════════════════════════════════════════════════════════╣
║                       < Future API >                      ║
╠═══════════════════════════════════════════════════════════╣
║                        "wherever"                         ║
╚═══════════════════════════════════════════════════════════╝
y <- lapply(X, slow_sum)         # base R, sequential
y <- future_lapply(X, slow_sum)  # drop-in replacement, runs "wherever"
plan(multiprocess)
plan(cluster, workers = c("n1", "n2", "n3"))
plan(batchtools_sge)
map(), map2(), modify(), ...

╔═══════════════════════════════════════════════════════════╗
║      future_map(), future_map2(), future_modify(), ...    ║
╠═══════════════════════════════════════════════════════════╣
║                       < Future API >                      ║
╠═══════════════════════════════════════════════════════════╣
║                        "wherever"                         ║
╚═══════════════════════════════════════════════════════════╝
y <- purrr::map(X, slow_sum)   # purrr, sequential
y <- future_map(X, slow_sum)   # future version, runs "wherever"
╔═══════════════════════════════════════════════════════╗
║                      foreach API                      ║
╠════════════╦══════╦════════╦═══════╦══════════════════╣
║ doParallel ║ doMC ║ doSNOW ║ doMPI ║     doFuture     ║
╠════════════╩══╦═══╩════════╬═══════╬══════════════════╣
║   parallel    ║    snow    ║ Rmpi  ║  < Future API >  ║
╚═══════════════╩════════════╩═══════╬══════════════════╣
                                     ║    "wherever"    ║
                                     ╚══════════════════╝
doFuture::registerDoFuture()
plan(multiprocess)

y <- foreach(x = X) %dopar% { slow_sum(x) }
┌───────────────────────────────────────────────────────┐
│                                                       │
│      caret, gam, glmnet, plyr, ... (1,400 pkgs)       │
│                                                       │
╠═══════════════════════════════════════════════════════╣
║                      foreach API                      ║
╠══════╦════════════╦════════╦═══════╦══════════════════╣
║ doMC ║ doParallel ║ doSNOW ║ doMPI ║     doFuture     ║
╠══════╩════════╦═══╩════════╬═══════╬══════════════════╣
║   parallel    ║    snow    ║ Rmpi  ║  < Future API >  ║
╚═══════════════╩════════════╩═══════╬══════════════════╣
                                     ║    "wherever"    ║
                                     ╚══════════════════╝
doFuture::registerDoFuture()
plan(future.batchtools::batchtools_sge)

library(caret)
model <- train(y ~ ., data = training)
## 2018-05-12
> db <- utils::available.packages()
> direct <- tools::dependsOnPkgs("foreach", recursive = FALSE, installed = db)
> str(direct)
 chr [1:533] "adabag" "adamethods" "admixturegraph" "ADMM" ...
> all <- tools::dependsOnPkgs("foreach", recursive = TRUE, installed = db)
> str(all)
 chr [1:1363] "adabag" "adamethods" "admixturegraph" "ADMM" ...
## Find our 80 FASTQ files
fastq <- dir(pattern = "[.]fq$")     ## 200 GB each => 16 TB total

## Align them to the human genome
bam <- lapply(fastq, DNAseq::align)  ## 3 hours each
Total processing time: 80 * 3 = 240 hours = 10 days
library(future.apply)
plan(multiprocess)  ## 12-core machine

## Find our 80 FASTQ files
fastq <- dir(pattern = "[.]fq$")            ## 200 GB each => 16 TB total

## Align them to the human genome
bam <- future_lapply(fastq, DNAseq::align)  ## 3 hours each
Total processing time: 80 * 3 / 12 = 20 hours
A common setup in many departments:
With too many nodes or users, ad-hoc clusters become cumbersome and hard to manage and control. It is better to use an HPC scheduler with a job queue:
Two or more machines
Users submit jobs to a common job queue
The system takes jobs off the queue and executes them on available machines / cores
#! /bin/env bash
#PBS -N my_htseq_align
#PBS -l mem=12gb
htseq_align $1 human.fa
$ qsub htseq_align.pbs patient101.fq
$ qsub htseq_align.pbs patient102.fq
$ qstat
Job ID   Name             User         Time Use S
-------- ---------------- ------------ -------- -
606411   bedGraph         alice        46:22:22 R
606494   misosummary      alice        55:07:08 R
606641   Rscript          bob          37:18:30 R
607758   Exome_QS1_Som    charlie      06:20:23 R
607832   my_htseq_align   henrik       00:01:57 R
607833   my_htseq_align   henrik       -        Q
╔═══════════════════════════════════════════════════╗
║                  < Future API >                   ║
║                                                   ║
║     future(), resolved(), value(), %<-%, ...      ║
╠═══════════════════════════════════════════════════╣
║          future   <->   future.batchtools         ║
╠═════════════════════════╦═════════════════════════╣
║        parallel         ║       batchtools        ║
╚═════════════════════════╬═════════════════════════╣
                          ║ SGE, Slurm, TORQUE, ... ║
                          ╚═════════════════════════╝
library(future.batchtools)
plan(batchtools_sge)

fastq <- dir(pattern = "[.]fq$")            ## 200 GB each; 80 files
bam <- future_lapply(fastq, DNAseq::align)  ## 3 hours each
$ qstat
Job ID   Name             User         Time Use S
-------- ---------------- ------------ -------- -
606411   bedGraph         alice        46:22:22 R
606638   future05         henrik       01:32:05 R
606641   Rscript          bob          37:18:30 R
606643   future06         henrik       01:31:55 R
...
library(future)
library(googleComputeEngineR)

vms <- gce_vm_cluster(cluster_size = 36)
plan(cluster, workers = as.cluster(vms))
data <- future_lapply(1:100, montecarlo_pi, B = 10e3)
pi_hat <- Reduce(calculate_pi, data)

print(pi_hat)
## 3.14159
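The helpers montecarlo_pi() and calculate_pi() are not shown here. One hypothetical way to write them is to let each worker return a hit count and let the reduction accumulate counts, normalizing at the end:

## Hypothetical definitions, not from the original example:
## count how many of B random points fall inside the unit quarter circle
montecarlo_pi <- function(i, B) {
  x <- runif(B)
  y <- runif(B)
  sum(x^2 + y^2 <= 1)
}

## accumulate hit counts across chunks
calculate_pi <- function(a, b) a + b

hits <- Reduce(calculate_pi, future_lapply(1:100, montecarlo_pi, B = 10e3))
pi_hat <- 4 * hits / (100 * 10e3)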
tasks <- drake_plan(
  raw_data = readxl::read_xlsx(file_in("raw-data.xlsx")),
  data = raw_data %>%
    mutate(Species = forcats::fct_inorder(Species)),
  hist = create_plot(data),
  fit = lm(Sepal.Width ~ Petal.Width + Species, data),
  report = rmarkdown::render(
    knitr_in("report.Rmd"),
    output_file = file_out("report.pdf")
  )
)

make(tasks, parallelism = "future", jobs = 2)
Make futures truly pure "containers" to be evaluated
Specify resource needs, e.g. only pass a future to a worker that can honor them
Sandboxed future backends, e.g. for evaluating non-trusted code
Unified API
Portable code
Worry-free
Developer decides what to parallelize - user decides how to
For beginners as well as advanced users
Nested parallelism on nested heterogeneous backends
Protects against recursive parallelism
Easy to build new frontends
Easy to add new backends
Tested on example():s from foreach, NMF, TSP, glmnet, plyr, caret, etc.
fastq <- dir(pattern = "[.]fq$")
aligned <- listenv()
for (i in seq_along(fastq)) {
  aligned[[i]] %<-% {
    chrs <- listenv()
    for (j in 1:24) {
      chrs[[j]] %<-% DNAseq::align(fastq[i], chr = j)
    }
    merge_chromosomes(chrs)
  }
}
plan(batchtools_sge)
plan(list(batchtools_sge, sequential))
plan(list(batchtools_sge, multiprocess))
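In the nested example above, the first element of the plan list governs the outer %<-% (one future per FASTQ file) and the second element governs the inner %<-% (one future per chromosome). For example:

## Outer futures become SGE jobs; inside each job, the 24 chromosome
## futures run in parallel on the node's local cores
plan(list(batchtools_sge, multiprocess))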
By default all futures are resolved using eager evaluation, but the developer has the option to use lazy evaluation.
Explicit API:
f <- future(..., lazy = TRUE)
v <- value(f)
Implicit API:
v %<-% { ... } %lazy% TRUE
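A small demonstration of the difference (a sketch; timings are illustrative):

library(future)
plan(sequential)

f1 <- future(Sys.time())               ## eager: evaluated immediately
f2 <- future(Sys.time(), lazy = TRUE)  ## lazy: evaluated at first value()

Sys.sleep(2)
value(f2) - value(f1)  ## roughly 2 seconds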
Identification of globals from static-code inspection has limitations (but defaults cover a large number of use cases):
False negatives, e.g. my_fcn is not found in do.call("my_fcn", x). Avoid this by using do.call(my_fcn, x) instead.
False positives - non-existing variables, e.g. NSE and variables in formulas. Ignore and leave it to run-time.
x <- "this FP will be exported"data <- data.frame(x = rnorm(1000), y = rnorm(1000))fit %<-% lm(x ~ y, data = data)
Comment: ... so, the above works.
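To illustrate the false-negative case, compare (assuming slow_sum() is defined; the variable names here are ours):

x <- 1:100

## Function referenced only by a character string: not detected as a
## global, so this may fail on backends running in a separate R session
f_bad <- future(do.call("slow_sum", list(x)))

## Function object referenced directly: detected and exported
f_good <- future(do.call(slow_sum, list(x)))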
Automatic (default):
x <- rnorm(n = 100)
y <- future({ slow_sum(x) }, globals = TRUE)
By names:
y <- future({ slow_sum(x) }, globals = c("slow_sum", "x"))
As name-value pairs:
y <- future({ slow_sum(x) }, globals = list(slow_sum = slow_sum, x = rnorm(n = 100)))
Disable:
y <- future({ slow_sum(x) }, globals = FALSE)
Automatic (default):
x <- rnorm(n = 100)
y %<-% { slow_sum(x) } %globals% TRUE
By names:
y %<-% { slow_sum(x) } %globals% c("slow_sum", "x")
As name-value pairs:
y %<-% { slow_sum(x) } %globals% list(slow_sum = slow_sum, x = rnorm(n = 100))
Disable:
y %<-% { slow_sum(x) } %globals% FALSE
x <- lapply(1:100, FUN = function(i) rnorm(1024 ^ 2))
y <- list()
for (i in seq_along(x)) {
  y[[i]] <- future( mean(x[[i]]) )
}
gives error: "The total size of the 2 globals that need to be exported for the future expression ('mean(x[[i]])') is 800.00 MiB. This exceeds the maximum allowed size of 500.00 MiB (option 'future.globals.maxSize'). There are two globals: 'x' (800.00 MiB of class 'list') and 'i' (48 bytes of class 'numeric')."
for (i in seq_along(x)) {
  x_i <- x[[i]]  ## Fix: subset before creating the future
  y[[i]] <- future( mean(x_i) )
}
Comment: Interesting research project to automate via code inspection.
Implicit futures are always resolved:
a %<-% sum(1:10)
b %<-% { 2 * a }
print(b)
## [1] 110
Explicit futures require care by developer:
fa <- future( sum(1:10) )
a <- value(fa)
fb <- future( 2 * a )
For the lazy developer - not recommended (may be expensive):
options(future.globals.resolve = TRUE)

fa <- future( sum(1:10) )
fb <- future( 2 * value(fa) )
Future class and corresponding methods:
an abstract S3 class with the common parts implemented, e.g. handling of globals and protection
new backends extend this class and implement the core methods, e.g. value() and resolved()
built-in classes implement the backends on top of the parallel package
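As a purely illustrative sketch of this S3 design (a made-up toy, not a real backend): extending the Future class and supplying the core methods is, in essence, what a backend does:

library(future)

## Toy backend: evaluates the expression immediately in the current
## process and wraps the result in an object extending Future
ToyFuture <- function(expr) {
  expr <- substitute(expr)
  res <- eval(expr, envir = parent.frame())
  structure(list(result = res), class = c("ToyFuture", "Future"))
}

resolved.ToyFuture <- function(x, ...) TRUE
value.ToyFuture <- function(future, ...) future$result

f <- ToyFuture(sum(1:10))
resolved(f)  ## TRUE
value(f)     ## 55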
|                       | future | parallel | foreach | batchtools | BiocParallel |
|-----------------------|:------:|:--------:|:-------:|:----------:|:------------:|
| Synchronous           | ✓ | ✓ | ✓ | ✓ | ✓ |
| Asynchronous          | ✓ | ✓ | ✓ | ✓ | ✓ |
| Uniform API           | ✓ |   | ✓ | ✓ | ✓ |
| Extendable API        | ✓ |   | ✓ | ✓ | ✓ |
| Globals               | ✓ |   | (✓)+ (soon by future) |   |   |
| Packages              | ✓ |   |   |   |   |
| Map-reduce ("lapply") | ✓ | ✓ | foreach() | ✓ | ✓ |
| Load balancing        | ✓ | ✓ | ✓ | ✓ | ✓ |
| For loops             | ✓ |   |   |   |   |
| While loops           | ✓ |   |   |   |   |
| Nested config         | ✓ |   |   |   |   |
| Recursive protection  | ✓ | mc | mc | mc | mc |
| RNG stream            | ✓+ | ✓ | doRNG | (planned) | SNOW |
| Early stopping        | ✓ |   |   |   | ✓ |
| Traceback             | ✓ |   |   |   | ✓ |
availableCores() is a "nicer" version of parallel::detectCores() that returns the number of cores allotted to the process by acknowledging known settings, e.g. getOption("mc.cores"), NSLOTS, PBS_NUM_PPN, SLURM_CPUS_PER_TASK, ..., and _R_CHECK_LIMIT_CORES_.
availableWorkers() returns a vector of hostnames based on, e.g., PE_HOSTFILE and PBS_NODEFILE. It falls back to rep("localhost", availableCores()).
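For example (the values shown depend on the machine and environment):

## On a plain 8-core machine with no scheduler settings
future::availableCores()
## system
##      8

## Options such as mc.cores take precedence
options(mc.cores = 2)
future::availableCores()
## mc.cores
##        2

future::availableWorkers()
## [1] "localhost" "localhost"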
These provide safe defaults for, for instance:
plan(multiprocess)
plan(cluster)
future::makeClusterPSOCK():
Improves upon parallel::makePSOCKcluster()
Simplifies cluster setup, especially for remote clusters
Avoids common issues when workers connect back to the master
Makes the option -l <user> optional (such that ~/.ssh/config is respected)
Automatically stops clusters when they are no longer needed, e.g. via the garbage collector
Automatically stops workers if the cluster setup fails; parallel::makePSOCKcluster() may leave stray background R processes behind
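A typical remote setup might look like this (the host names and user are hypothetical):

library(future)

## Two remote workers over SSH; reverse tunneling avoids firewall and
## port-forwarding issues when the workers connect back to the master
workers <- makeClusterPSOCK(
  c("n1.example.org", "n2.example.org"),
  user = "alice",
  revtunnel = TRUE
)
plan(cluster, workers = workers)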
With 'future.batchtools' one can also specify computational resources, e.g. cores per node and memory needs.
plan(batchtools_sge, resources = list(mem = "128gb"))

y %<-% { large_memory_method(x) }
Specific to the scheduler: resources is passed to the job-script template, where the parameters are interpreted and passed on to the scheduler.
Each future needs one node with 24 cores and 128 GiB of RAM:
resources = list(l = "nodes=1:ppn=24", mem = "128gb")
nodes <- c("cauchy", "leibniz", "bolzano", "shannon", "euler", "hamming")plan(cluster, workers = nodes)## Find our 80 FASTQ filesfastq <- dir(pattern = "[.]fq$") ## 200 GB each## Align the to human genomebam <- listenv()for (i in seq_along(fastq)) { bam[[i]] %<-% DNAseq::align(fastq[i]) ## 3 hours each}
nodes <- c("cauchy", "leibniz", "bolzano", "shannon", "euler", "hamming")plan(cluster, workers = rep(nodes, each = 4))## Find our 80 FASTQ filesfastq <- dir(pattern = "[.]fq$") ## 200 GB each## Align the to human genomebam <- listenv()for (i in seq_along(fastq)) { bam[[i]] %<-% DNAseq::align(fastq[i]) ## 3 hours each}
E.g. one individual per machine then one chromosome per core:
plan(list(tweak(cluster, workers = nodes), multiprocess))
fastq <- dir(pattern = "[.]fq$")
bam <- listenv()
for (i in seq_along(fastq)) {
  ## One individual per worker
  bam[[i]] %<-% {
    chrs <- listenv()
    for (j in 1:24) {
      ## One chromosome per core
      chrs[[j]] %<-% DNAseq::align(fastq[i], chr = j)
    }
    merge_chromosomes(chrs)
  }
}
> library(future)
> plan(cluster, workers = "remote.org")
## Plot remotely
> g %<-% R.devices::capturePlot({
+   filled.contour(volcano, color.palette = terrain.colors)
+   title("volcano data: filled contour map")
+ })
## Display locally
> g
recordPlot() + replayPlot()
> plan(cluster, workers = "remote.org")
> dat <- data.frame(
+   x = rnorm(50e3),
+   y = rnorm(50e3)
+ )

## Profile remotely
> p %<-% profvis::profvis({
+   plot(x ~ y, data = dat)
+   m <- lm(x ~ y, data = dat)
+   abline(m, col = "red")
+ })
## Browse locally
> p
"... framework for building web servers in R. ... from serving static
content to full-blown dynamic web-apps"
plan(multisession)

mtcars %>%
  split(.$cyl) %>%
  map(~ future(lm(mpg ~ wt, data = .x))) %>%
  values %>%
  map(summary) %>%
  map_dbl("r.squared")
##         4         6         8
## 0.5086326 0.4645102 0.4229655
Comment: This approach does not do load balancing. I have a few ideas for how support for this could be implemented in the future framework, which would be beneficial here and elsewhere.
For any type of future, the developer may wish to control:
future(..., memory = 8e9)
remote = FALSE
dependencies = c("R (>= 3.5.0)", "rio"))
mounts = "/share/lab/files"
vars = c("gene_db", "mtcars")
container = "docker://rocker/r-base"
tokens = c("a", "b")
Risk of bloating the Future API: which of these need to be included? We don't want to reinvent the HPC scheduler or Spark.
Parallel & distributed processing can be used to:
speed up processing (wall time)
lower memory footprint (per machine)
avoid data transfers (compute where data lives)
Other reasons, e.g. asynchronous UI/UX