Chapter 9 High-performance computing

This chapter provides guidance on time-consuming drake workflows and high-level parallel computation.

9.1 Start small

Before you jump into high-performance computing with a large workflow, consider running a downsized version to debug and test things first. That way, you avoid consuming lots of computing resources until you are reasonably sure everything works. Create a test plan with drake_plan(max_expand = SMALL_NUMBER) before scaling up to the full set of targets, and take temporary shortcuts in your commands so that your targets build more quickly in test mode. See the section on plans for details.
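
As a minimal sketch of a downsized test plan, the example below keeps only 2 of 100 possible targets. The function analyze_data() and the parameter n are hypothetical placeholders for your own code, not part of drake.

```r
library(drake)

# Hypothetical example: analyze_data() stands in for your own function.
# Without max_expand, map() would generate 100 targets (n = 1, ..., 100).
test_plan <- drake_plan(
  analysis = target(
    analyze_data(n),
    transform = map(n = !!seq_len(100))
  ),
  max_expand = 2 # keep at most 2 targets per transformation
)

# Inspect the reduced plan before running make() on it.
print(test_plan)
```

Once the small plan builds cleanly, drop the max_expand argument to restore the full set of targets.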

9.2 Let make() schedule your targets

When it comes time to activate high-performance computing, drake launches its own parallel workers and sends targets to those workers. The workers can be local processes or jobs on a cluster. drake uses your project’s implicit dependency graph to figure out which targets can run in parallel and which ones need to wait for dependencies.

You do not need to micromanage how targets are scheduled, and you do not need to run simultaneous instances of make().

9.3 The master process

make() takes care of the jobs it launches, but make() itself is a job too, and it is your responsibility to manage it.

9.3.1 Master on a cluster

Most clusters will let you submit make() as a job on a compute node. Let’s consider the Sun Grid Engine (SGE) as an example. First, we create a script that calls make() (or r_make()).
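
A minimal sketch of such a script follows. The file name make.R comes from the text; the path R/plan.R, the object name plan, and the template file name sge_clustermq.tmpl are assumptions for illustration.

```r
# make.R (sketch)
library(drake)

# Assumption: R/plan.R defines a drake plan named `plan`, and
# sge_clustermq.tmpl is a clustermq template file in the working directory.
source("R/plan.R")

options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)

# Launch 8 persistent workers as jobs on the cluster.
make(plan, parallelism = "clustermq", jobs = 8)
```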

Then, we create a shell script (say, run.sh) to call make.R. This script may look different if you use a different scheduler such as SLURM.
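
A run.sh for SGE might look like the sketch below. The module name is an assumption about your cluster's setup, and the #$ flags shown are only a common minimum.

```sh
#!/bin/bash
# run.sh (sketch): lines that start with #$ are flags for the SGE scheduler.
# Run the job from the current working directory.
#$ -cwd
# Export the current environment variables to the job.
#$ -V
# Assumption: your cluster provides R through environment modules.
module load R
Rscript make.R
```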

Finally, to run the whole workflow, we call qsub.
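
Assuming run.sh sits in the current directory of an SGE login node, the submission could be as simple as:

```sh
# Submit run.sh to the SGE scheduler as a new job.
qsub run.sh
```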

And here is what happens:

  1. A new job starts on the cluster with the configuration flags next to #$ in run.sh.
  2. run.sh opens R and runs make.R.
  3. make.R invokes drake using the make() function.
  4. make() launches 8 new jobs on the cluster.

So 9 jobs run simultaneously on the cluster (the make() job plus its 8 workers), and we avoid burdening the head node (login node).

9.3.2 Local master

Alternatively, you can run make() in a persistent background process. The following should work in the Mac/Linux terminal/shell.

nohup nice -19 R CMD BATCH --no-save make.R &

where:

  • nohup: Keep the job running even if you log out of the machine.
  • nice -19: This is a low-priority job that should not consume many resources. Other processes should take priority.
  • R CMD BATCH: Run the R script in a fresh new R session.
  • --no-save: Do not save the workspace in a .RData file.
  • &: Run this job in the background so you can do other stuff in the terminal window.

Alternatives to nohup include screen and Byobu.

9.4 Parallel backends

Choose the parallel backend with the parallelism argument and set the jobs argument to scale the work appropriately.

The two primary backends with long-term support are clustermq and future. If you can install ZeroMQ, the best choice is usually clustermq, which is faster than future. However, future is more accessible: it does not require ZeroMQ, it supports parallel computing on Windows, it can work with more restrictive wall time limits on clusters, and it can deploy targets to Docker images (drake_example("Docker-psock")).

9.5 The clustermq backend

9.5.1 Persistent workers

The call make(parallelism = "clustermq", jobs = 2) launches 2 persistent parallel workers. The master process assigns targets to workers, and the workers simultaneously traverse the dependency graph.

9.5.2 Installation

Persistent workers require the clustermq R package, which in turn requires ZeroMQ. Please refer to the clustermq installation guide for specific instructions.

9.5.3 On your local machine

To run your targets in parallel over the cores of your local machine, set the global option below and run make().
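
The following is a minimal sketch, assuming clustermq is installed. The three-target plan is purely illustrative; substitute your own.

```r
library(drake)

# Use local multicore workers instead of a cluster scheduler.
options(clustermq.scheduler = "multicore")

# Small illustrative plan; replace it with your own.
plan <- drake_plan(
  x = rnorm(100),
  y = rnorm(100),
  z = mean(c(x, y))
)

# x and y do not depend on each other, so 2 workers can build them in parallel.
make(plan, parallelism = "clustermq", jobs = 2)
```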

9.5.4 On a cluster

Set the clustermq global options to register your computing resources. For SLURM:
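
A sketch of those options, assuming the template file is named slurm_clustermq.tmpl and lives in the working directory:

```r
options(
  clustermq.scheduler = "slurm",
  clustermq.template = "slurm_clustermq.tmpl"
)
```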

Here, slurm_clustermq.tmpl is a template file with configuration details. Use drake_hpc_template_file() to write one of the available examples.
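
For instance, the sketch below lists the bundled examples and then writes the SLURM one to the current working directory.

```r
library(drake)

# List the names of the example template files that ship with drake.
drake_hpc_template_files()

# Write the SLURM example for clustermq to the current working directory.
drake_hpc_template_file("slurm_clustermq.tmpl")
```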

After modifying slurm_clustermq.tmpl by hand to meet your needs, call make() as usual.
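
As a sketch, assuming plan is your existing drake plan and the clustermq options for SLURM are already set in the session (the number of jobs here is arbitrary):

```r
library(drake)

# Assumptions: `plan` is your existing drake plan, and the clustermq
# scheduler and template options have already been set in this session.
make(plan, parallelism = "clustermq", jobs = 4)
```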

9.6 The future backend

9.6.1 Transient workers

make(parallelism = "future", jobs = 2) launches transient workers to build your targets. When a target is ready to build, the master process creates a fresh worker to build it, and the worker terminates when the target is done. jobs = 2 means that at most 2 transient workers are allowed to run at a given time.


9.6.2 Installation

Install the future package.

If you intend to use a cluster, be sure to install the future.batchtools package too. The future ecosystem contains even more packages that extend future’s parallel computing functionality, such as future.callr.
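
All three packages are available on CRAN, so a typical installation might look like this:

```r
install.packages("future")

# Optional extensions mentioned above:
install.packages("future.batchtools") # cluster schedulers via batchtools
install.packages("future.callr")      # local workers via callr
```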

9.6.3 On your local machine

First, select a future plan to tell future how to create the workers. The future package documentation describes the core options.

Next, run make().
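
The two steps above can be sketched as follows. The multisession plan is one of future's built-in options and is chosen here only for illustration, as is the small plan.

```r
library(drake)

# Run each worker in a separate background R session on this machine.
future::plan(future::multisession)

# Small illustrative plan; replace it with your own.
plan <- drake_plan(
  x = rnorm(100),
  y = rnorm(100),
  z = mean(c(x, y))
)

# At most 2 transient workers run at any given time.
make(plan, parallelism = "future", jobs = 2)
```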

9.6.4 On a cluster

Install the future.batchtools package and select the future plan it provides that matches your resources. You will also need a compatible template file with configuration details. As with clustermq, drake can generate some examples:
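
For a SLURM cluster, for example, the bundled batchtools template can be written to the working directory like this:

```r
library(drake)

# Write the SLURM example for future.batchtools to the working directory.
drake_hpc_template_file("slurm_batchtools.tmpl")
```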

Next, register the template file with a plan.

Finally, run make().
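
The last two steps can be sketched as below, assuming a SLURM cluster, a template file named slurm_batchtools.tmpl in the working directory, and an existing drake plan called plan.

```r
library(drake)
library(future.batchtools)

# Register the template file with a future.batchtools plan for SLURM.
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")

# Assumption: `plan` is your existing drake plan.
make(plan, parallelism = "future", jobs = 2)
```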

9.7 Advanced options

9.7.2 Memory

By default, make() keeps targets in memory during runtime. Some targets are dependencies of other targets downstream, while others may no longer need to be in memory. The memory_strategy argument to make() allows you to choose the tradeoff that best suits your project. Options:

  • "speed": Once a target is loaded in memory, just keep it there. This choice maximizes speed and hogs memory.
  • "memory": Just before building each new target, unload everything from memory except the target’s direct dependencies. This option conserves memory, but it sacrifices speed because each new target needs to reload any previously unloaded targets from storage.
  • "lookahead": Just before building each new target, search the dependency graph to find targets that will not be needed for the rest of the current make() session. In this mode, targets are only in memory if they need to be loaded, and we avoid superfluous reads from the cache. However, searching the graph takes time, and it could even double the computational overhead for large projects.
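
A minimal sketch of selecting a memory strategy, using a small illustrative plan:

```r
library(drake)

# Small illustrative plan; replace it with your own.
plan <- drake_plan(
  raw = rnorm(1000),
  avg = mean(raw),
  spread = sd(raw)
)

# Unload targets that the rest of this make() session will not need.
make(plan, memory_strategy = "lookahead")
```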

9.7.3 Storage

In make(caching = "master"), the workers send the targets to the master process, and the master process stores them one by one in the cache. caching = "master" is compatible with all storr cache formats, including the more esoteric ones like storr_dbi() and storr_environment().

In make(caching = "worker"), the parallel workers are responsible for writing the targets to the cache. Some output-heavy projects can benefit from this form of parallelism. However, it can sometimes slow things down on clusters because of lag from network file systems. And there are additional restrictions:

  • All the workers must have the same file system and the same working directory as the master process.
  • Only the default storr_rds() cache may be used. Other formats like storr_dbi() and storr_environment() cannot accommodate parallel cache operations.
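
As a sketch, worker caching on a local machine with the future backend could look like this. The plan and the multisession setting are illustrative assumptions.

```r
library(drake)

# Small illustrative plan; replace it with your own.
plan <- drake_plan(
  x = rnorm(100),
  y = rnorm(100),
  z = mean(c(x, y))
)

future::plan(future::multisession)

# Each worker writes its own target to the default storr_rds() cache.
make(plan, parallelism = "future", jobs = 2, caching = "worker")
```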

See the storage chapter for details.

9.7.4 The template argument for persistent workers

For more control and flexibility in the clustermq backend, you can parameterize your template file and use the template argument of make(). For example, suppose you want to programmatically set the number of “slots” (basically cores) per job on an SGE system (see the clustermq guide to SGE setup). Begin with a parameterized template file sge_clustermq.tmpl with a custom n_slots placeholder.

# File: sge_clustermq.tmpl
# Modified from https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}               # job name
#$ -t 1-{{ n_jobs }}               # submit jobs as array
#$ -j y                            # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }}   # output file
#$ -cwd                            # use pwd as work dir
#$ -V                              # use environment variable
#$ -pe smp {{ n_slots | 1 }}       # request n_slots cores per job
module load R
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

Then when you run make(), use the template argument to set n_slots.
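
A sketch of that call, assuming plan is your existing drake plan and sge_clustermq.tmpl is the template file above (the values of jobs and n_slots are arbitrary):

```r
library(drake)

options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)

# Assumption: `plan` is your existing drake plan.
make(
  plan,
  parallelism = "clustermq",
  jobs = 16,
  template = list(n_slots = 4) # fills {{ n_slots }}: 4 cores per worker
)
```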

Custom placeholders like n_slots are processed with the infuser package.

9.7.5 The resources column for transient workers

Different targets may need different resources. For example,
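
consider the sketch of a two-target plan below, where download_data() and big_machine_learning_model() are hypothetical user-defined functions.

```r
library(drake)

# download_data() and big_machine_learning_model() are hypothetical
# placeholders for your own functions.
plan <- drake_plan(
  data = download_data(),
  model = big_machine_learning_model(data)
)
```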

The model needs a GPU and multiple CPU cores, and the data only needs the bare minimum resources. Declare these requirements with target(), as below. This is equivalent to adding a new list column to the plan, where each element is a named list for the resources argument of future::future().
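
A sketch of such a declaration follows. The functions download_data() and big_machine_learning_model() are hypothetical, and the resource names cores and gpus are arbitrary labels that must match whatever your template file expects.

```r
library(drake)

plan <- drake_plan(
  data = download_data(),
  model = target(
    big_machine_learning_model(data),
    # Passed to the resources argument of future::future() for this target.
    resources = list(cores = 4, gpus = 1)
  )
)
```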

Next, plug the names of your resources into the brew patterns of your batchtools template file. The following sge_batchtools.tmpl file shows how to do it, but the file itself probably requires modification before it will work with your own machine.

#!/bin/bash
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
#$ -pe smp <%= resources[["cores"]] %> # CPU cores
#$ -l gpu=<%= resources[["gpus"]] %>   # GPUs.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0

Finally, register the template file and run your project.
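
As a sketch for an SGE cluster, assuming sge_batchtools.tmpl is in the working directory and plan carries the resources described above:

```r
library(drake)
library(future.batchtools)

# Register the SGE template with a future.batchtools plan.
future::plan(batchtools_sge, template = "sge_batchtools.tmpl")

# Assumption: `plan` is your drake plan with a resources column.
make(plan, parallelism = "future", jobs = 2)
```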

9.7.6 Parallel computing within targets

To recruit parallel processes within individual targets, we recommend the future.callr and furrr packages. Usage details depend on the parallel backend you choose for make(). If you must write custom code with mclapply(), please read the subsection below on locked bindings/environments.

9.7.6.4 Number of local workers per target

By default, future::availableCores() determines the number of local callr workers. To better manage resources, you may wish to further restrict the number of callr workers for all targets in the plan, e.g. future::plan(future.callr::callr, workers = 4L) or:
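
One possible way to apply such a limit, shown here only as a sketch, is to register the callr plan inside a target's command. The names datasets and fit_one_model() are hypothetical placeholders.

```r
library(drake)

# Hypothetical plan: `datasets` and fit_one_model() are placeholders
# for your own upstream target and function.
plan <- drake_plan(
  models = {
    # Restrict this target to 4 local callr workers.
    future::plan(future.callr::callr, workers = 4L)
    furrr::future_map(datasets, fit_one_model)
  }
)
```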

Alternatively, you can use chunking to prevent individual targets from using too many workers, e.g. furrr::future_map(.options = furrr::furrr_options(scheduling = 4)), or furrr::future_options(scheduling = 4) in furrr versions before 0.2.0. Here, the scheduling argument sets the average number of futures per worker.

9.7.6.5 Locked binding/environment errors

Some workflows unavoidably use mclapply(), which is known to modify the global environment against drake’s will. If you are stuck, there are two workarounds.

  1. Use make(lock_envir = FALSE).
  2. Use the envir argument of make(). That way, drake locks your special custom environment instead of the global environment.
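
Both workarounds can be sketched as follows, with a small illustrative plan. In a real project, any functions your commands call must be available in the custom environment.

```r
library(drake)

# Small illustrative plan; replace it with your own.
plan <- drake_plan(
  x = rnorm(100),
  avg = mean(x)
)

# Workaround 1: do not lock the environment during make().
make(plan, lock_envir = FALSE)

# Workaround 2: give drake a custom environment to lock
# instead of the global environment.
envir <- new.env(parent = globalenv())
make(plan, envir = envir)
```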

9.7.7 Custom job schedulers

It is possible to supply a custom job scheduler function to the parallelism argument of make(). The backend_future_lapply_staged() function from the drake.future.lapply.staged package is an example. You might consider writing your own such function if you wish to

  1. Experiment with a more efficient job scheduler before proposing a patch to core drake, or
  2. Aggressively optimize drake for your specialized computing resources.

This feature is very advanced, and you should only attempt it in production if you really know what you are doing. Use at your own risk.

9.7.8 Hasty mode

The drake.hasty package is a bare-bones experimental spin-off of drake. It sacrifices reproducibility to aggressively boost speed when scheduling and executing your targets. It is not recommended for most serious production use cases, but it can be useful for experimentation.

Copyright Eli Lilly and Company