Chapter 10 High-performance computing

This chapter provides guidance on time-consuming drake workflows and high-level parallel computation.

library(drake)
load_mtcars_example() # Creates the example workflow plan, my_plan.
make(my_plan, jobs = 2) # Build the targets with up to 2 parallel jobs.

10.1 Batch mode for long workflows

If you expect make() to take a long time, create a master script for your project (say, drake_work.R; a minimal sketch appears after the list below) and run it in a persistent background process. The following should work in the Mac/Linux terminal/shell.

nohup nice -19 R CMD BATCH drake_work.R &

where:

  • nohup: Keep the job running even if you log out of the machine.
  • nice -19: Run the job at low priority so that other processes take precedence.
  • R CMD BATCH drake_work.R: Run the drake_work.R script in a fresh R session.
  • &: Run the job in the background so you can keep using the terminal.
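
Below is a minimal sketch of what drake_work.R might contain. The functions.R file and the functions it defines (get_data(), fit_model()) are hypothetical placeholders; adapt them to your project.

# File: drake_work.R (hypothetical master script)
library(drake)
source("functions.R") # Hypothetical file defining get_data() and fit_model().
plan <- drake_plan(
  data = get_data(),
  model = fit_model(data)
)
make(plan, jobs = 2)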

10.2 Let make() schedule your targets

drake uses your project’s implicit dependency graph to figure out which targets can run in parallel and which ones need to wait for dependencies.

load_mtcars_example()
config <- drake_config(my_plan)
vis_drake_graph(config)

You do not need to micromanage the timing among targets, and you do not need to run parallel instances of make(). As the next sections describe, drake has built-in support for parallel and distributed computing.

10.3 Parallel backends

Choose the parallel backend with the parallelism argument and set the jobs argument to scale the work appropriately.

make(my_plan, parallelism = "future", jobs = 2)

The two primary backends with long-term support are clustermq and future. If you can install ZeroMQ, clustermq is usually the best choice because it is faster than future. However, future is more accessible: it does not require ZeroMQ, it supports parallel computing on Windows, it works under more restrictive wall time limits on clusters, and it can deploy targets to Docker images (drake_example("Docker-psock")).
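
As a sketch of this choice, the snippet below tries clustermq first and falls back to future. The scheduler and plan settings are just example defaults for a local machine.

# Prefer clustermq if it is installed (it requires ZeroMQ); else fall back to future.
if (requireNamespace("clustermq", quietly = TRUE)) {
  options(clustermq.scheduler = "multicore") # Local multicore scheduling.
  make(my_plan, parallelism = "clustermq", jobs = 2)
} else {
  future::plan(future::multisession) # Background R sessions; works on Windows.
  make(my_plan, parallelism = "future", jobs = 2)
}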

10.4 The clustermq backend

10.4.1 Persistent workers

make(parallelism = "clustermq", jobs = 2) launches 2 parallel persistent workers. The master process assigns targets to the workers, and the workers traverse the dependency graph simultaneously.

10.4.2 Installation

You must first install ZeroMQ (see the installation instructions on the ZeroMQ website) and then install the clustermq package.

install.packages("clustermq") # CRAN release
# Alternatively, install the GitHub development version.
devtools::install_github("mschubert/clustermq", ref = "develop")

10.4.3 On your local machine

To run your targets in parallel over the cores of your local machine, set the global option below and run make().

options(clustermq.scheduler = "multicore")
make(plan, parallelism = "clustermq", jobs = 2)

10.4.4 On a cluster

Set the clustermq global options to register your computing resources. For SLURM:

options(clustermq.scheduler = "slurm", clustermq.template = "slurm_clustermq.tmpl")

Here, slurm_clustermq.tmpl is a template file with configuration details. Use drake_hpc_template_file() to write one of the available examples.

drake_hpc_template_file("slurm_clustermq.tmpl") # Write the file slurm_clustermq.tmpl.

After modifying slurm_clustermq.tmpl by hand to meet your needs, call make() as usual.

make(plan, parallelism = "clustermq", jobs = 4)

10.5 The future backend

10.5.1 Transient workers

make(parallelism = "future", jobs = 2) launches transient workers to build your targets. When a target is ready to build, the master process creates a fresh worker to build it, and the worker terminates when the target is done. jobs = 2 means that at most 2 transient workers are allowed to run at a given time.

10.5.2 Installation

Install the future package.

install.packages("future") # CRAN release
# Alternatively, install the GitHub development version.
devtools::install_github("HenrikBengtsson/future", ref = "develop")

If you intend to use a cluster, be sure to install the future.batchtools package too. The future ecosystem contains even more packages that extend future’s parallel computing functionality, such as future.callr.
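
For example, future.callr runs each transient worker in a fresh callr R session. A minimal sketch, assuming your plan object already exists:

library(future.callr)
future::plan(future.callr::callr) # Each transient worker is an isolated callr session.
make(plan, parallelism = "future", jobs = 2)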

10.5.3 On your local machine

First, select a future plan to tell future how to create the workers. The future package documentation describes the core options (sequential, multisession, multicore, etc.).

future::plan(future::multisession) # On Linux/macOS, future::multicore is another option.

Next, run make().

make(plan, parallelism = "future", jobs = 2)

10.5.4 On a cluster

Install the future.batchtools package and consult its documentation to select a future plan that matches your resources. You will also need a compatible template file with configuration details. As with clustermq, drake can generate some examples:

drake_hpc_template_file("slurm_batchtools.tmpl") # Edit by hand.

Next, register the template file with a plan.

library(future.batchtools)
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")

Finally, run make().

make(plan, parallelism = "future", jobs = 2)

10.6 Advanced options

10.6.1 Memory

By default, make() keeps targets in memory during runtime. Some targets are dependencies of other targets downstream, while others no longer need to stay in memory at all. The memory_strategy argument of make() lets you choose the tradeoff that best suits your project. Options:

  • "speed": Once a target is loaded in memory, just keep it there. This choice maximizes speed and hogs memory.
  • "memory": Just before building each new target, unload everything from memory except the target’s direct dependencies. This option conserves memory, but it sacrifices speed because each new target needs to reload any previously unloaded targets from storage.
  • "lookahead": Just before building each new target, search the dependency graph to find targets that will not be needed for the rest of the current make() session. In this mode, targets are only in memory if they need to be loaded, and we avoid superfluous reads from the cache. However, searching the graph takes time, and it could even double the computational overhead for large projects.
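
For example, the call below chooses the most conservative strategy (memory_strategy accepts the option names listed above).

make(my_plan, memory_strategy = "memory") # Unload all but each target's direct dependencies.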

10.6.2 Storage

In make(caching = "master"), the workers send the targets to the master process, and the master process stores them one by one in the cache. caching = "master" is compatible with all storr cache formats, including the more esoteric ones like storr_dbi() and storr_environment().

In make(caching = "worker"), the parallel workers are responsible for writing the targets to the cache. Some output-heavy projects can benefit from this form of parallelism. However, on clusters it can sometimes add overhead because of network file system latency. And there are additional restrictions:

  • All the workers must have the same file system and the same working directory as the master process.
  • Only the default storr_rds() cache may be used. Other formats like storr_dbi() and storr_environment() cannot accommodate parallel cache operations.
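
A minimal sketch of worker-side caching, assuming the local clustermq setup from earlier in this chapter:

options(clustermq.scheduler = "multicore")
make(my_plan, parallelism = "clustermq", jobs = 2, caching = "worker")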

See the storage chapter for details.

10.6.3 The template argument for persistent workers

For more control and flexibility in the clustermq backend, you can parameterize your template file and use the template argument of make(). For example, suppose you want to programmatically set the number of “slots” (basically cores) per job on an SGE system (see the clustermq wiki for SGE setup). Begin with a parameterized template file sge_clustermq.tmpl with a custom n_slots placeholder.

# File: sge_clustermq.tmpl
# Modified from https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}               # job name
#$ -t 1-{{ n_jobs }}               # submit jobs as array
#$ -j y                            # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }}   # output file
#$ -cwd                            # use pwd as work dir
#$ -V                              # use environment variable
#$ -pe smp {{ n_slots | 1 }}       # request n_slots cores per job
module load R
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

Then when you run make(), use the template argument to set n_slots.

options(clustermq.scheduler = "sge", clustermq.template = "sge_clustermq.tmpl")
library(drake)
load_mtcars_example()
make(
  my_plan,
  parallelism = "clustermq",
  jobs = 16,
  template = list(n_slots = 4) # Request 4 cores per persistent worker.
)

Custom placeholders like n_slots are processed with the infuser package.
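
As a sketch of the placeholder syntax, the snippet below fills a {{ placeholder | default }} pattern directly (assuming the infuser package is installed; the string is a made-up example).

library(infuser)
# Substitute n_slots = 4; the value after "|" is only a fallback default.
infuse("#$ -pe smp {{ n_slots | 1 }}", n_slots = 4)
# Should return "#$ -pe smp 4"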

10.6.4 The resources column for transient workers

Different targets may need different resources. For example,

plan <- drake_plan(
  data = download_data(),
  model = big_machine_learning_model(data)
)

The model needs a GPU and multiple CPU cores, while the data only needs the bare minimum resources. Declare these requirements in a new list column of the plan, with one element per target in the same order as the plan’s rows. Here, each element is a named list for the resources argument of future::future().

plan$resources <- list(
  list(cores = 1, gpus = 0), # Resources for the data target.
  list(cores = 4, gpus = 1)  # Resources for the model target.
)

Next, plug your resources into the brew patterns of your batchtools template file. The following sge_batchtools.tmpl file shows how to do it, but the file itself probably requires modification before it will work with your own machine.

#!/bin/bash
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
#$ -pe smp <%= resources[["cores"]] %> # CPU cores
#$ -l gpu=<%= resources[["gpus"]] %>   # GPUs.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0

Finally, register the template file and run your project.

library(drake)
library(future.batchtools)
future::plan(batchtools_sge, template = "sge_batchtools.tmpl")
make(plan, parallelism = "future", jobs = 2)

10.6.5 Custom job schedulers

It is possible to supply a custom job scheduler function to the parallelism argument of make(). The backend_future_lapply_staged() function from the drake.future.lapply.staged package is an example. You might consider writing your own such function if you wish to

  1. Experiment with a more efficient job scheduler before proposing a patch to core drake, or
  2. Aggressively optimize drake for your specialized computing resources.

This feature is very advanced, and you should only attempt it in production if you really know what you are doing. Use at your own risk.

10.6.6 Parallel computing within targets

You may wish to invoke parallel computing within individual targets, e.g.

plan <- drake_plan(
  a = parallel::mclapply(1:8, sqrt, mc.cores = 4),
  b = parallel::mclapply(1:4, sqrt, mc.cores = 2)
)

or even

plan <- drake_plan(
  a = target(
    parallel::mclapply(
      1:8,
      sqrt,
      # You can change the # of cores without changing the command.
      mc.cores = from_plan("cores")
    ),
    cores = 4 # Changes to this number do not invalidate `a`.
  ),
  b = target(
    parallel::mclapply(1:4, sqrt, mc.cores = from_plan("cores")),
    cores = 2
  )
)

Unfortunately, make(plan) will fail in each case: by default, make() locks the environment to protect reproducibility, and mclapply() tries to modify that environment (for example, by updating .Random.seed). Workarounds:

  • Avoid mclapply(). furrr::future_map() and parallel::parLapply() are more dependable alternatives anyway. In the case of furrr, invoke future::plan(future.callr::callr) or future::plan(future::multisession) first.
  • In make(), set the lock_envir argument to FALSE. This approach deactivates important reproducibility guardrails, so use with caution.
  • In mclapply(), set the mc.set.seed argument to FALSE. If your computations require pseudo-random numbers (rnorm(), runif(), etc.), you will need to manually set a different seed for each parallel process, e.g.
parallel::mclapply(X = 1:4, mc.cores = 4, mc.set.seed = FALSE, FUN = function(i) {
  set.seed(sum(.Random.seed) + i) # A different seed for each parallel process.
  # Do some work...
})
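
To make the first two workarounds concrete, here is a hedged sketch based on the plan at the start of this subsection:

# Workaround 1: replace mclapply() with furrr::future_map().
future::plan(future.callr::callr) # or future::plan(future::multisession)
plan <- drake_plan(
  a = furrr::future_map(1:8, sqrt),
  b = furrr::future_map(1:4, sqrt)
)
make(plan)

# Workaround 2: keep the original mclapply() plan, but turn off environment
# locking. This disables reproducibility guardrails, so use with caution.
make(plan, lock_envir = FALSE)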

10.6.7 Hasty mode

The drake.hasty package is a bare-bones spin-off of drake. It sacrifices reproducibility to aggressively boost speed when scheduling and executing your targets. It is not recommended for serious production use cases, but it can be useful for experimentation.

Copyright Eli Lilly and Company