My work on the Department of Ecology’s
Safety of Oil Transportation Act risk model
has been an opportunity for me to explore some of the newer tools
available in R for reproducible workflows. Taking the time to learn and
implement these tools has been incredibly helpful, both because the
model requirements were still being nailed down while I was developing
it (and thus I needed to be able to easily re-run things and identify
changes to results) and because the sheer volume of data requires
parallel processing to achieve feasible run times. I identified the
targets
package as
an excellent tool for meeting both of these requirements, as it not
only provides a framework for running and tracking analysis pipelines
(which I use for
ETL
procedures and scheduling model runs) but also allows us to seamlessly
switch to parallel approaches using
future
and backends such as
future.callr
or
future.batchtools
.
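To give a sense of what that switch looks like, here's a minimal sketch (the worker count is arbitrary and the batchtools template file is just a placeholder): the future plan is declared in _targets.r, and the pipeline is built with tar_make_future() instead of tar_make().

# in _targets.r: declare the parallel backend
future::plan(future.callr::callr)
# or, to submit targets as cluster jobs (template file is a placeholder):
# future::plan(future.batchtools::batchtools_slurm, template = "slurm.tmpl")

# from the R console: build the pipeline using parallel workers
targets::tar_make_future(workers = 4)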
There’s
pretty good documentation
available for targets, and a number of ways you can
organize a workflow. Our use case with the risk model is
a little more complicated than most of the examples in the targets
documentation: we have organized the model into a collection of R
packages (some of which import others in the collection), and several workflows that
use one or more of these packages. We also use the
renv
package to
manage workflow dependencies (including the risk model packages). While
it is typical for targets
pipelines to source one or more R files
that define functions used by the pipeline, I’ve taken it a step
further and defined high-level “workflow” functions that directly
correspond to specific targets in my pipelines. I found that
compartmentalizing my analysis in this way was the easiest approach for
me to develop and debug pipelines, and to port functionality to other
pipelines as we finalize the model for the production environment.
A directory structure for our workflows looks something like this:
- project_dir
  - package 1
  - package 2
  - ...
  - workflows
    - workflow 1
      - R
        - functions.r
        - ...
      - _targets.r
    - workflow 2
      - R
        - functions.r
        - ...
      - _targets.r
    - ...
  - renv
Where the “package 1”, “package 2”, etc. folders are actually
git submodules
pointing to separate repositories. The renv
folder defines the
package versions used by the workflows and the depends/imports/suggests
used by the various packages, ensuring consistent behavior across the
workflows. Technically you don’t need to have the submodules at all
(you could just install the packages in the project environment using
renv::install()
and the repository URL), but I found myself developing
the packages and workflows side-by-side, so it was convenient to have
everything in one place and tell renv
to install from the package
folders (e.g., renv::install("./package 1")
).
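For example, bootstrapping the project environment might look something like the following sketch (the folder name is illustrative):

renv::init()                  # initialize the project library and renv.lock
renv::install("./package 1")  # install a model package from its submodule folder
renv::snapshot()              # record the package versions used by the workflows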
Pipelines run by targets
create a variety of intermediate files that
capture target outputs and track whether they are complete and up to date.
This works fine for smaller projects, but we store model inputs and
outputs in a series of Microsoft SQL Server databases. Furthermore,
the organizational structure of the database can be pretty different
from how we would organize the data if we were working purely with R objects,
so some of our targets need to write multiple tables to the database.
This presents a problem, since targets
needs to check the
return value
of each target and produce a hash. My solution was to create a function
that generates a checksum of a database table (or a list of checksums
for a list of tables):
# compute a checksum for a database table, so that a target can return a
# small R object instead of the table itself (requires the DBI package)
return_checksum = function(conn, name) {
  # assuming conn is a connection to a MS SQL Server database
  query = paste0("SELECT N'", name, "' AS name, ",
    "CHECKSUM_AGG(BINARY_CHECKSUM(*)) AS checksum ",
    "FROM ", name)
  DBI::dbGetQuery(conn, query)
}
After a target writes a table to the database, it retrieves the checksum and returns that instead. In this way, each target still returns an R object that can be validated and tracked for changes. Downstream targets that use those tables as inputs will need to query them from the database again, but to me that’s a fair tradeoff for not having to store local copies of database tables. For added safety, you could even have a target start by recomputing the checksum(s) of its input table(s) and comparing them to the supplied values in order to identify changes made outside of the pipeline (and maybe throw an error).
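As a rough illustration of that pattern (the workflow_connect() helper and the table handling here are hypothetical), a workflow function might write its tables and then hand the checksums back to targets:

# hypothetical sketch: write a named list of result data frames to the
# database and return their checksums for targets to hash and track
workflow_write_results = function(results) {
  conn = workflow_connect()  # hypothetical helper that opens a DBI connection
  on.exit(DBI::dbDisconnect(conn))
  for (name in names(results)) {
    DBI::dbWriteTable(conn, name, results[[name]], overwrite = TRUE)
  }
  # one row of (name, checksum) per table written
  do.call(rbind, lapply(names(results), function(name) {
    return_checksum(conn, name)
  }))
}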
A single model run is actually a series of modules (mostly) run in sequence, and each module produces multiple tables that may or may not be used as inputs to the next module. Given that we are running tens of thousands of simulations, we needed a way to organize the outputs. We decided on giving each model run its own unique schema in the output database, so that we could (1) easily filter out (or blow away) individual model simulations, but also (2) use consistent names for the various outputs so that it would be easy to summarize results across simulations. The schema name acted as the unique identifier for the model realization. At the same time, we wanted to avoid having to do any manual accounting of which modules had been run or still needed to be run for a given simulation. We found that System Catalog Views provided a good way for us to identify simulation status. For example, to identify whether drift simulations had been completed for the available model runs, I created a view that identified the creation date of the output table produced by the drift module:
-- one row per model run (schema): the traffic table creation date, plus the
-- drift table creation date if the drift module has been run
CREATE VIEW dbo.vwModelRuns AS (
  SELECT
    traffic.[TABLE_SCHEMA] AS SimulationID
    ,traffic.[create_date] AS TrafficRunDateTime
    ,drift.[create_date] AS DriftRunDateTime
  FROM (
    -- schemas containing a traffic module output table
    SELECT
      SCHEMA_NAME(SCHEMA_ID) AS TABLE_SCHEMA
      ,[create_date]
    FROM sys.tables
    WHERE [name] LIKE 'traffic_table'
  ) traffic
  LEFT JOIN (
    -- schemas containing a drift module output table
    SELECT
      SCHEMA_NAME(SCHEMA_ID) AS TABLE_SCHEMA
      ,[create_date]
    FROM sys.tables
    WHERE [name] LIKE 'drift_table'
  ) drift
  ON traffic.[TABLE_SCHEMA] = drift.[TABLE_SCHEMA]
)
I could then use this view as the initial input into my targets
workflow to identify newly-added simulations that I needed to run the
drift simulation for, and then use tarchetypes::tar_group_by()
to
branch across simulations (schemas) and run the drift module:
list(
  # identify new simulations
  tar_target(all_simulations,
    workflow_get_simulations(new_only = TRUE),
    cue = tar_cue(mode = "always")
  ),
  # group simulations so that each branch processes one simulation (schema)
  tar_group_by(simulation_groups, all_simulations, SimulationID),
  # simulate drift and grounding, halting new branches if one errors
  tar_target(drift_results,
    workflow_drift(simulation_groups),
    pattern = map(simulation_groups),
    error = "abridge")
)
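For context, a sketch of the kind of preamble that would sit above this target list in _targets.r (the package names passed to tar_option_set() are illustrative):

# _targets.r preamble (sketch)
library(targets)
library(tarchetypes)

# load the high-level workflow functions defined in R/functions.r
tar_source("R")

# packages that each target should have available (names are illustrative)
tar_option_set(packages = c("DBI", "odbc"))

# parallel backend, as discussed above
future::plan(future.callr::callr)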
The function workflow_get_simulations()
really just retrieves the
contents of the “dbo.vwModelRuns” view; an optional new_only argument
restricts the result to simulations that haven’t been run yet, i.e.,
rows of dbo.vwModelRuns that have a NULL value for the
“DriftRunDateTime” field. This lets me
avoid the overhead of dealing with potentially hundreds of thousands
of targets every time I rerun the pipeline, but I can set new_only = FALSE
if I ever need to recheck targets.
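A minimal sketch of what that function might look like (again assuming a hypothetical workflow_connect() helper):

workflow_get_simulations = function(new_only = TRUE) {
  conn = workflow_connect()  # hypothetical helper that opens a DBI connection
  on.exit(DBI::dbDisconnect(conn))
  simulations = DBI::dbGetQuery(conn, "SELECT * FROM dbo.vwModelRuns")
  if (new_only) {
    # drop simulations that the drift module has already processed
    simulations = simulations[is.na(simulations$DriftRunDateTime), ]
  }
  simulations
}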
The methodology worked out quite well, and I was able to steadily produce model output and modify the load balance whenever necessary to accommodate other analyses running concurrently. I suspect I’ll be using targets a lot in the future, for projects big and small.