---
title: "Directed Acyclic Graphs (DAGs)"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Directed Acyclic Graphs (DAGs)}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
```{r echo=FALSE}
knitr::opts_chunk$set(
  collapse = FALSE,
  comment = "",
  out.width = "100%",
  cache = FALSE,
  asciicast_knitr_output = "html"
)

asciicast::init_knitr_engine(
  echo = TRUE,
  echo_input = FALSE,
  same_process = TRUE,
  startup = quote({
    library(maestro)
    set.seed(1) 
  })
)
options(asciicast_theme = "pkgdown")
```


A [directed acyclic graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph), or DAG, is a kind of network graph in which nodes are connected by directed edges and no path of edges ever loops back to the node it started from. Most data orchestration tools lay out a data pipeline as a DAG, passing data from one function to the next until it reaches the end. This encourages more modular, single-purpose functions and can make it easier to identify where errors occur.
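
To make the "acyclic" part concrete, here is a small base R sketch (not part of maestro) that stores a DAG as a named list of downstream nodes and uses Kahn's algorithm to find an order in which every node runs after all of its inputs. If the graph contains a cycle, no such order exists and the function stops with an error. The names `dag` and `topo_order()` are hypothetical and chosen only for illustration.

```{r eval=FALSE}
# A DAG as a named list: each element lists the downstream nodes of that node.
# Same shape as the pipelines used below: start feeds high_road and low_road.
dag <- list(
  start = c("high_road", "low_road"),
  high_road = character(0),
  low_road = character(0)
)

# Kahn's algorithm: repeatedly take nodes that have no remaining upstream edges.
# Assumes every downstream node also appears as a name in `graph`.
topo_order <- function(graph) {
  nodes <- names(graph)
  # Count the incoming edges of every node
  in_degree <- setNames(integer(length(nodes)), nodes)
  for (targets in graph) {
    for (t in targets) in_degree[t] <- in_degree[t] + 1L
  }
  queue <- nodes[in_degree == 0L]
  ordered <- character(0)
  while (length(queue) > 0) {
    node <- queue[1]
    queue <- queue[-1]
    ordered <- c(ordered, node)
    # Removing `node` frees any downstream node whose inputs are now all done
    for (t in graph[[node]]) {
      in_degree[t] <- in_degree[t] - 1L
      if (in_degree[t] == 0L) queue <- c(queue, t)
    }
  }
  # Nodes left over are stuck on a cycle
  if (length(ordered) < length(nodes)) stop("graph contains a cycle")
  ordered
}

topo_order(dag)
# returns c("start", "high_road", "low_road")
```

Calling `topo_order(list(a = "b", b = "a"))` fails, because `a` and `b` each wait on the other and neither can ever run first.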

You can create DAG pipelines in maestro using the `maestroInputs` and/or `maestroOutputs` tags. Let's see this in action.

```{r echo=FALSE, warning=FALSE, message=FALSE}
dir.create("pipelines")
writeLines(
  "
  #' @maestroOutputs high_road low_road
  start <- function() {
    c('a', 'A')
  }
  
  #' @maestroInputs start
  high_road <- function(.input) {
    toupper(.input)
  }
  
  #' @maestroInputs start
  low_road <- function(.input) {
    tolower(.input)
  }",
  con = "pipelines/dags.R"
)
```

We'll create three simple pipelines: `start` outputs a character vector, `high_road` converts its input to uppercase, and `low_road` converts its input to lowercase. We use the `maestroOutputs` tag to name the downstream pipelines (i.e., the pipelines that take the tagged pipeline's output as their input), and the `maestroInputs` tag to name the upstream pipelines whose output the tagged pipeline receives as input.[^1]

[^1]: Specifying both the outputs and the inputs is redundant. You can specify just the outputs or just the inputs if you like, but make sure every pipeline is identified as a maestro pipeline by including at least one maestro tag (the catch-all `@maestro` tag works for this).

Note the `.input` parameter on every pipeline that receives an input. This parameter is what allows data to be passed from an upstream pipeline to its downstream pipelines, and it must be named exactly `.input`.
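
Conceptually, `.input` behaves like an ordinary function argument: the value returned by the upstream pipeline is supplied as `.input` to each downstream pipeline. The base R sketch below wires plain versions of the three functions together by hand to illustrate the idea; it is a simplification for intuition, not maestro's actual execution code, and `start_output` and `results` are hypothetical names.

```{r eval=FALSE}
# Plain R versions of the three pipelines (maestro tags omitted because
# this sketch calls the functions directly instead of scheduling them)
start <- function() c("a", "A")
high_road <- function(.input) toupper(.input)
low_road <- function(.input) tolower(.input)

# Run the upstream function once...
start_output <- start()

# ...and hand its result to each downstream function through `.input`
results <- list(
  high_road = high_road(.input = start_output),
  low_road = low_road(.input = start_output)
)
results
```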

```{r eval=FALSE}
# ./pipelines/dags.R
#' @maestroOutputs high_road low_road
start <- function() {
  c('a', 'A')
}

#' @maestroInputs start
high_road <- function(.input) {
  toupper(.input)
}

#' @maestroInputs start
low_road <- function(.input) {
  tolower(.input)
}
```
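
Since declaring both inputs and outputs is redundant, the same file could rely on `@maestroInputs` alone. Following the footnote's advice, `start` then needs the catch-all `@maestro` tag so it is still recognized as a maestro pipeline. A sketch of that inputs-only variant, based on the footnote's description rather than taken from the package documentation:

```{r eval=FALSE}
# ./pipelines/dags.R (inputs-only variant)
# `start` has no other maestro tag, so the catch-all @maestro marks it
#' @maestro
start <- function() {
  c('a', 'A')
}

# Each downstream pipeline names its upstream input
#' @maestroInputs start
high_road <- function(.input) {
  toupper(.input)
}

#' @maestroInputs start
low_road <- function(.input) {
  tolower(.input)
}
```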

Now we'll create and run the schedule. Notice that the console output reflects the network structure of the DAG, and that `get_artifacts()` retrieves the values returned by the pipelines.

```{asciicast}
# ./orchestrator.R
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  run_all = TRUE
)

get_artifacts(schedule)
```

## ETL Example

ETL/ELT pipelines are a natural fit for DAGs: each of the extract, transform, and load steps can be its own node in the DAG. Consider the example from the home page:

```{r eval=FALSE}
#' Example ETL pipeline
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-03-25 12:30:00
my_etl <- function() {
  
  # Pretend we're getting data from a source
  message("Get data")
  extracted <- mtcars
  
  # Transform
  message("Transforming")
  transformed <- extracted |> 
    dplyr::mutate(hp_deviation = hp - mean(hp))
  
  # Load - write to a location
  message("Writing")
  write.csv(transformed, file = paste0("transformed_mtcars_", Sys.Date(), ".csv"))
}
```

It's pretty concise, so we probably wouldn't bother breaking it apart in practice, but let's do it for illustrative purposes (and also get rid of the messaging).

```{r echo=FALSE, warning=FALSE, message=FALSE}
file.remove("pipelines/dags.R")
dir.create("pipelines")
writeLines(
  "
  #' @maestroFrequency 1 day
  #' @maestroStartTime 2024-03-25 12:30:00
  #' @maestroOutputs transform
  extract <- function() {
    # Imagine this is something way more complicated, like a database call
    mtcars
  }
  
  #' @maestroOutputs load
  transform <- function(.input) {
    .input |> 
      dplyr::mutate(hp_deviation = hp - mean(hp))
  }
  
  #' @maestro
  load <- function(.input) {
    write.csv(.input, file = 'transformed_mtcars.csv')
  }",
  con = "pipelines/etl.R"
)
```

```{r eval=FALSE}
# ./pipelines/etl.R
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-03-25 12:30:00
#' @maestroOutputs transform
extract <- function() {
  # Imagine this is something way more complicated, like a database call
  mtcars
}

#' @maestroOutputs load
transform <- function(.input) {
  .input |> 
    dplyr::mutate(hp_deviation = hp - mean(hp))
}

#' @maestro
load <- function(.input) {
  write.csv(.input, file = "transformed_mtcars.csv")
}
```
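
Because each step is now an ordinary function, you can check the split interactively before handing it to maestro by calling the steps in sequence and passing each result along as `.input`. The sketch below is self-contained and makes a few assumptions for illustration: it uses the hypothetical names `extract_step()`, `transform_step()`, and `load_step()` (partly because a global function named `transform` masks base R's `transform()`), swaps `dplyr::mutate()` for base R's `within()` so it runs without dplyr, and gives `load_step()` a `path` argument so the CSV lands in a temporary file.

```{r eval=FALSE}
# Hypothetical step names that mirror extract/transform/load above
extract_step <- function() {
  mtcars
}

transform_step <- function(.input) {
  # Base R equivalent of dplyr::mutate(hp_deviation = hp - mean(hp))
  within(.input, hp_deviation <- hp - mean(hp))
}

load_step <- function(.input, path) {
  write.csv(.input, file = path)
  invisible(path)
}

# Chain the steps by hand: each output becomes the next step's .input
out_path <- tempfile(fileext = ".csv")
extracted <- extract_step()
transformed <- transform_step(.input = extracted)
load_step(.input = transformed, path = out_path)

# Read the file back; the first column holds the car names written as row names
head(read.csv(out_path, row.names = 1))
```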

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  run_all = TRUE
)
```

When developing these pipelines, it is helpful to visualize the dependency structure. We can do this by calling `show_network()` on the schedule:

```{r echo=FALSE, warning=FALSE, message=FALSE}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

show_network(schedule)
```

```{r cleanup, echo=FALSE, message=FALSE, warning=FALSE}
unlink("pipelines", recursive = TRUE)
unlink("transformed_mtcars.csv")
```