Chapter 10 High-performance computing

This chapter provides guidance on time-consuming drake workflows and high-level parallel computation.

library(drake)
load_mtcars_example()
make(my_plan, jobs = 2)

10.1 Start small

If your workflow is large, consider running a downsized version to debug and test things first. That way, you can avoid consuming lots of computing resources until you are reasonably sure everything works. Create a test plan with drake_plan(max_expand = SMALL_NUMBER) before scaling up to the full set of targets. See this section on plans for details.
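
As a rough sketch of the idea, the two plans below share the same commands, but the second one caps the number of generated targets for testing. The function fit_model() and the index values are hypothetical placeholders; only drake_plan() and its max_expand argument come from the text above.

```r
library(drake)

# Hypothetical full-scale plan: one model target per index value.
# fit_model() is a placeholder. It is never called here because
# drake_plan() only records commands and does not run them.
full_plan <- drake_plan(
  model = target(
    fit_model(index),
    transform = map(index = c(1, 2, 3, 4, 5, 6))
  )
)

# Downsized test version: max_expand caps the number of targets
# generated by each transformation.
test_plan <- drake_plan(
  max_expand = 2,
  model = target(
    fit_model(index),
    transform = map(index = c(1, 2, 3, 4, 5, 6))
  )
)

nrow(full_plan) # one row per index value
nrow(test_plan) # at most 2 rows
```

Once the small plan builds cleanly, drop max_expand and rebuild with the full set of targets.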

10.2 Batch mode for long workflows

If you expect make() to take a long time, create a master script for your project (say, make.R, as in the guidance on projects) and run it in a persistent background process. The following should work in the Mac/Linux terminal/shell.

nohup nice -19 R CMD BATCH --no-save make.R &

where:

  • nohup: Keep the job running even if you log out of the machine.
  • nice -19: This is a low-priority job that should not consume many resources. Other processes should take priority.
  • R CMD BATCH: Run the R script in a fresh new R session.
  • --no-save: Do not save the workspace in a .RData file.
  • &: Run this job in the background so you can do other stuff in the terminal window.
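
For illustration, a minimal make.R could look like the sketch below. The plan contents are hypothetical stand-ins. A real project would load its own packages, functions, and plan instead. When the script runs through R CMD BATCH, the messages end up in the output file make.Rout.

```r
# make.R: a hypothetical master script for a long-running workflow.
library(drake)

# A tiny stand-in plan. Replace it with your own project's plan.
plan <- drake_plan(
  data = mtcars,
  fit = lm(mpg ~ wt, data = data)
)

# Timestamps make it easier to judge progress from the log file.
message("make() started at ", format(Sys.time()))
make(plan)
message("make() finished at ", format(Sys.time()))
```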

10.3 Let make() schedule your targets

drake uses your project’s implicit dependency graph to figure out which targets can run in parallel and which ones need to wait for dependencies.

load_mtcars_example() # from https://github.com/wlandau/drake-examples/tree/master/mtcars
config <- drake_config(my_plan)
vis_drake_graph(config)

You do not need to micromanage the timing among targets, and you do not need to run parallel instances of make(). As the next sections describe, drake has built-in parallel and distributed computing support.
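
To make the idea concrete, here is a simplified base R illustration of how a dependency graph determines which targets can run at the same time. It is not drake's actual scheduling algorithm, and the target names are hypothetical.

```r
# Hypothetical dependency list: each target maps to the targets it needs.
deps <- list(
  data = character(0),
  model_1 = "data",
  model_2 = "data",
  report = c("model_1", "model_2")
)

# Group targets into waves. Every target in a wave has all of its
# dependencies in earlier waves, so members of a wave could run in parallel.
parallel_waves <- function(deps) {
  done <- character(0)
  waves <- list()
  while (length(done) < length(deps)) {
    pending <- setdiff(names(deps), done)
    ready <- pending[vapply(
      pending,
      function(target) all(deps[[target]] %in% done),
      logical(1)
    )]
    if (length(ready) == 0) stop("Circular dependency detected.")
    waves[[length(waves) + 1]] <- ready
    done <- c(done, ready)
  }
  waves
}

waves <- parallel_waves(deps)
print(waves)
```

In this toy graph, model_1 and model_2 land in the same wave because both depend only on data, while report has to wait for both models.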

10.4 Parallel backends

Choose the parallel backend with the parallelism argument and set the jobs argument to scale the work appropriately.

make(my_plan, parallelism = "future", jobs = 2)

The two primary backends with long-term support are clustermq and future. If you can install ZeroMQ, the best choice is usually clustermq because it is faster than future. However, future is more accessible: it does not require ZeroMQ, it supports parallel computing on Windows, it can work with more restrictive wall time limits on clusters, and it can deploy targets to Docker images (drake_example("Docker-psock")).
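
The preference order above can be sketched as a small helper. choose_parallelism() is a hypothetical function, not part of drake, and the "loop" fallback assumes that this is the name of drake's sequential (non-parallel) mode.

```r
# Hypothetical helper: pick a value for the parallelism argument of make()
# based on which backend packages are installed.
choose_parallelism <- function() {
  if (requireNamespace("clustermq", quietly = TRUE)) {
    "clustermq" # usually the faster choice, but it needs ZeroMQ
  } else if (requireNamespace("future", quietly = TRUE)) {
    "future" # more accessible
  } else {
    "loop" # assumed sequential fallback: no parallel backend installed
  }
}

backend <- choose_parallelism()
message("Selected backend: ", backend)

# Usage sketch: make(my_plan, parallelism = backend, jobs = 2)
```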

10.5 The clustermq backend

10.5.1 Persistent workers

Calling make(parallelism = "clustermq", jobs = 2) launches 2 persistent parallel workers. The master process assigns targets to workers, and the workers simultaneously traverse the dependency graph.

10.5.2 Installation

Persistent workers require the clustermq R package, which in turn requires ZeroMQ. Please refer to the clustermq installation guide for specific instructions.

10.5.3 On your local machine

To run your targets in parallel over the cores of your local machine, set the global option below and run make().

options(clustermq.scheduler = "multicore")
make(plan, parallelism = "clustermq", jobs = 2)

10.5.4 On a cluster

Set the clustermq global options to register your computing resources. For SLURM:

options(clustermq.scheduler = "slurm", clustermq.template = "slurm_clustermq.tmpl")

Here, slurm_clustermq.tmpl is a template file with configuration details. Use drake_hpc_template_file() to write one of the available examples.

drake_hpc_template_file("slurm_clustermq.tmpl") # Write the file slurm_clustermq.tmpl.

After modifying slurm_clustermq.tmpl by hand to meet your needs, call make() as usual.

make(plan, parallelism = "clustermq", jobs = 4)

10.6 The future backend

10.6.1 Transient workers

make(parallelism = "future", jobs = 2) launches transient workers to build your targets. When a target is ready to build, the master process creates a fresh worker to build it, and the worker terminates when the target is done. jobs = 2 means that at most 2 transient workers are allowed to run at a given time.


10.6.2 Installation

Install the future package.

install.packages("future") # CRAN release
# Alternatively, install the GitHub development version.
devtools::install_github("HenrikBengtsson/future", ref = "develop")

If you intend to use a cluster, be sure to install the future.batchtools package too. The future ecosystem contains even more packages that extend future’s parallel computing functionality, such as future.callr.

10.6.3 On your local machine

First, select a future plan to tell future how to create the workers. See this table for descriptions of the core options.

future::plan(future::multisession) # multiprocess is deprecated in recent versions of future

Next, run make().

make(plan, parallelism = "future", jobs = 2)

10.6.4 On a cluster

Install the future.batchtools package and use this list to select a future plan that matches your resources. You will also need a compatible template file with configuration details. As with clustermq, drake can generate some examples:

drake_hpc_template_file("slurm_batchtools.tmpl") # Edit by hand.

Next, register the template file with a plan.

library(future.batchtools)
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")

Finally, run make().

make(plan, parallelism = "future", jobs = 2)

10.7 Advanced options

10.7.1 Selectivity

Some targets build so quickly that it is not worth sending them to parallel workers. To run these targets locally in the master process, define a special hpc column of your drake plan. Below, NA and TRUE are treated the same, and make(plan, parallelism = "clustermq") only sends model_1 and model_2 to parallel workers.

drake_plan(
  model = target(
    crazy_long_computation(index),
    transform = map(index = c(1, 2))
  ),
  accuracy = target(
    summarize_accuracy(model),
    transform = combine(model),
    hpc = FALSE
  ),
  specificity = target(
    summarize_specificity(model),
    transform = combine(model),
    hpc = FALSE
  ),
  report = target(
    render(knitr_in("results.Rmd"), output_file = file_out("results.html")),
    hpc = FALSE
  )
)
#> # A tibble: 5 x 3
#>   target     command                                                  hpc  
#>   <chr>      <expr>                                                   <lgl>
#> 1 model_1    crazy_long_computation(1)                              … NA   
#> 2 model_2    crazy_long_computation(2)                              … NA   
#> 3 accuracy   summarize_accuracy(model_1, model_2)                   … FALSE
#> 4 specifici… summarize_specificity(model_1, model_2)                … FALSE
#> 5 report     render(knitr_in("results.Rmd"), output_file = file_out(… FALSE

10.7.2 Memory

By default, make() keeps targets in memory during runtime. Some targets are dependencies of other targets downstream, while others may no longer need to be in memory. The memory_strategy argument to make() allows you to choose the tradeoff that best suits your project. Options:

  • "speed": Once a target is loaded in memory, just keep it there. This choice maximizes speed and hogs memory.
  • "memory": Just before building each new target, unload everything from memory except the target’s direct dependencies. This option conserves memory, but it sacrifices speed because each new target needs to reload any previously unloaded targets from storage.
  • "lookahead": Just before building each new target, search the dependency graph to find targets that will not be needed for the rest of the current make() session. In this mode, targets are only in memory if they need to be loaded, and we avoid superfluous reads from the cache. However, searching the graph takes time, and it could even double the computational overhead for large projects.
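
A minimal sketch of setting the strategy is shown below. The plan is a hypothetical example in which one large intermediate target is only needed by the target immediately downstream of it.

```r
library(drake)

# Hypothetical plan: `big` is large, but only `small` depends on it.
plan <- drake_plan(
  big = rnorm(1e6),
  small = mean(big),
  final = small + 1
)

# Look ahead in the dependency graph and unload targets that the rest
# of this make() session no longer needs.
make(plan, memory_strategy = "lookahead")

readd(final)
```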

10.7.3 Storage

In make(caching = "master"), the workers send the targets to the master process, and the master process stores them one by one in the cache. caching = "master" is compatible with all storr cache formats, including the more esoteric ones like storr_dbi() and storr_environment().

In make(caching = "worker"), the parallel workers are responsible for writing the targets to the cache. Some output-heavy projects can benefit from this form of parallelism. However, it can be slow on clusters because of lag from network file systems, and it comes with additional restrictions:

  • All the workers must have the same file system and the same working directory as the master process.
  • Only the default storr_rds() cache may be used. Other formats like storr_dbi() and storr_environment() cannot accommodate parallel cache operations.

See the storage chapter for details.

10.7.4 The template argument for persistent workers

For more control and flexibility in the clustermq backend, you can parameterize your template file and use the template argument of make(). For example, suppose you want to programmatically set the number of “slots” (basically cores) per job on an SGE system (see the clustermq guide to SGE setup). Begin with a parameterized template file sge_clustermq.tmpl with a custom n_slots placeholder.

# File: sge_clustermq.tmpl
# Modified from https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}               # job name
#$ -t 1-{{ n_jobs }}               # submit jobs as array
#$ -j y                            # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }}   # output file
#$ -cwd                            # use pwd as work dir
#$ -V                              # use environment variables
#$ -pe smp {{ n_slots | 1 }}       # request n_slots cores per job
module load R
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

Then when you run make(), use the template argument to set n_slots.

options(clustermq.scheduler = "sge", clustermq.template = "sge_clustermq.tmpl")
library(drake)
load_mtcars_example()
make(
  my_plan,
  parallelism = "clustermq",
  jobs = 16,
  template = list(n_slots = 4) # Request 4 cores per persistent worker.
)

Custom placeholders like n_slots are processed with the infuser package.
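
To show what the placeholder syntax means, here is a simplified base R stand-in for the substitution step. It is not infuser's implementation, and fill_template() is a hypothetical helper: supplied values replace their placeholders, and any remaining placeholder of the form {{ name | default }} falls back to its default.

```r
# Hypothetical, simplified stand-in for template filling.
fill_template <- function(template, values) {
  # Replace placeholders that have a supplied value.
  for (key in names(values)) {
    pattern <- sprintf("\\{\\{\\s*%s\\s*(\\|[^}]*)?\\}\\}", key)
    template <- gsub(
      pattern,
      as.character(values[[key]]),
      template,
      perl = TRUE
    )
  }
  # Remaining placeholders with a default collapse to that default.
  gsub(
    "\\{\\{[^|}]*\\|\\s*([^}]*?)\\s*\\}\\}",
    "\\1",
    template,
    perl = TRUE
  )
}

template <- c(
  "#$ -pe smp {{ n_slots | 1 }}",
  "ulimit -v $(( 1024 * {{ memory | 4096 }} ))",
  "#$ -N {{ job_name }}"
)

filled <- fill_template(template, list(n_slots = 4))
writeLines(filled)
```

Placeholders without a supplied value or a default, such as {{ job_name }}, are left untouched by this sketch.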

10.7.5 The resources column for transient workers

Different targets may need different resources. For example,

plan <- drake_plan(
  data = download_data(),
  model = big_machine_learning_model(data)
)

The model needs a GPU and multiple CPU cores, and the data only needs the bare minimum resources. Declare these requirements in a new list column of the plan. Here, each element is a named list for the resources argument of future::future().

plan$resources <- list(
  list(cores = 1, gpus = 0),
  list(cores = 4, gpus = 1)
)
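
The assignment above matches resources to targets purely by row position, which is easy to get wrong in larger plans. One possible safeguard, sketched here with the same hypothetical targets, is to key the requirements by target name and reorder them to match the plan. The name resources_by_target is an illustration, not a drake feature.

```r
library(drake)

plan <- drake_plan(
  data = download_data(),
  model = big_machine_learning_model(data)
)

# Resource requirements keyed by target name, in any order.
resources_by_target <- list(
  model = list(cores = 4, gpus = 1),
  data = list(cores = 1, gpus = 0)
)

# Make sure every target has an entry, reorder the entries to match
# the rows of the plan, and attach them as a list column.
stopifnot(all(plan$target %in% names(resources_by_target)))
plan$resources <- unname(resources_by_target[plan$target])

plan
```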

Next, plug your resources into the brew patterns of your batchtools template file. The following sge_batchtools.tmpl file shows how to do it, but the file itself probably requires modification before it will work with your own machine.

#!/bin/bash
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
#$ -pe smp <%= resources[["cores"]] %> # CPU cores
#$ -l gpu=<%= resources[["gpus"]] %>   # GPUs.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0

Finally, register the template file and run your project.

library(drake)
library(future.batchtools)
future::plan(batchtools_sge, template = "sge_batchtools.tmpl")
make(plan, parallelism = "future", jobs = 2)

10.7.6 Parallel computing within targets

To recruit parallel processes within individual targets, we recommend the future.callr and furrr packages. Usage details depend on the parallel backend you choose for make(). If you must write custom code with mclapply(), please read the subsection below on locked bindings/environments.

10.7.6.1 Locally

Use future.callr and furrr normally.

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:2, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:2, function(x) Sys.getpid())
)

# Tell the drake targets to run their futures in local callr processes.
future::plan(future.callr::callr)

# Build the targets.
make(plan)

# Process IDs of the local workers of x:
readd(x)

10.7.6.2 Persistent workers

Each persistent worker needs its own future::plan(), which we set with the prework argument of make(). The following example uses SGE. To learn about templates for other clusters, please consult the clustermq documentation.

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:2, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:2, function(x) Sys.getpid())
)

# Write a template file for clustermq.
writeLines(
  c(
    "#!/bin/bash",
    "#$ -N {{ job_name }}               # job name",
    "#$ -t 1-{{ n_jobs }}               # submit jobs as array",
    "#$ -j y                            # combine stdout/error in one file",
    "#$ -o {{ log_file | /dev/null }}   # output file",
    "#$ -cwd                            # use pwd as work dir",
    "#$ -V                              # use environment variables",
    "#$ -pe smp 4                       # request 4 cores per job",
    "module load R-qualified/3.5.2      # if loading R from an environment module",
    "ulimit -v $(( 1024 * {{ memory | 4096 }} ))",
    "CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker(\"{{ master }}\")'"
  ),
  "sge_clustermq.tmpl"
)

# Register the scheduler and template file with clustermq.
options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)

# Build the targets.
make(
  plan,
  parallelism = "clustermq",
  jobs = 2,
  # Each of the two workers can spawn up to 4 local processes.
  prework = quote(future::plan(future.callr::callr))
)

# Process IDs of the local workers of x:
readd(x) 

10.7.6.3 Transient workers

As explained in the future vignette, we can nest our future::plan() strategies. Each target gets its own remote job, and each job can spawn up to 4 local callr processes. The following example uses SGE. To learn about templates for other clusters, please consult the future.batchtools documentation.

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:2, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:2, function(x) Sys.getpid())
)

# Write a template file for future.batchtools.
writeLines(
  c(
    "#!/bin/bash",
    "#$ -cwd                # use pwd as work dir",
    "#$ -j y                # combine stdout/error in one file",
    "#$ -o <%= log.file %>  # output file",
    "#$ -V                  # use environment variables",
    "#$ -N <%= job.name %>  # job name",
    "#$ -pe smp 4           # 4 cores per job",
    "module load R          # if loading R from an environment module",
    "Rscript -e 'batchtools::doJobCollection(\"<%= uri %>\")'",
    "exit 0"
  ),
  "sge_batchtools.tmpl"
)

# In our nested plans, each target gets its own remote SGE job,
# and each worker can spawn up to 4 `callr` processes.
future::plan(
  list(
    future::tweak(
      future.batchtools::batchtools_sge,
      template = "sge_batchtools.tmpl"
    ),
    future.callr::callr
  )
)

# Build the targets.
make(plan, parallelism = "future", jobs = 2)

# Process IDs of the local workers of x:
readd(x)

10.7.6.4 Number of local workers per target

By default, future::availableCores() determines the number of local callr workers. To better manage resources, you may wish to further restrict the number of callr workers for all targets in the plan, e.g. future::plan(future.callr::callr, workers = 4L) or:

future::plan(
  list(
    future::tweak(
      future.batchtools::batchtools_sge,
      template = "sge_batchtools.tmpl"
    ),
    future::tweak(future.callr::callr, workers = 4L)
  )
)

Alternatively, you can use chunking to prevent individual targets from using too many workers, e.g. furrr::future_map(.options = furrr::future_options(scheduling = 4)). (In furrr 0.2.0 and later, future_options() is named furrr_options().) Here, the scheduling argument sets the average number of futures per worker.

10.7.6.5 Locked binding/environment errors

Some workflows unavoidably use mclapply(), which is known to modify the global environment against drake’s will. If you are stuck, there are two workarounds.

  1. Use make(lock_envir = FALSE).
  2. Use the envir argument of make(). That way, drake locks your special custom environment instead of the global environment.

# Load the main example: https://github.com/wlandau/drake-examples
library(drake)
drake_example("main")
setwd("main")

# Define and populate a special custom environment.
envir <- new.env(parent = globalenv())
source("R/packages.R", local = envir)
source("R/functions.R", local = envir)
source("R/plan.R", local = envir)

# Check the contents of your environments.
ls(envir) # Should have your functions and plan
ls()         # The global environment should only have what you started with.

# Build the targets using your custom environment
make(envir$plan, envir = envir)

10.7.7 Custom job schedulers

It is possible to supply a custom job scheduler function to the parallelism argument of make(). The backend_future_lapply_staged() function from the drake.future.lapply.staged package is an example. You might consider writing your own such function if you wish to

  1. Experiment with a more efficient job scheduler before proposing a patch to core drake, or
  2. Aggressively optimize drake for your specialized computing resources.

This feature is very advanced, and you should only attempt it in production if you really know what you are doing. Use at your own risk.

10.7.8 Hasty mode

The drake.hasty package is a bare-bones experimental spin-off of drake. It sacrifices reproducibility to aggressively boost speed when scheduling and executing your targets. It is not recommended for most serious production use cases, but it can be useful for experimentation.

Copyright Eli Lilly and Company