Targeting database tables in workflows


My work on the Department of Ecology’s Safety of Oil Transportation Act risk model has been an opportunity to explore some of the newer tools available in R for reproducible workflows. Taking the time to learn and implement these tools has been incredibly helpful, both because the model requirements were still being nailed down while I was developing it (so I needed to be able to easily re-run things and identify changes to results) and because the sheer volume of data requires parallel processing to achieve feasible run times. I identified the targets package as an excellent tool to meet both of these requirements, as it not only provides a framework for running and tracking analysis pipelines (which I use for ETL procedures and scheduling model runs) but also allows us to seamlessly switch to parallel approaches using future and backends such as future.callr or future.batchtools.

There’s pretty good documentation available for targets, and a number of ways you can organize a workflow. Our use case with the risk model is a little more complicated than most of the examples in the targets documentation: we have organized the model into a collection of R packages (some of which import one another), and several workflows that use one or more of these packages. We also use the renv package to manage workflow dependencies (including the risk model packages). While it is typical for targets pipelines to source one or more R files that define functions used by the pipeline, I’ve taken it a step further and defined high-level “workflow” functions that directly correspond to specific targets in my pipelines. I found that compartmentalizing my analysis in this way was the easiest approach for developing and debugging pipelines, and for porting functionality to other pipelines as we finalize the model for the production environment.

A directory structure for our workflows looks something like this:

- project_dir
    - package 1
    - package 2
    - ...
    - workflows
        - workflow 1
            - R
                - functions.r
                - ...
            - _targets.r
        - workflow 2
            - R
                - functions.r
                - ...
            - _targets.r
        - ...
    - renv
Here the “package 1”, “package 2”, etc. folders are actually git submodules pointing to separate repositories. The renv folder defines the package versions used by the workflows and the depends/imports/suggests used by the various packages, ensuring consistent behavior across the workflows. Technically you don’t need the submodules at all (you could just install the packages into the project environment using renv::install() and the repository URL), but I found myself developing the packages and workflows side by side, so it was convenient to have everything in one place and tell renv to install from the package folders (e.g., renv::install("./package 1")).
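
For example, registering the local packages with renv could look something like this (the folder names are just the placeholders from the structure above):

# install the model packages from their local (submodule) folders
renv::install("./package 1")
renv::install("./package 2")

# record the installed versions in renv.lock so every workflow
# restores the same environment
renv::snapshot()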

Pipelines run by targets create a variety of intermediate files that capture target outputs and track whether they are complete and up to date. This works fine for smaller projects, but we store model inputs and outputs in a series of Microsoft SQL Server databases. Furthermore, the organizational structure of the database can be pretty different from how we would organize the data as R objects, so some of our targets need to write multiple tables to the database. This presents a problem, since targets needs to check the return value of each target and produce a hash. My solution was to create a function that generates a checksum of a database table (or a list of checksums for a list of tables):

return_checksum = function(conn, name) {
  # assuming conn is a DBI connection to a MS SQL Server database
  query = paste0("SELECT N'", name, "' AS name, ",
    "CHECKSUM_AGG(BINARY_CHECKSUM(*)) AS checksum ",
    "FROM ", name)
  DBI::dbGetQuery(conn, query)
}
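
For targets that write several tables, a small wrapper along these lines (a sketch, not part of the model code itself) collects the checksums into a single data frame:

# compute a checksum for each table name and bind the results together
return_checksums = function(conn, names) {
  do.call(rbind, lapply(names, function(x) return_checksum(conn, x)))
}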

After a target writes a table to the database, it retrieves the checksum and returns that instead. In this way, each target still returns an R object that can be validated and tracked for changes. Downstream targets that use those tables as inputs will need to query them from the database again, but to me that’s a fair tradeoff for not having to store local copies of database tables. For added safety, you could even have a target start by recomputing the checksum(s) of its input table(s) and comparing them to the checksums supplied by the upstream target, in order to identify changes made outside of the pipeline (and maybe throw an error).
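
As a rough illustration, a workflow function built this way might look like the sketch below; the function name, table name, and connection details (the odbc DSN) are all hypothetical:

# hypothetical workflow function: write the traffic table for a simulation
# and return its checksum so targets has something to hash and track
workflow_traffic = function(traffic_data, schema = "dbo") {
  conn = DBI::dbConnect(odbc::odbc(), dsn = "RiskModelDB")  # assumed DSN
  on.exit(DBI::dbDisconnect(conn))
  DBI::dbWriteTable(conn, DBI::Id(schema = schema, table = "traffic_table"),
    traffic_data, overwrite = TRUE)
  # return the checksum rather than the data itself
  return_checksum(conn, paste0(schema, ".traffic_table"))
}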

A single model run is actually a series of modules (mostly) run in sequence, and each module produces multiple tables that may or may not be used as inputs to the next module. Given that we are running tens of thousands of simulations, we needed a way to organize the outputs. We decided to give each model run its own unique schema in the output database, so that we could (1) easily filter out (or blow away) individual model simulations and (2) use consistent table names for the various outputs, making it easy to summarize results across simulations. The schema name acted as the unique identifier for the model realization. At the same time, we wanted to avoid any manual accounting of which modules had been run or still needed to be run for a given simulation. We found that System Catalog Views provided a good way to identify simulation status. For example, to identify whether drift simulations had been completed for the available model runs, I created a view that reports the creation date of the output table produced by the drift module:

CREATE VIEW dbo.vwModelRuns AS (
  SELECT 
      traffic.[TABLE_SCHEMA] AS SimulationID
     ,traffic.[create_date] AS TrafficRunDateTime
     ,drift.[create_date] AS DriftRunDateTime
  FROM ( 
        SELECT
            SCHEMA_NAME(SCHEMA_ID) AS TABLE_SCHEMA 
           ,[create_date]
        FROM sys.tables
        WHERE [name] LIKE 'traffic_table'
  ) traffic
  LEFT JOIN (
        SELECT
            SCHEMA_NAME(SCHEMA_ID) AS TABLE_SCHEMA 
           ,[create_date]
        FROM sys.tables
        WHERE [name] LIKE 'drift_table'
  ) drift
  ON traffic.[TABLE_SCHEMA] = drift.[TABLE_SCHEMA]
)

I could then use this view as the initial input to my targets workflow to identify newly added simulations that still needed the drift simulation, and then use tarchetypes::tar_group_by() to branch across simulations (schemas) and run the drift module:

library(targets)
library(tarchetypes)

list(
  # identify new simulations
  tar_target(all_simulations,
    workflow_get_simulations(new_only = TRUE),
    cue = tar_cue(mode = "always")
  ),
  # group simulations by ID so each branch handles one simulation
  tar_group_by(simulation_groups, all_simulations, SimulationID),
  # simulate drift and grounding
  tar_target(drift_results,
    workflow_drift(simulation_groups),
    pattern = map(simulation_groups),
    error = "abridge")
)

The function workflow_get_simulations() really just retrieves the contents of the “dbo.vwModelRuns” view; the optional argument new_only tells it to only return simulations that haven’t been run yet, i.e., rows of dbo.vwModelRuns that have a NULL value for the “DriftRunDateTime” field. This lets me avoid the overhead of dealing with potentially hundreds of thousands of targets every time I rerun the pipeline, but I can set it to FALSE if I ever need to recheck targets.
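
A minimal sketch of what that function could look like, assuming an odbc connection (the DSN is hypothetical; the column name comes from dbo.vwModelRuns above):

# rough sketch of workflow_get_simulations()
workflow_get_simulations = function(new_only = TRUE) {
  conn = DBI::dbConnect(odbc::odbc(), dsn = "RiskModelDB")  # assumed DSN
  on.exit(DBI::dbDisconnect(conn))
  simulations = DBI::dbGetQuery(conn, "SELECT * FROM dbo.vwModelRuns")
  if (new_only) {
    # keep only simulations the drift module has not processed yet
    simulations = simulations[is.na(simulations$DriftRunDateTime), ]
  }
  simulations
}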

The methodology worked out quite well, and I was able to steadily produce model output and adjust the load balance whenever necessary to accommodate other analyses running concurrently. I suspect I’ll be using targets a lot in the future, for projects big and small.

