* Futurized version of base R's `lapply()`, `vapply()`, `replicate()`, ...
* ... on **all future-compatible backends**
* Load balancing ("chunking")
* Proper parallel random number generation
╔═══════════════════════════════════════════════════════════╗
║ future_lapply(), future_vapply(), future_replicate(), ... ║
╠═══════════════════════════════════════════════════════════╣
║ < Future API > ║
╠═══════════════════════════════════════════════════════════╣
║ "wherever" ║
╚═══════════════════════════════════════════════════════════╝
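A minimal sketch of the parallel RNG bullet, assuming the future.apply package is installed. Passing `future.seed` to `future_lapply()` should make the draws reproducible regardless of backend and chunking. The seed `42`, the helper `draw()`, and the two-worker `multisession` backend are illustrative choices, not part of the original slides.

```r
library(future.apply)            # attaches 'future' as well
plan(multisession, workers = 2)  # illustrative backend; other plans should work too

## Hypothetical helper: draw one random number per element
draw <- function(i) rnorm(1)

y1 <- future_lapply(1:4, draw, future.seed = 42)
y2 <- future_lapply(1:4, draw, future.seed = 42)

## Same seed => same draws, independent of how elements are chunked
stopifnot(identical(y1, y2))

plan(sequential)                 # shut down the background workers
```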
---
```r
y <- lapply(X, slow_sum)
```
---
```r
y <- future_lapply(X, slow_sum)
```
--
* `plan(multiprocess)`
* `plan(cluster, workers = c("n1", "n2", "n3"))`
* `plan(batchtools_sge)`
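A small, locally runnable sketch of the switch from `lapply()` to `future_lapply()`, assuming future.apply is installed. `slow_sum()` and `X` are defined here only for illustration, and `multisession` is used because `multiprocess` was deprecated in later releases of future.

```r
library(future.apply)            # attaches 'future' as well
plan(multisession, workers = 2)  # illustrative; swap in any other plan()

## Illustrative stand-ins for the objects used on the slides
slow_sum <- function(x) { Sys.sleep(0.1); sum(x) }
X <- list(a = 1:10, b = 11:20, c = 21:30)

y_seq <- lapply(X, slow_sum)         # sequential
y_par <- future_lapply(X, slow_sum)  # same call, evaluated via futures

## Only where the work runs changes, not the result
stopifnot(identical(y_seq, y_par))

plan(sequential)
```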
---
class: huge
layout: true
# Frontend: furrr (Davis Vaughan)
* Futurized version of purrr's `map()`, `map2()`, `modify()`, ...
* ... on **all future-compatible backends**
╔═══════════════════════════════════════════════════════════╗
║ future_map(), future_map2(), future_modify(), ... ║
╠═══════════════════════════════════════════════════════════╣
║ < Future API > ║
╠═══════════════════════════════════════════════════════════╣
║ "wherever" ║
╚═══════════════════════════════════════════════════════════╝
---
```r
y <- purrr::map(X, slow_sum)
```
---
```r
y <- future_map(X, slow_sum)
```
---
class: huge
layout: false
# Frontend: doFuture
* A **foreach** adapter on top of the Future API
* Foreach on **all future-compatible backends**
╔═══════════════════════════════════════════════════════╗
║ foreach API ║
╠════════════╦══════╦════════╦═══════╦══════════════════╣
║ doParallel ║ doMC ║ doSNOW ║ doMPI ║ doFuture ║
╠════════════╩══╦═══╩════════╬═══════╬══════════════════╣
║ parallel ║ snow ║ Rmpi ║ < Future API > ║
╚═══════════════╩════════════╩═══════╬══════════════════╣
║ "wherever" ║
╚══════════════════╝
```r
doFuture::registerDoFuture()
plan(batchtools_sge)
y <- foreach(x = X) %dopar% { slow_sum(x) }
```
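The same pattern can be tried locally without a scheduler. This is a sketch assuming doFuture is installed; `multisession` with two workers stands in for `batchtools_sge`, and `slow_sum()` and `X` are illustrative definitions.

```r
library(doFuture)                # attaches 'foreach' and 'future'
registerDoFuture()               # make %dopar% dispatch to futures
plan(multisession, workers = 2)  # illustrative stand-in for batchtools_sge

## Illustrative stand-ins for the objects used on the slides
slow_sum <- function(x) { Sys.sleep(0.1); sum(x) }
X <- list(1:10, 11:20, 21:30)

y <- foreach(x = X) %dopar% { slow_sum(x) }

stopifnot(identical(unlist(y), c(55L, 155L, 255L)))

plan(sequential)
```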
---
class: huge
# ~1,400 packages can now parallelize on HPC
┌───────────────────────────────────────────────────────┐
│ │
│ caret, gam, glmnet, plyr, ... (1,400 pkgs) │
│ │
╠═══════════════════════════════════════════════════════╣
║ foreach API ║
╠══════╦════════════╦════════╦═══════╦══════════════════╣
║ doMC ║ doParallel ║ doSNOW ║ doMPI ║ doFuture ║
╠══════╩════════╦═══╩════════╬═══════╬══════════════════╣
║ parallel ║ snow ║ Rmpi ║ < Future API > ║
╚═══════════════╩════════════╩═══════╬══════════════════╣
║ "wherever" ║
╚══════════════════╝
```r
doFuture::registerDoFuture()
plan(future.batchtools::batchtools_sge)
library(caret)
model <- train(y ~ ., data = training)
```
???
```r
## 2018-05-12
> db <- utils::available.packages()
> direct <- tools::dependsOnPkgs("foreach", recursive=FALSE, installed=db)
> str(direct)
chr [1:533] "adabag" "adamethods" "admixturegraph" "ADMM" ...
> all <- tools::dependsOnPkgs("foreach", recursive=TRUE, installed=db)
> str(all)
chr [1:1363] "adabag" "adamethods" "admixturegraph" "ADMM" ...
```
---
count: false
# High Performance Compute (HPC) clusters
---
class: huge
# Example: Genome sequencing project
* Sequencing of human DNA (3 * 10^9 nucleotides)
* 80 individuals
* Millions of short raw sequences need to be mapped to the human reference
* Alignment takes ~3 hours per individual
* Raw sequence data is ~200 GB per individual
--
```r
## Find our 80 FASTQ files
fastq <- dir(pattern = "[.]fq$") ## 200 GB each => 16 TB total
## Align them to the human genome
bam <- lapply(fastq, DNAseq::align) ## 3 hours each
```
--
**Total processing time: 80 * 3 = 240 hours = 10 days**
---
class: huge
# Example: Genome sequencing project
* Sequencing of human DNA (3 * 10^9 nucleotides)
* 80 individuals
* Millions of short raw sequences need to be mapped to the human reference
* Alignment takes ~3 hours per individual
* Raw sequence data is ~200 GB per individual
```r
library(future.apply)
plan(multiprocess) ## 12-core machine
## Find our 80 FASTQ files
fastq <- dir(pattern = "[.]fq$") ## 200 GB each => 16 TB total
## Align them to the human genome
bam <- future_lapply(fastq, DNAseq::align) ## 3 hours each
```
**Total processing time: 80 * 3 / 12 = 20 hours**
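The 20-hour figure is the ideal case where the work divides evenly across cores. As a rough sketch, with a hypothetical helper `wall_time_hours()` that assumes equally long tasks and one task per worker at a time, the estimate with whole 3-hour alignments comes out slightly higher because the last round leaves some cores idle:

```r
## Estimated wall-clock time when 'n_tasks' equally long tasks are
## spread over 'n_workers' workers that each run one task at a time
wall_time_hours <- function(n_tasks, hours_per_task, n_workers = 1) {
  rounds <- ceiling(n_tasks / n_workers)  # the last round may be partly idle
  rounds * hours_per_task
}

t_seq <- wall_time_hours(80, 3)                  # 240 hours (10 days)
t_par <- wall_time_hours(80, 3, n_workers = 12)  # 21 hours; ideal is 80 * 3 / 12 = 20
```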
---
class: huge
# Ad-Hoc Compute Clusters
A common setup in many departments:
* Two or more machines
* Manually SSH into each machine to launch scripts
Attributes:
* Works ok with a few people and fair usage
* Can easily be overloaded if too many users
* Hard to plan your jobs
---
class: huge
# Clusters with Job Queues
With too many nodes or users, ad-hoc clusters
become cumbersome and hard to manage and control.
**Better to use an HPC scheduler with a job queue**:
* Two or more machines
* Users submit jobs to a common job queue
* The system takes jobs on the queue and
executes them on available machines / cores
Attributes:
* Works well with any number of users and machines
* Users do not have to worry about overloading the cluster;
the cluster will wait to process the next job if all
compute resources are busy running jobs
---
class: Large
# Example: Submit a job & watch the queue
```sh
#!/usr/bin/env bash
#PBS -N my_htseq_align
#PBS -l mem=12gb
htseq_align $1 human.fa
```
--
```sh
$ qsub htseq_align.pbs patient101.fq
$ qsub htseq_align.pbs patient102.fq
```
--
```sh
$ qstat
Job ID Name User Time Use S
-------- ---------------- ------------ -------- -
606411 bedGraph alice 46:22:22 R
606494 misosummary alice 55:07:08 R
606641 Rscript bob 37:18:30 R
607758 Exome_QS1_Som charlie 06:20:23 R
607832 my_htseq_align henrik 00:01:57 R
607833 my_htseq_align henrik - Q
```
---
class: huge
# Backend: future.batchtools
* **batchtools**: Map-Reduce API for **HPC schedulers**,
e.g. LSF, OpenLava, SGE, Slurm, and TORQUE / PBS
* **future.batchtools**: **Future API** on top of **batchtools**
╔═══════════════════════════════════════════════════╗
║ < Future API > ║
║ ║
║ future(), resolved(), value(), %<-%, ... ║
╠═══════════════════════════════════════════════════╣
║ future <-> future.batchtools ║
╠═════════════════════════╦═════════════════════════╣
║ parallel ║ batchtools ║
╚═════════════════════════╬═════════════════════════╣
║ SGE, Slurm, TORQUE, ... ║
╚═════════════════════════╝
---
class: huge
# Backend: future.batchtools
```r
library(future.batchtools)
plan(batchtools_sge)
fastq <- dir(pattern = "[.]fq$") ## 200 GB each; 80 files
bam <- future_lapply(fastq, DNAseq::align) ## 3 hours each
```
--
```sh
$ qstat
Job ID Name User Time Use S
-------- ---------------- ------------ -------- -
606411 bedGraph alice 46:22:22 R
606638 future05 henrik 01:32:05 R
606641 Rscript bob 37:18:30 R
606643 future06 henrik 01:31:55 R
...
```
---
class: huge
count: false
# Backend: Google Compute Engine Cluster (Mark Edmondson)
```r
library(googleComputeEngineR)
vms <- lapply(paste0("node", 1:10),
FUN = gce_vm, template = "r-base")
cl <- as.cluster(lapply(vms, FUN = gce_ssh_setup),
docker_image = "henrikbengtsson/r-parallel")
plan(cluster, workers = cl)
```
--
```r
data <- future_lapply(1:100, montecarlo_pi, B = 10e3)
pi_hat <- Reduce(calculate_pi, data)
print(pi_hat)
## 3.14159
```
---
count: false
layout: false
class: Huge
# Futures in the Wild ...
---
class: huge
# _drake_ - A Workflow Manager (Will Landau & rOpenSci)
```r
tasks <- drake_plan(
raw_data = readxl::read_xlsx(file_in("raw-data.xlsx")),
data = raw_data %>% mutate(Species =
forcats::fct_inorder(Species)) %>% select(-X__1),
hist = ggplot(data, aes(x = Petal.Width, fill = Species))
+ geom_histogram(),
fit = lm(Sepal.Width ~ Petal.Width + Species, data),
rmarkdown::render(knitr_in("report.Rmd"),
output_file = file_out("report.pdf"))
)
future::plan("multiprocess")
make(tasks, parallelism = "future")
```
---
class: huge
# _shiny_ - Asynchronous UI (RStudio)
```r
library(shiny)
future::plan("multiprocess")
...
```
---
count: false
layout: false
class: Huge
# The Near Future ...
---
class: huge
# Improvements
* ☑ Capture and relay standard output
* ☑ Capture and relay messages and warnings
* ☑ Improved error reporting and tracebacks (more can be done)
* ☐ [50%] Benchmarking (time and memory)
* ☐ [20%] Hook functions, e.g. update progress bar when one future is resolved
* ☐ [20%] Killing futures
* ☐ [10%] Restarting failed futures
* ☐ [80%] **future.tests** - Unified test framework for all future backends
- All parallel backends must support the Core Future API
- Sponsored via an R Consortium grant
## foreach
* ☐ [80%] HARMONIZATION: Identify globals using **future**
---
class: Huge
# Roadmap
* Make futures **truly pure "containers"** to be evaluated
  - One of the original design goals
  - Support for passing futures "asis" to wherever and whenever
* Specify **resource needs**, e.g. only pass future to a worker:
  - ... on the same file system
  - ... that has a GPU
* **Sandboxed** future backend, e.g. evaluate non-trusted code
---
class: huge
# Summary of features
* **Unified API**
* **Portable code**
* **Worry-free**
* **Developer decides what to parallelize - user decides how to**
* For beginners as well as advanced users
* Nested parallelism on nested heterogeneous backends
* Protects against recursive parallelism
* Easy to build new frontends
* Easy to add new backends
---
class: huge
# Building a better future
I 💜 feedback,
bug reports,
and suggestions
@HenrikBengtsson
HenrikBengtsson/future
jottr.org
Thank you!
---
count: false
# Appendix (Random Slides)
---
count: false
# A1. Features - more details
---
class: large
count: false
# A1.1 Well Tested
* Large number of unit tests
* System tests
* High code coverage (union across all platforms is near 100%)
* Cross platform testing
* CI testing
* Testing several R versions (many generations back)
* Reverse package dependency tests
* All backends highly tested
* Large number of tests via doFuture across backends on `example()`s
  from foreach, NMF, TSP, glmnet, plyr, caret, etc.
  ([example link](https://travis-ci.org/HenrikBengtsson/doFuture))
# R Consortium Infrastructure Steering Committee (ISC) Support Project
* **Backend Conformance Test Suite** - an effort to formalize and standardize the above tests into a unified go-to test environment.
---
class: Large
count: false
# A1.2 Nested futures
```r
fastq <- dir(pattern = "[.]fq$")
aligned <- listenv()
for (i in seq_along(fastq)) {
  aligned[[i]] %<-% {
    chrs <- listenv()
    for (j in 1:24) {
      chrs[[j]] %<-% DNAseq::align(fastq[i], chr = j)
    }
    merge_chromosomes(chrs)
  }
}
```
--
* `plan(batchtools_sge)`
--
* `plan(list(batchtools_sge, sequential))`
--
* `plan(list(batchtools_sge, multiprocess))`
---
class: Huge
count: false
# A1.3 Lazy evaluation
By default all futures are resolved using eager evaluation, but the _developer_ has the option to use lazy evaluation.
Explicit API:
```r
f <- future(..., lazy = TRUE)
v <- value(f)
```
Implicit API:
```r
v %<-% { ... } %lazy% TRUE
```
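A small sketch of what lazy evaluation means in practice, assuming the future package is installed. The marker file is a hypothetical device used only to observe when the expression runs; `plan(sequential)` is chosen so the file is written on the local machine.

```r
library(future)
plan(sequential)      # illustrative; keeps the marker file local

marker <- tempfile()  # hypothetical marker file to observe evaluation

## Create the future, but do not evaluate it yet
f <- future({ file.create(marker); 42 }, lazy = TRUE)
before <- file.exists(marker)  # nothing has been evaluated at this point

v <- value(f)                  # requesting the value triggers evaluation
after <- file.exists(marker)

stopifnot(!before, after, v == 42)
unlink(marker)
```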
---
class: Large
count: false
# A1.4 False-negative & false-positive globals
Identification of globals from static-code inspection has limitations
(but defaults cover a large number of use cases):
* False negatives, e.g. `my_fcn` is not found in `do.call("my_fcn", x)`. Avoid by using `do.call(my_fcn, x)`.
* False positives - non-existing variables,
e.g. NSE and variables in formulas. Ignore and leave it to run-time.
```r
x <- "this FP will be exported"
data <- data.frame(x = rnorm(1000), y = rnorm(1000))
fit %<-% lm(x ~ y, data = data)
```
---
class: large
count: false
# A1.5 Full control of globals (explicit API)
Automatic (default):
```r
x <- rnorm(n = 100)
y <- future({ slow_sum(x) }, globals = TRUE)
```
By names:
```r
y <- future({ slow_sum(x) }, globals = c("slow_sum", "x"))
```
As name-value pairs:
```r
y <- future({ slow_sum(x) }, globals =
list(slow_sum = slow_sum, x = rnorm(n = 100)))
```
Disable:
```r
y <- future({ slow_sum(x) }, globals = FALSE)
```
---
class: large
count: false
# A1.5 Full control of globals (implicit API)
Automatic (default):
```r
x <- rnorm(n = 100)
y %<-% { slow_sum(x) } %globals% TRUE
```
By names:
```r
y %<-% { slow_sum(x) } %globals% c("slow_sum", "x")
```
As name-value pairs:
```r
y %<-% { slow_sum(x) } %globals% list(slow_sum = slow_sum, x = rnorm(n = 100))
```
Disable:
```r
y %<-% { slow_sum(x) } %globals% FALSE
```
---
class: large
count: false
# A1.6 Protection: Exporting too large objects
```r
x <- lapply(1:100, FUN = function(i) rnorm(1024 ^ 2))
y <- list()
for (i in seq_along(x)) {
  y[[i]] <- future( mean(x[[i]]) )
}
```
gives error: "The total size of the 2 globals that need to be exported for the future expression ('mean(x[[i]])') is **800.00 MiB. This exceeds the maximum allowed size of 500.00 MiB (option 'future.globals.maxSize')**. There are two globals: 'x' (800.00 MiB of class 'list') and 'i' (48 bytes of class 'numeric')."
--
```r
for (i in seq_along(x)) {
  x_i <- x[[i]]  ## Fix: subset before creating future
  y[[i]] <- future( mean(x_i) )
}
```
---
class: large
count: false
# A1.7 Free futures are resolved
Implicit futures are always resolved:
```r
a %<-% sum(1:10)
b %<-% { 2 * a }
print(b)
## [1] 110
```
--
Explicit futures require care by developer:
```r
fa <- future( sum(1:10) )
a <- value(fa)
fb <- future( 2 * a )
```
--
For the lazy developer - not recommended (may be expensive):
```r
options(future.globals.resolve = TRUE)
fa <- future( sum(1:10) )
fb <- future( 2 * value(fa) )
```
---
class: Large
count: false
# A1.8 What's under the hood?
* **Future class** and corresponding methods:
  - abstract S3 class with common parts implemented,
    e.g. globals and protection
  - new backends extend this class and implement core methods,
    e.g. `value()` and `resolved()`
  - built-in classes implement backends on top of the parallel package
---
class: large
count: false
# A1.9 Universal union of parallel frameworks
_ _                   | future | parallel | foreach | batchtools | BiocParallel
:---------------------|:-------|:---------|:--------|:-----------|:------------
Synchronous           | **✓** | ✓ | ✓ | ✓ | ✓
Asynchronous          | **✓** | ✓ | ✓ | ✓ | ✓
Uniform API           | **✓** | | ✓ | ✓ | ✓
Extendable API        | **✓** | | ✓ | ✓ | ✓
Globals               | **✓** | | (✓) + **(soon by future)** | |
Packages              | **✓** | | | |
Map-reduce ("lapply") | **✓** | ✓ | `foreach()` | ✓ | ✓
Load balancing        | **✓** | ✓ | ✓ | ✓ | ✓
For loops             | **✓** | | | |
While loops           | **✓** | | | |
Nested config         | **✓** | | | |
Recursive protection  | **✓** | mc | mc | mc | mc
RNG stream            | **✓+** | ✓ | doRNG | (planned) | SNOW
Early stopping        | **✓** | | | | ✓
Traceback             | **✓** | | | | ✓
---
count: false
# A2 Bells & whistles
---
class: large
count: false
# A2.1 availableCores() & availableWorkers()
* `availableCores()` is a "nicer" version of `parallel::detectCores()` that returns the number of cores allotted to the process by acknowledging known settings, e.g.
  - `getOption("mc.cores")`
  - HPC environment variables, e.g. `NSLOTS`, `PBS_NUM_PPN`, `SLURM_CPUS_PER_TASK`, ...
  - `_R_CHECK_LIMIT_CORES_`
* `availableWorkers()` returns a vector of hostnames based on:
  - HPC environment information, e.g. `PE_HOSTFILE`, `PBS_NODEFILE`, ...
  - Fallback to `rep("localhost", availableCores())`
These provide safe defaults for, for instance:
```r
plan(multiprocess)
plan(cluster)
```
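A quick way to inspect what these functions report on the current machine, sketched under the assumption that the future package is installed (in later releases the functions are hosted by the parallelly package and re-exported by future). The values depend on the environment, so no output is shown.

```r
library(future)

n <- availableCores()          # cores allotted to this R process
availableCores(which = "all")  # every setting that was considered
workers <- availableWorkers()  # hostnames, "localhost" repeated when not on HPC

stopifnot(n >= 1L, length(workers) >= 1L)
```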
---
class: Large
count: false
# A2.2: makeClusterPSOCK()
`future::makeClusterPSOCK()`:
* Improves upon `parallel::makePSOCKcluster()`
* Simplifies cluster setup, especially remote ones
* Avoids common issues when workers connect back to master:
  - uses SSH reverse tunneling
  - no need for port-forwarding / firewall configuration
  - no need for DNS lookup
* Makes option `-l <user>` optional (such that `~/.ssh/config` is respected)
* Automatically stops clusters when no longer needed, e.g. by the garbage collector
* Automatically stops workers if setup of the cluster fails; `parallel::makePSOCKcluster()` may leave background R processes behind
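A minimal local sketch, assuming the future package is installed and still re-exports `makeClusterPSOCK()` (later releases host it in the parallelly package). Two local workers are used here for illustration; for remote machines, hostnames would be passed instead of a count.

```r
library(future)

cl <- makeClusterPSOCK(2L)   # two local background R workers (illustrative)
plan(cluster, workers = cl)

## Evaluate a future on one of the workers
f <- future(Sys.getpid())
worker_pid <- value(f)
stopifnot(worker_pid != Sys.getpid())  # ran in a separate R process

plan(sequential)
parallel::stopCluster(cl)    # explicit cleanup
```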
---
class: huge
count: false
# A2.3 HPC resource parameters
With 'future.batchtools' one can also specify computational resources, e.g. cores per node and memory needs.
```r
plan(batchtools_sge, resources = list(mem = "128gb"))
y %<-% { large_memory_method(x) }
```
**Specific to scheduler**: `resources` is passed to the job-script template
where the parameters are interpreted and passed to the scheduler.
--
Each future needs one node with 24 cores and 128 GiB of RAM:
```r
resources = list(l = "nodes=1:ppn=24", mem = "128gb")
```
---
class: huge
count: false
# A2.4: Example: An academic cluster
## One worker per compute node (6 workers total)
```r
nodes <- c("cauchy", "leibniz", "bolzano",
"shannon", "euler", "hamming")
plan(cluster, workers = nodes)
## Find our 80 FASTQ files
fastq <- dir(pattern = "[.]fq$") ## 200 GB each
## Align them to the human genome
bam <- listenv()
for (i in seq_along(fastq)) {
  bam[[i]] %<-% DNAseq::align(fastq[i])  ## 3 hours each
}
```
--
* **Total processing time: ~1.7 days = 40 hours**
---
class: huge
count: false
# A2.5 Example: An academic cluster
## Four workers per compute node (24 workers total)
```r
nodes <- c("cauchy", "leibniz", "bolzano",
"shannon", "euler", "hamming")
plan(cluster, workers = rep(nodes, each = 4))
## Find our 80 FASTQ files
fastq <- dir(pattern = "[.]fq$") ## 200 GB each
## Align them to the human genome
bam <- listenv()
for (i in seq_along(fastq)) {
  bam[[i]] %<-% DNAseq::align(fastq[i])  ## 3 hours each
}
```
* **Total processing time: ~0.4 days = 10 hours** (cf. 40 hours and 10 days)
---
class: huge
count: false
# A2.6: Nested futures
E.g. one individual per machine **then** one chromosome per core:
* `plan(list(tweak(cluster, workers = nodes), multiprocess))`
--
```r
fastq <- dir(pattern = "[.]fq$")
bam <- listenv()
for (i in seq_along(fastq)) {
  ## One individual per worker
  bam[[i]] %<-% {
    chrs <- listenv()
    for (j in 1:24) {
      ## One chromosome per core
      chrs[[j]] %<-% DNAseq::align(fastq[i], chr = j)
    }
    merge_chromosomes(chrs)
  }
}
```
---
count: false
# A3. More Examples
---
class: huge
count: false
# A3.1 Plot remotely - display locally
```r
> library(future)
> plan(cluster, workers = "remote.org")
```
--
```r
## Plot remotely
> g %<-% R.devices::capturePlot({
filled.contour(volcano, color.palette = terrain.colors)
title("volcano data: filled contour map")
})
```
--
```r
## Display locally
> g
```
--
???
* R (>= 3.3.0)
* `recordPlot()` + `replayPlot()`
* Replotted using local R plot routines
* X11 and similar is _not_ in play here!
---
class: Large
count: false
# A3.2 Profile code remotely - display locally
```r
> plan(cluster, workers = "remote.org")
> dat <- data.frame(
+ x = rnorm(50e3),
+ y = rnorm(50e3)
+ )
## Profile remotely
> p %<-% profvis::profvis({
+ plot(x ~ y, data = dat)
+ m <- lm(x ~ y, data = dat)
+ abline(m, col = "red")
+ })
```
--
```r
## Browse locally
> p
```
--
---
class: Large
count: false
# A3.3 _fiery_ - flexible lightweight web server (Thomas Lin Pedersen)
> "... **framework for building web servers in R. ... from serving static
> content to full-blown dynamic web-apps**"
---
class: huge
count: false
# A3.4 "It kinda just works" (furrr = future + purrr)
```r
plan(multisession)
mtcars %>%
split(.$cyl) %>%
map(~ future(lm(mpg ~ wt, data = .x))) %>% values %>%
map(summary) %>%
map_dbl("r.squared")
## 4 6 8
## 0.5086326 0.4645102 0.4229655
```
---
count: false
# A4. Future Work
---
class: huge
count: false
# A4.1 Standard resource types(?)
For any type of future, the developer may wish to control:
* memory requirements, e.g. `future(..., memory = 8e9)`
* local machine only, e.g. `remote = FALSE`
* dependencies, e.g. `dependencies = c("R (>= 3.5.0)", "rio")`
* file-system availability, e.g. `mounts = "/share/lab/files"`
* data locality, e.g. `vars = c("gene_db", "mtcars")`
* containers, e.g. `container = "docker://rocker/r-base"`
* generic resources, e.g. `tokens = c("a", "b")`
* ...?