--- title: "Stopping Jobs" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Stopping Jobs} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ## Introduction You can save compute resources by stopping the execution of Jobs that are no longer needed. If that Job is currently running on a Worker, then the associated background process is terminated. Terminated background process are automatically replaced by new ones. There are several ways to stop a Job: * `<Job>$stop()`: return an interrupt condition. * `<Job>$output<-`: return a custom result. * `<Job>$stop_id`: replace redundant Jobs. * `<Job>$copy_id`: combine redundant Jobs. * `<Job>$timeout`: cap a Job's compute time. ## Setup ```r library(jobqueue) q <- Queue$new(workers = 1) ``` ## Immediate Stop As soon as a Job is created, it can be stopped by either calling `<Job>$stop()` or by assigning to `<Job>$output`. These methods work up until the Job state is `'done'`. Once the Job state is `'done'`, `<Job>$output` is immutable and calling `<Job>$stop()` will have no effect (since it's already stopped). Calling `<Job>$stop()` and assigning to `<Job>$output` both have the side effect of setting the Job state to `'done'`. ### Interrupting with `<Job>$stop()` ```r job <- Job$new({ Sys.sleep(5); 10 }) job$stop() job$result #> <interrupt: job stopped by user> job <- Job$new({ Sys.sleep(5); 10 }) job$stop('my reason') job$result #> <interrupt: my reason> class(job$result) #> [1] "interrupt" "condition" ``` You can call `<Job>$stop()` from within callback hooks. ```r hooks <- list( 'created' = function (job) { if (length(job$vars$x) > 100) job$stop('Job is too large.') } ) job <- q$run({ sum(x) }, vars = list(x = 1:200), hooks = hooks) job$result #> <interrupt: Job is too large.> ``` ### Assigning to `<Job>$output` `<Job>$output` can only be assigned to once. If the Job is not yet done when `<Job>$output` is assigned, then any ongoing processing will be halted. ```r job <- Job$new({ Sys.sleep(5); 10 }) job$state #> [1] "created" job$output <- "Custom result" job$state #> [1] "done" job$result #> [1] "Custom result" ``` If you've set `<Job>$reformat`, it will determine `<Job>$result` as usual. See `vignette('results')`. ## Replacing and Combining Sometimes repeat jobs may be sent to the Queue, for instance, a user clicking a 'submit' button repeatedly. `stop_id` and `copy_id` can prevent such requests from overloading the server. ### Stop ID New jobs will replace existing jobs with the same `stop_id`. ```r job1 <- q$run({ Sys.sleep(5); 'A' }, stop_id = 123) job2 <- q$run({ 'B' }, stop_id = 123) job1$result #> <interrupt: duplicated stop_id> job2$result #> [1] "B" ``` ### Copy ID New jobs will mirror the output of existing jobs with the same `copy_id`. ```r job1 <- q$run({ Sys.sleep(5); 'A' }, copy_id = 456) job2 <- q$run({ 'B' }, copy_id = 456) job1$result #> [1] "A" job2$result #> [1] "A" ``` ### In `Queue$new()` If you set `stop_id` and/or `copy_id` in `Queue$new()`, then they must be functions (or lambda notation functions) that accept a Job object as input and returns a value `id`. If `id` is `NULL`, then no comparisons will be done. Otherwise, `id` can be any data type and will be compared to other `id`s with `identical()`. ```r q <- Queue$new(stop_id = function (job) list(job$vars$f, job$user_id)) job1 <- q$run({ f(x) }, vars = list(f = sum, x = 1:10), user_id = 'A') job2 <- q$run({ f(x) }, vars = list(f = sum, x = 1:2), user_id = 'A') job3 <- q$run({ f(x) }, vars = list(f = mean, x = 1:10), user_id = 'A') job1$result #> <interrupt: duplicated stop_id> job2$result #> [1] 3 job3$result #> [1] 5.5 ``` ## Timeouts ```r job <- Job$new({ Sys.sleep(5); 'Zzzzz' }, timeout = 0.2) q$submit(job) job$result #> <interrupt: total runtime exceeded 0.2 seconds> job <- Job$new({ 10 }, timeout = list(created = 0.2)) job$result #> <interrupt: exceeded 0.2 seconds while in "created" state> ``` Limits (in seconds) can be set on: * The total `'submitted'` to `'done'` time: `timeout = 2` * On a per-state basis: `timeout = list(queued = 1, running = 2)` * Or both: `timeout = list(total = 3, queued = 2, running = 2)`