---
title: "AWS Athena Unload"
author: "Dyfan Jones"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{AWS Athena Unload}
  %\VignetteEngine{knitr::rmarkdown}
  %\usepackage[UTF-8]{inputenc}
---

> Writes query results from a `SELECT` statement to the specified data format. Supported formats for `UNLOAD` include `Apache Parquet`, `ORC`, `Apache Avro`, and `JSON`. `CSV` is the only output format used by the `Athena` `SELECT` query, but you can use `UNLOAD` to write the output of a `SELECT` query to the formats that `UNLOAD` supports.
>
> Although you can use the `CTAS` statement to output data in formats other than `CSV`, those statements also require the creation of a table in Athena. The `UNLOAD` statement is useful when you want to output the results of a `SELECT` query in a `non-CSV` format but do not require the associated table. For example, a downstream application might require the results of a `SELECT` query to be in `JSON` format, and `Parquet` or `ORC` might provide a performance advantage over `CSV` if you intend to use the results of the `SELECT` query for additional analysis.
>
> (https://docs.aws.amazon.com/athena/latest/ug/unload.html)

`RAthena v-2.2.0.9000+` can leverage this functionality through the `unload` parameter of `dbGetQuery`, `dbSendQuery`, and `dbExecute`. It offers faster performance for medium to large result sizes.

## Pros and Cons
### **`unload=FALSE`** (Default)

Runs a regular query on `AWS Athena`, then reads the result as `CSV` directly from `AWS S3`.

**PROS:**

  * Faster for small result sizes (less latency).
  * Supports timestamp with time zone.
  * Supports query caching.
  * Can handle some level of nested types.
  
**CONS:**

  * Slower for medium and large result sizes (but still fairly fast).

### **`unload=TRUE`**

Wraps the query in an `UNLOAD` statement, then reads the result as `parquet` directly from `AWS S3`.
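
To make the wrapping concrete, here is a minimal sketch of the kind of statement that results. The helper `wrap_unload()` and the S3 location are hypothetical and only for illustration. This is not `RAthena`'s internal implementation, just the general shape of the Athena `UNLOAD` syntax.

```r
# R
# Hypothetical helper: illustrates the shape of an Athena UNLOAD statement.
# This is NOT RAthena's internal code; the S3 location is a placeholder.
wrap_unload <- function(query, s3_location, format = "PARQUET") {
  sprintf(
    "UNLOAD (%s) TO '%s' WITH (format = '%s')",
    query, s3_location, format
  )
}

sql <- wrap_unload(
  "SELECT * FROM awswrangler_test.noaa",
  "s3://my-bucket/unload/"
)
cat(sql, "\n")
```

With `unload=TRUE` there is no need to write such a statement by hand; the sketch only shows what Athena ends up executing in place of the plain `SELECT`.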

**PROS:**

  * Faster for medium and large result sizes.
  * Can handle some level of nested types.
  * Supports query caching.
  
**CONS:**

  * Does not support timestamp with time zone.
  * Does not support columns with repeated names.
  * Does not support columns with undefined data types.
  * Does not support unnamed columns.
  * Does not preserve `order by` due to multiple parquet files being produced by AWS Athena.
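
Because row order is not preserved, one possible workaround is to sort the result locally after it has been read. The sketch below uses a small stand-in data frame (the values are invented for illustration, not query output) rather than a live `AWS Athena` connection.

```r
# R
# Stand-in for a result returned by an unload query, rows in arbitrary order.
# Values are invented for illustration only.
res <- data.frame(
  id = c("B", "A", "C"),
  value = c(2L, 1L, 3L),
  stringsAsFactors = FALSE
)

# Re-apply the intended ordering locally, after the data has been read.
res_sorted <- res[order(res$id), ]
rownames(res_sorted) <- NULL
res_sorted
```

Sorting locally adds work on the client, so for very large results it may offset part of the speed gained from `unload=TRUE`.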
  
## Performance comparison:

Set up `AWS Athena` table (example taken from [AWS Data Wrangler: Amazon Athena Tutorial](https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/006%20-%20Amazon%20Athena.html)):

```python
# Python
import awswrangler as wr

import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"

if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
);

wr.catalog.table(database="awswrangler_test", table="noaa")
```

Benchmark `unload` method using `RAthena`.

```r
# R
library(DBI)

con <- dbConnect(RAthena::athena())

dbGetQuery(con, "select count(*) as n from awswrangler_test.noaa")
# Info: (Data scanned: 0 Bytes)
#           n
# 1: 29554197

# Query run using CSV output (default, unload = FALSE)
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed
#  57.004   8.430 160.567 

dim(df)
# [1] 29554197        8

# Enable query caching so that a repeated query can reuse the previous result
RAthena::RAthena_options(cache_size = 1)

# Query run using UNLOAD Parquet output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  21.622   2.350  39.232 

dim(df)
# [1] 29554197        8

# Same query repeated: uses the cached UNLOAD Parquet output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
#   user  system elapsed 
# 16.515   2.602  12.670 

dim(df)
# [1] 29554197        8
```

Method|Elapsed time (seconds)
----|----
`unload=FALSE`|160.567
`unload=TRUE`|39.232
Cached `unload=TRUE`|12.670

In this simple benchmark, querying `AWS Athena` with `unload=TRUE` is roughly 4 times faster than the default (39.232 vs 160.567 seconds elapsed), and repeating the query with caching enabled is faster still (12.670 seconds).

**Note:** The benchmark was run on an `AWS SageMaker` `ml.t3.xlarge` instance.
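
The relative speedups can be worked out directly from the elapsed times in the benchmark table. This is a small sketch of the arithmetic; the vector names are chosen here for illustration.

```r
# R
# Elapsed times (seconds) copied from the benchmark table above.
elapsed <- c(
  csv = 160.567,
  unload = 39.232,
  unload_cached = 12.670
)

# Speedup of each method relative to the default CSV method.
speedup <- round(elapsed[["csv"]] / elapsed, 1)
speedup
#>           csv        unload unload_cached
#>           1.0           4.1          12.7
```

These figures come from a single run on one instance type, so they indicate the scale of the difference rather than a guaranteed ratio.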

## Set `unload = TRUE` at package level:

Another way to enable `unload` is through `RAthena_options()`. Calling `RAthena_options(unload=TRUE)` sets `unload` to `TRUE` at the package level, and all `DBI` functionality will use it when applicable.

```r
library(DBI)
library(RAthena)

con <- dbConnect(athena())

RAthena_options(unload = TRUE)

dbi_noaa = dbGetQuery(con, "select * from awswrangler_test.noaa")
```

This also benefits `dplyr` workflows. Once `RAthena_options(unload=TRUE)` is set, all `dplyr` lazy evaluation will use `AWS Athena` `UNLOAD` when results are collected.

```r
library(dplyr)

tbl_noaa = tbl(con, dbplyr::in_schema("awswrangler_test", "noaa"))

tbl_noaa %>% collect()

#> # A tibble: 29,554,197 x 8
#>    id          dt                  element value m_flag q_flag s_flag obs_time
#>    <chr>       <dttm>              <chr>   <int> <chr>  <chr>  <chr>  <chr>   
#>  1 ASN00074198 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  2 ASN00074222 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  3 ASN00074227 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  4 ASN00075001 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  5 ASN00075005 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  6 ASN00075006 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  7 ASN00075011 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  8 ASN00075013 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  9 ASN00075014 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#> 10 ASN00075018 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#> # ... with 29,554,187 more rows

tbl_noaa %>% filter(element == "PRCP") %>% collect()
#> # A tibble: 15,081,580 x 8
#>    id          dt                  element value m_flag q_flag s_flag obs_time
#>    <chr>       <dttm>              <chr>   <int> <chr>  <chr>  <chr>  <chr>   
#>  1 SWE00140492 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  2 SWE00140594 1890-01-06 00:00:00 PRCP        4 NA     NA     E      NA      
#>  3 SWE00140746 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  4 SWE00140828 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  5 SWM00002080 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  6 SWM00002485 1890-01-06 00:00:00 PRCP        1 NA     NA     E      NA      
#>  7 SWM00002584 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  8 TSE00147769 1890-01-06 00:00:00 PRCP       33 NA     NA     E      NA      
#>  9 TSE00147775 1890-01-06 00:00:00 PRCP      150 NA     NA     E      NA      
#> 10 UK000047811 1890-01-06 00:00:00 PRCP       49 NA     NA     E      NA      
#> # ... with 15,081,570 more rows
```