---
title: "Stopping Jobs"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Stopping Jobs}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

## Introduction

You can save compute resources by stopping the execution of Jobs that are no longer needed. If that Job is currently running on a Worker, then the associated background process is terminated. Terminated background process are automatically replaced by new ones.

There are several ways to stop a Job:

* `<Job>$stop()`: return an interrupt condition.
* `<Job>$output<-`: return a custom result.
* `<Job>$stop_id`: replace redundant Jobs.
* `<Job>$copy_id`: combine redundant Jobs.
* `<Job>$timeout`: cap a Job's compute time.


## Setup

```r
library(jobqueue)
q <- Queue$new(workers = 1)
```


## Immediate Stop

As soon as a Job is created, it can be stopped by either calling `<Job>$stop()` or 
by assigning to `<Job>$output`. These methods work up until the Job state is `'done'`. 
Once the Job state is `'done'`, `<Job>$output` is immutable and calling `<Job>$stop()` 
will have no effect (since it's already stopped). Calling `<Job>$stop()` and assigning 
to `<Job>$output` both have the side effect of setting the Job state to `'done'`.


### Interrupting with `<Job>$stop()`

```r
job <- Job$new({ Sys.sleep(5); 10 })
job$stop()
job$result
#> <interrupt: job stopped by user>

job <- Job$new({ Sys.sleep(5); 10 })
job$stop('my reason')
job$result
#> <interrupt: my reason>

class(job$result)
#> [1] "interrupt" "condition"
```

You can call `<Job>$stop()` from within callback hooks.

```r
hooks <- list(
  'created' = function (job) {
    if (length(job$vars$x) > 100)
      job$stop('Job is too large.')
  }
)

job <- q$run({ sum(x) }, vars = list(x = 1:200), hooks = hooks)
job$result
#> <interrupt: Job is too large.>
```


### Assigning to `<Job>$output`

`<Job>$output` can only be assigned to once. If the Job is not yet done when 
`<Job>$output` is assigned, then any ongoing processing will be halted.

```r
job <- Job$new({ Sys.sleep(5); 10 })

job$state
#> [1] "created"

job$output <- "Custom result"

job$state
#> [1] "done"

job$result
#> [1] "Custom result"
```

If you've set `<Job>$reformat`, it will determine `<Job>$result` as usual.
See `vignette('results')`.


## Replacing and Combining

Sometimes repeat jobs may be sent to the Queue, for instance, a user clicking a 
'submit' button repeatedly. `stop_id` and `copy_id` can prevent such requests from 
overloading the server.


### Stop ID

New jobs will replace existing jobs with the same `stop_id`.

```r
job1 <- q$run({ Sys.sleep(5); 'A' }, stop_id = 123)
job2 <- q$run({ 'B' },               stop_id = 123)
job1$result
#> <interrupt: duplicated stop_id>
job2$result
#> [1] "B"
```

### Copy ID

New jobs will mirror the output of existing jobs with the same `copy_id`.

```r
job1 <- q$run({ Sys.sleep(5); 'A' }, copy_id = 456)
job2 <- q$run({ 'B' },               copy_id = 456)
job1$result
#> [1] "A"
job2$result
#> [1] "A"
```

### In `Queue$new()`

If you set `stop_id` and/or `copy_id` in `Queue$new()`, then they must be functions 
(or lambda notation functions) that accept a Job object as input and returns a value 
`id`. If `id` is `NULL`, then no comparisons will be done. Otherwise, `id` can be any
data type and will be compared to other `id`s with `identical()`.

```r
q <- Queue$new(stop_id = function (job) list(job$vars$f, job$user_id))

job1 <- q$run({ f(x) }, vars = list(f = sum,  x = 1:10), user_id = 'A')
job2 <- q$run({ f(x) }, vars = list(f = sum,  x = 1:2),  user_id = 'A')
job3 <- q$run({ f(x) }, vars = list(f = mean, x = 1:10), user_id = 'A')

job1$result
#> <interrupt: duplicated stop_id>
job2$result
#> [1] 3
job3$result
#> [1] 5.5
```



## Timeouts

```r
job <- Job$new({ Sys.sleep(5); 'Zzzzz' }, timeout = 0.2)
q$submit(job)
job$result
#> <interrupt: total runtime exceeded 0.2 seconds>

job <- Job$new({ 10 }, timeout = list(created = 0.2))
job$result
#> <interrupt: exceeded 0.2 seconds while in "created" state>
```

Limits (in seconds) can be set on:

* The total `'submitted'` to `'done'` time: `timeout = 2`
* On a per-state basis: `timeout = list(queued = 1, running = 2)`
* Or both: `timeout = list(total = 3, queued = 2, running = 2)`