Chapter 10 High-performance computing with drake

drake is not only a reproducibility tool, but also a high-performance computing engine. To activate parallel computing, just set the jobs argument of make() to a value greater than 1. Below, up to 2 targets can run simultaneously at any given time.

library(drake)
load_mtcars_example()
make(my_plan, jobs = 2)

10.1 Batch mode for long workflows

To deploy serious long workflows, we recommend putting the call to make() in a script (say, drake_work.R) and running it in an unobtrusive background process that persists after you log out. In the Linux command line, this is straightforward.

nohup nice -19 R CMD BATCH drake_work.R &

Or, you could call drake inside an overarching Makefile that chains multiple stages together in a larger reproducible pipeline. (See Karl Broman’s post on Makefiles for reproducible research.)

all: final_output.pdf

final_output.pdf: python_magic.py results_summary.csv
    python python_magic.py

results_summary.csv: drake_work.R
    Rscript drake_work.R

clean:
    rm -rf .drake

Then, run your whole pipleine in a persistent background process.

nohup nice -19 R CMD BATCH make &

If you do write a custom Makefile at the root of your project and you plan to use make(parallelism = "Makefile"), please read about make(parallelism = "Makefile") later in this document to avoid potential conflicts between your Makefile and the one drake writes.

10.2 Let drake schedule your targets.

When you deploy your project, drake uses the dependency network to figure out how to run your work in parallel. You as the user do not have to micromanage when individual targets are built.

load_mtcars_example()
config <- drake_config(my_plan)
vis_drake_graph(config)

10.3 Parallel backends

There are multiple ways to walk this graph and multiple ways to launch workers, and every project has its own needs. Thus, drake supports multiple parallel backends. Choose the backend with the parallelism argument.

make(my_plan, parallelism = "parLapply", jobs = 2)

You can use a different backend for the imports than you select for the targets. If you do so, you force all the imports to be processed before any of the targets are built, but you might want to do so anyway. For example, staged scheduling could be great for imports even when it is not be the right choice for the targets (more on that later).

make(
  my_plan,
  parallelism = c(imports = "mclapply_staged", targets = "mclapply"),
  jobs = 2
)

List your options with parallelism_choices().

parallelism_choices()
##  [1] "clustermq"            "clustermq_staged"     "future"              
##  [4] "future_lapply"        "future_lapply_staged" "hasty"               
##  [7] "Makefile"             "mclapply"             "mclapply_staged"     
## [10] "parLapply"            "parLapply_staged"

The backends vary widely in terms of how the workers deploy and how they are scheduled.

Deploy: local Deploy: remote
Schedule: persistent “mclapply”, “parLapply” “hasty”, “clustermq”, “future_lapply”
Schedule: transient “future”, “Makefile”
Schedule: staged “mclapply_staged”, “parLapply_staged” “clustermq_staged”, “future_lapply_staged”

The next sections describe how and when to use each scheduling algorithm and deployment strategy.

10.4 Local workers

Local workers deploy as separate forks or processes to you computer. The "mclapply" and "mclapply_staged" backends uses the mclapply() function from the parallel package to launch workers.

make(my_plan, parallelism = "mclapply", jobs = 2)
make(my_plan, parallelism = "mclapply_staged", jobs = 2)

Workers are quicker to launch than in any other drake backend, so these two choices are the lowest-overhead options. However, they have limitations: the mclapply() function is inefficient with respect to computer memory (see explanations here and here) and it cannot launch multiple workers on Windows. For this reason, drake supports platform agnostic backends "parLapply" and "parLapply_staged", both of which are based on the parLapply() function from the parallel package. These options work on Windows, but each make() requires extra overhead to create a parallel socket (PSOCK) cluster.

make(my_plan, parallelism = "parLapply", jobs = 2)
make(my_plan, parallelism = "parLapply_staged", jobs = 2)

The default parallelism is "parLapply" on Windows and "mclapply" everywhere else.

default_parallelism()
## [1] "mclapply"

10.5 Remote workers

The "future_lapply", "future", and "Makefile" backends have the option to launch workers to remote resources such as nodes on a computing cluster.

parallelism_choices(distributed_only = TRUE)
## [1] "clustermq"            "clustermq_staged"     "future"              
## [4] "future_lapply"        "future_lapply_staged" "hasty"               
## [7] "Makefile"

Testing them out is straightforward.

make(my_plan, parallelism = "clustermq", jobs = 2)
make(my_plan, parallelism = "clustermq_staged", jobs = 2)
make(my_plan, parallelism = "future", jobs = 2)
make(my_plan, parallelism = "future_lapply", jobs = 2)
make(my_plan, parallelism = "future_lapply_staged", jobs = 2)
make(my_plan, parallelism = "Makefile", jobs = 2)

For remote workers, the all the imports are processed with one of the local worker backends before any of the targets start. You can use different numbers of workers for the imports and the targets.

make(my_plan, parallelism = "clustermq", jobs = c(imports = 2, targets = 4))

By default, these backends launch the workers on your local machine. It takes extra configuring to actually deploy them to a remote cluster. The next subsections have the details.

10.5.1 "clustermq", "clustermq_staged", and "hasty"

The clustermq R package is a fast, user-friendly tool for sending R jobs to clusters, and clustermq-powered drake workers suffer less overhead than other remote workers. To use drake’s clustermq backends, you must first install ZeroMQ (installation instructions here). Then, install clustermq explicitly (it does not come with drake).

install.packages("clustermq") # CRAN release
# Alternatively, install the GitHub development version.
devtools::install_github("mschubert/clustermq", ref = "develop") # "develop" is the name of a git branch.

clustermq detects most clusters automatically. However, you may still want to configure resources, wall time limits, etc. If so, you will need a template file. Use drake_hpc_template_file() to write one of the available example template files for clustermq. (list with drake_hpc_template_files()). You will probably need to edit your tempalte file manually to match your resources and needs.

drake_hpc_tempalte_file("slurm_clustermq.tmpl") # Write the file slurm_clustermq.tmpl.

Next, configure clustermq to use your scheduler and template file. clustermq has a really nice wiki with more detailed directions.

options(clustermq.scheduler = "slurm", clustermq.template = "slurm_clustermq.tmpl")

Tip: for a dry run on your local machine, use options(clustermq.scheduler = "multicore") instead. Either way, once it is time to run your project, simply call make() with one of the clustermq parallelism backends.

make(my_plan, parallelism = "clustermq", jobs = 4)
make(my_plan, parallelism = "clustermq_staged", jobs = 4) # Different job scheduling algorithm.

10.5.1.1 Control how I/O is scheduled

With the caching argument to make(), you have the option to either let the workers cache the targets. You can choose either caching = "worker" or caching = "master". In the former case, the workers cache the targets directly to the file system in parallel. Target return values are transferred to the central cache over a network file system, not ZeroMQ sockets. In the latter case, target values are sent to the master process via fast ZeroMQ sockets, but then the master has to cache all the data by itself, which could create a bottleneck. caching = "master" also allows for alternative cache types such as storr::storr_environment() and storr::storr_dbi() caches. See the storr package documentation for details.

10.5.1.2 The template argument of make()

For more control and flexibility in the clustermq-based backends, you can parameterize your template file and use the template argument of make() configure resource requirements. For example, suppose you want to programatically set the number of “slots” (basically cores) per job on an SGE system (clustermq guide to SGE setup here). We begin with a parameterized template file sge_clustermq.tmpl with a custom n_slots placeholder.

# File: sge_clustermq.tmpl
# Modified from https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}               # job name
#$ -t 1-{{ n_jobs }}               # submit jobs as array
#$ -j y                            # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }}   # output file
#$ -cwd                            # use pwd as work dir
#$ -V                              # use environment variable
#$ -pe smp {{ n_slots | 1 }}       # request n_slots cores per job
module load R
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

Then when you run make(), use the template argument to set n_slots.

options(clustermq.scheduler = "sge", clustermq.template = "sge_clustermq.tmpl")
library(drake)
load_mtcars_example() # Or set up your own workflow.
make(
  my_plan,
  parallelism = "clustermq",
  jobs = 16 # Number of jobs / nodes.
  template = list(n_slots = 4) # Request 4 cores per job.
)

Custom placeholders like n_slots are processed with the infuser package.

10.5.1.3 Hasty mode

make(plan, parallelism = "hasty", jobs = 8) # Like "clustermq" parallelism
make(plan, parallelizm = "hasty", jobs = 1) # Ordinary local serial computation

In hasty mode, drake abandons its claims to reproducibility and focuses exclusively on job scheduling. It forgets about tracking output, and it no longer tries to skip up-to-date targets. In fact, it runs every target every time you call make(). But in exchange, job scheduling is extremely low-overhead, especially for projects with large datasets and lots of targets.

## Warning: load_main_example() is deprecated. Use drake_example("main")
## instead.

We emphasize: in hasty mode, the core reproducibility of drake are no longer valid. Here, drake cannot provide evidence that your results are synchronized with the underlying code and data. Hasty mode should only be used for debugging and testing or if computional reproducibility is handled some other way.

And ask yourself: are you really willing to run all your targets from the beginning next time you call make()? Are you willing to iterate on a project where up-to-date targets are no longer skipped? Are the short-term speed gains really worth it? In the words of an esteemed colleague,

The fastest code is the code you do not run.

10.5.1.4 Specific usage concerns

The return values of targets remain available in memory while make() is executing. Unlike drake’s usual behavior, those values stay in memory even when they are no longer needed. In addition, the return values are no longer stored in a cache. Consequently,

  1. You should save all your important (and large) results manually to your own file_out() files.
  2. loadd() and readd() no longer retrieve data.
  3. Subsequent non-hasty calls to make()s start from scratch.

10.5.1.5 Benefits of hasty mode

  1. drake’s hasty mode cuts corners to gain speed, but it still knows which targets it can run in parallel and which need to wait for dependencies to finish. In other words, hasty mode is still smart enough to traverse end-to-end pipelines of jobs that need to wait for each other.
  2. In drake version >= 6.1.0.9000, you can micromanage exactly how each target is built. The hasty_build argument to make() is a function that controls exactly how each target’s command is run. The default hasty_build function is
default_hasty_build
## function (target, config) 
## {
##     tidy_expr <- eval(expr = config$layout[[target]]$command_build, 
##         envir = config$envir)
##     eval(expr = tidy_expr, envir = config$envir)
## }
## <bytecode: 0x4d27da8>
## <environment: namespace:drake>

You can supply an alternative build function with your own customization. In the following thought experiment, we assume you already defined a function log_to_website() that communicated with a web service as your targets are progressing.

my_hasty_build <- function(target, config){
  log_to_website("start") # Notify a website that the target is starting.
  large_dataset <- default_hasty_build(target, config) # Run the target's command.
  log_to_website("end") # Notify a website that the target is finished.
  write.csv(large_dataset, paste0(target, ".csv") # Write the output to a file.
  digest::digest(large_dataset) # Just return a small fingerprint.
}

make(my_plan, parallelism = "hasty", jobs = 2, hasty_build = my_hasty_build)

User-side applications aside, the main purpose of the hasty_build argument is actually to experiment with new internals for drake itself. Benchmarking in hasty mode is much easier this way.

10.5.2 "future", "future_lapply", and "future_lapply_staged"

The plan() function from the future package configures how and where the workers will deploy on the next make(). For example, the following code uses future’s multisession backend, which is analogous to drake’s "parLapply" parallelism.

library(future)
future::plan(multisession)
make(my_plan, parallelism = "future", jobs = 2)
# Same technology, different scheduling:
make(my_plan, parallelism = "future_lapply", jobs = 2)

In the case of parallelism = "future", the caching argument can be either "worker" or "master", and it works the same as for "clustermq" parallelism.

For "future_lapply_staged" parallelism, the jobs argument is ignored, and you must instead specify the number of workers in future::plan().

future::plan(multisession, workers = 2)
make(my_plan, parallelism = "future_lapply_staged")

To deploy to a cluster (say, a SLURM cluster), you need the batchtools and future.batchtools packages.

library(future.batchtools)

You also need template file to configure batchtools with the cluster: memory specifications, wall time limits etc. Use drake_hpc_template_file() to write one of the available example template files for batchtools (list with drake_hpc_template_files()). You will probably need to edit your tempalte file manually to match your resources and needs.

drake_hpc_tempalte_file("slurm_batchtools.tmpl") # Write the file slurm_batchtools.tmpl.

Load the template file your future::plan() and call make() to run the project.

future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")
make(my_plan, parallelism = "future", jobs = 2)
# Same technology, different scheduling options:
make(my_plan, parallelism = "future_lapply", jobs = 2)

And as before, "future_lapply_staged" uses the workers argument from future::plan() rather than jobs in make().

future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl", workers = 2)
make(my_plan, parallelism = "future_lapply_staged")

See packages future, future.batchtools, and batchtools for more options. For example, the alternatives for future::plan() are listed here and here.

10.5.3 "Makefile"

Here, drake actually writes, configures, and runs a proper Makefile to run the targets.

make(my_plan, parallelism = "Makefile", jobs = 2)

You can configure both the Unix command that runs the Makefile and the command line arguments passed to it.

make(
  my_plan,
  parallelism = "Makefile",
  command = "lsmake",
  args = c("--touch", "--silent")
)

If drake’s Makefile conflicts with a Makefile you already wrote yourself, drake does not overwrite your Makefile. Instead, make() tells you about the conflict and then stops running. To force drake to use a different Makefile that does not conflict with yours, pass the file path to the makefile_path argument and set the --file argument in args.

make(
  my_plan,
  parallelism = "Makefile",
  makefile_path = "my_folder/my_makefile",
  args = "--file=my_folder/my_makefile"
)

There are more customization options in make(), such as the recipe_command argument.

make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -e 'R_RECIPE' -q")

See the help files of individual functions for details.

default_Makefile_command()
## [1] "make"

default_recipe_command()
## [1] "Rscript -e 'R_RECIPE'"

r_recipe_wildcard()
## [1] "R_RECIPE"

Makefile_recipe(
  recipe_command = "R -e 'R_RECIPE' -q",
  target = "this_target",
  cache_path = "custom_cache"
)
## R -e 'drake::mk(target = "this_target", cache_path = "custom_cache")' -q

To deploy workers to a cluster, you need to supply the Makefile with a custom shell script that launches cluster jobs. Use the shell_file() function to write an example compatible with the Univa Grid Engine. You will probably need to configure it manually. Suppose our file is shell.sh.

#!/bin/bash
shift
echo "module load R; $*" | qsub -sync y -cwd -j y

You will need to set permissions to allow execution. In the Linux command line, this is straightforward.

$ chmod +x shell.sh 

When you actually call make(), use the prepend argument to write a line at the top of the Makefile to reference your shell file.

make(my_plan, parallelism = "Makefile", jobs = 2, prepend = "SHELL=./shell.sh")

SLURM users may be able to invoke srun and dispense with shell.sh altogether, though success may vary depending on the SLURM system. You will probably also need to set resource allocation parameters governing memory, runtime, etc. See man srun for the possible .SHELLFLAGS.

make(
  my_plan,
  parallelism = "Makefile",
  jobs = 2,
  prepend = c(
    "SHELL=srun",
    ".SHELLFLAGS=-N1 -n1 bash -c"
  )
)

10.6 Scheduling algorithms

10.6.1 Persistent scheduling

Backends "mclapply", "parLapply", "clustermq", and "future_lapply" launch persistent workers.

make(my_plan, parallelism = "mclapply", jobs = 2)
make(my_plan, parallelism = "parLapply", jobs = 2)
options(clustermq.scheduler = "slurm")
make(my_plan, parallelism = "clustermq", jobs = 2)
future::plan(future::multisession)
make(my_plan, parallelism = "future_lapply", jobs = 2)

In each of these calls to make(), three processes launch: two workers and one master. Whenever a worker is idle, the master assigns it the next available target (whose dependencies have been built). The workers keep running until there are no more targets to build. The following video demonstrates the concept.

For staged scheduling, you can micromanage which workers can run which targets. This column can be an integer vector or a list of integer vectors. Simply set an optional workers column in your drake_plan(). Why would you wish to do this? Consider the mtcars example.

load_mtcars_example()
my_plan$workers <- 1
my_plan$workers[grepl("large", my_plan$target)] <- 2
my_plan
## # A tibble: 15 x 3
##    target           command                                        workers
##    <chr>            <chr>                                            <dbl>
##  1 report           "knit(knitr_in(\"report.Rmd\"), file_out(\"re…       1
##  2 small            simulate(48)                                         1
##  3 large            simulate(64)                                         2
##  4 regression1_sma… reg1(small)                                          1
##  5 regression1_lar… reg1(large)                                          2
##  6 regression2_sma… reg2(small)                                          1
##  7 regression2_lar… reg2(large)                                          2
##  8 summ_regression… suppressWarnings(summary(regression1_small$re…       1
##  9 summ_regression… suppressWarnings(summary(regression1_large$re…       2
## 10 summ_regression… suppressWarnings(summary(regression2_small$re…       1
## 11 summ_regression… suppressWarnings(summary(regression2_large$re…       2
## 12 coef_regression… suppressWarnings(summary(regression1_small))$…       1
## 13 coef_regression… suppressWarnings(summary(regression1_large))$…       2
## 14 coef_regression… suppressWarnings(summary(regression2_small))$…       1
## 15 coef_regression… suppressWarnings(summary(regression2_large))$…       2

Here, one of the workers is in charge of all the targets that have to do with the large dataset. That way, we do not need other workers to read large from disk. If reads from disk take a long time, this could speed up your workflow. On the other hand, delegating all the large targets to worker 2 could prevent worker 1 from sharing the computational load, which could slow things down. Ultimately, you as the user need to make these tradeoffs. Also, the workers column only applies to the persistent scheduling backends.

Similarly, you can set an optional priority column for your drake_plan().

plan <- drake_plan(A = build(), B = stuff())
plan$priority <- c(1, 2)
plan
## # A tibble: 2 x 3
##   target command priority
##   <chr>  <chr>      <dbl>
## 1 A      build()        1
## 2 B      stuff()        2

Because of the priority column, if targets A and B are both ready to build, then A will be assigned to a worker first. Custom priorities apply to the staged scheduling backends, plus the "future" backend.

The predict_runtime() and predict_load_balancing() functions emulate persistent workers, and the predictions also apply to transient workers. See the timing guide for a demonstration. These functions also respond to the workers column.

10.6.2 Transient scheduling

Persistent workers are great because they minimize overhead: all the workers are created at the beginning, and then you never have to create any more for the rest of the runthrough. Unfortunately, computing clusters usually limit the amount of time each worker can stay running. That is why drake also supports transient workers in backends "future" and "Makefile". Here, the master process creates a new worker for each target individually, and the worker dies after it finishes its single target. For the "future" backend, the master is just the existing process calling make(). The following video demonstrates the concept.


future::plan(future::multisession)
make(my_plan, parallelism = "future", jobs = 2)
make(my_plan, parallelism = "Makefile", jobs = 2)

10.6.3 Staged scheduling

Backends "mclapply_staged", "parLapply_staged", and "clustermq_staged" support staged scheduling.

make(my_plan, parallelism = "mclapply_staged", jobs = 2)
make(my_plan, parallelism = "parLapply_staged", jobs = 2)

Here, the dependency network is divided into separate stages of conditionally independent targets. Within each stage, drake uses mclapply() or parLapply() to process the targets in parallel. Stages run one after the other, so the slowest target in the current stage needs to complete before the next stage begins. So we lose a lot of parallel efficiency. The following video demonstrates the major drawback.[1]

However, because there is no formal master process in each stage, overhead is extremely low. This lack of overhead can make staged parallelism a great choice for projects with a small number of large stages: tall dependency graphs with most of the work in the tallest stages.

library(dplyr)
library(drake)

N <- 500

gen_data <- function() {
  tibble(a = seq_len(N), b = 1, c = 2, d = 3)
}

plan_data <- drake_plan(
  data = gen_data()
)

plan_sub <-
  gen_data() %>%
  transmute(
    target = paste0("data", a),
    command = paste0("data[", a, ", ]")
  )

plan <- bind_rows(plan_data, plan_sub)
plan
## # A tibble: 501 x 2
##    target command   
##    <chr>  <chr>     
##  1 data   gen_data()
##  2 data1  data[1, ] 
##  3 data2  data[2, ] 
##  4 data3  data[3, ] 
##  5 data4  data[4, ] 
##  6 data5  data[5, ] 
##  7 data6  data[6, ] 
##  8 data7  data[7, ] 
##  9 data8  data[8, ] 
## 10 data9  data[9, ] 
## # ... with 491 more rows

config <- drake_config(plan)
vis_drake_graph(config)

10.7 Final thoughts

10.7.1 Debugging

For large workflows, downsizing and debugging tools become super important. See the guide to debugging and testing drake projects for help on diagnosing problems with a workflow.

10.7.2 drake as an ordinary job scheduler

If you do not care about reproducibility and you want drake to be an ordinary job scheduler, consider using alternative triggers (see ?trigger).

load_mtcars_example()
make(my_plan, trigger = trigger(condition = TRUE))

Above, drake only builds the missing targets. This skips much of the time-consuming hashing that ordinarily detects which targets are out of date.

10.7.3 More resources

See the timing guide for explanations of functions predict_runtime() and predict_load_balancing(), which can help you plan and strategize deployment.

10.8 Footnotes

[1] The video of staged parallelism is an oversimplification. It holds mostly true for make(parallelism = "parLapply_staged"), but make(parallelism = "mclapply_staged") is a bit different. In the former case, each stage is a call to parLapply(), which recycles existing workers on a pre-built parallel socket (PSOCK) cluster. But in the latter, every stage is a new call to mclapply(), which launches a brand new batch of workers. In that sense, workers in make(parallelism = "parLapply_staged") are sort of persistent, and workers in make(parallelism = "mclapply_staged") are sort of transient for some projects.

Copyright Eli Lilly and Company