---
title: "Use Cases"
vignette: >
  %\VignetteIndexEntry{Use Cases}
  %\VignetteEngine{quarto::html}
  %\VignetteEncoding{UTF-8}
---

## Hourly Pipeline

This pipeline is an example of a standard *extract transform load* (ETL) workflow. The pipeline is scheduled to run every **3 hours** starting on **2024-04-25** at **05:45:00**. The pipeline performs the following:

-   access an online-hosted CSV file
-   perform light data wrangling
-   write the result to local storage in Parquet format

This example is set up as a simple sequence of tasks, each creating objects used by the tasks that follow. All components of the pipeline are within the `pipeline_wildfire_hourly` function, which has no parameters.

```{r eval = FALSE, message = FALSE, warning = FALSE}
#' pipeline_wildfire_hourly maestro pipeline
#'
#' @maestroFrequency 3 hour
#' @maestroStartTime 2024-04-25 05:45:00
#' @maestroTz America/Halifax

pipeline_wildfire_hourly <- function() {

  # Access active wildfire data from hosted csv
  df <- readr::read_csv("https://cwfis.cfs.nrcan.gc.ca/downloads/activefires/activefires.csv")
  
  # Data wrangling
  df_geom <- df |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    sf::st_as_sf(coords = c("lon", "lat"), crs = 4326)

  # Write active wildfires to file
  basename <- paste("cdn_wildfire", as.integer(Sys.time()), sep = "_")
  
  df_geom |>
    sfarrow::write_sf_dataset("~/data/wildfires",
                              format = "parquet",
                              basename_template = paste0(basename,
                                                         "-{i}.parquet"))
}
```
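
The `@maestro*` tags in the roxygen header are all maestro needs to schedule this pipeline. As a minimal sketch, assuming this script lives in a `pipelines/` directory and the orchestrator itself is triggered every hour (e.g., by cron), the pipeline could be picked up and run like so:

```{r eval = FALSE}
library(maestro)

# Parse the pipelines/ directory for maestro tags and build the schedule
schedule <- build_schedule(pipeline_dir = "pipelines")

# Run any pipelines that are due, given an orchestrator running hourly
run_schedule(
  schedule,
  orch_frequency = "1 hour"
)
```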

## Daily Pipeline

This pipeline is an example of a standard *extract transform load* (ETL) workflow. The pipeline is scheduled to run every **day** starting on **2024-04-25** at **06:30:00**. The pipeline performs the following:

-   submit a request to an API
-   extract the data from the API response
-   add an insert datetime column
-   write the result to local storage in Parquet format

This example builds an API request with `httr2`, performs it, and pipes the extracted data into additional tasks. All components of the pipeline are within the `pipeline_climate_daily` function, which has no parameters.

```{r eval = FALSE, message = FALSE, warning = FALSE}
#' pipeline_climate_daily maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-04-25 06:30:00
#' @maestroTz America/Halifax

pipeline_climate_daily <- function() {
  
  # Access climate hourly via geomet api 
  hourly_req <- httr2::request("https://api.weather.gc.ca/collections/climate-hourly/items") |> 
    httr2::req_url_query(
      lang = "en-CA",
      offset = 0,
      CLIMATE_IDENTIFIER = 8202251,
      LOCAL_DATE = Sys.Date() - 1
    )
  
  # Perform the request
  hourly_resp <- hourly_req |> 
    httr2::req_perform()
  
  # Climate station response to data frame
  geomet_json <- hourly_resp |> 
    httr2::resp_body_json(simplifyVector = TRUE)
  
  # Write climate hourly to file
  basename <- paste("climate_hourly", as.integer(Sys.time()), sep = "_")

  geomet_json$features |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    arrow::write_dataset(
      "~/data/climate",
      format = "parquet",
      # arrow requires "{i}" in basename_template to number the output files
      basename_template = paste0(basename, "-{i}.parquet")
    )
}
```
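
After either pipeline has run, the Parquet output can be opened back up for analysis. A minimal sketch, assuming the same `~/data/` paths used above, with `sfarrow` restoring the geometry for the wildfire data:

```{r eval = FALSE}
# Open the climate dataset and pull it into memory
climate <- arrow::open_dataset("~/data/climate") |>
  dplyr::collect()

# Open the wildfire dataset and rebuild the sf geometry column
wildfires <- arrow::open_dataset("~/data/wildfires") |>
  sfarrow::read_sf_dataset()
```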