## ----echo = FALSE-------------------------------------------------------------
# Chunk defaults for the document: chunk errors are tolerated unless the
# IN_PKGDOWN environment variable is "true", and chunks are only evaluated
# when RPostgres::postgresHasDefault() returns TRUE.
# NOTE(review): every later chunk in this file is commented out, presumably
# because the code was extracted with evaluation disabled -- confirm.
library(DBI)
knitr::opts_chunk$set(
  error = (Sys.getenv("IN_PKGDOWN") != "true"),
  collapse = TRUE,
  comment = "#>",
  eval = RPostgres::postgresHasDefault()
)

# Handles that the clean-up chunks test with is.null() before releasing them.
con <- NULL
rp <- NULL
rs <- NULL

## -----------------------------------------------------------------------------
# library(DBI)
#
# con <- dbConnect(RPostgres::Postgres())
#
# # Start from a clean table: in_val is the work item, out_val its result
# # (NULL until a worker has filled it in).
# dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;")
# dbExecute(con, "
#     CREATE TABLE sqroot_vignette_example (
#         in_val INTEGER PRIMARY KEY,
#         out_val DOUBLE PRECISION NULL
#     )
# ")

## ----echo = FALSE-------------------------------------------------------------
# # Release whatever the previous chunks left open.
# if (!is.null(rs)) {
#   dbClearResult(rs)
#   rs <- NULL
# }
# if (!is.null(con)) {
#   dbDisconnect(con)
#   con <- NULL
# }
# if (!is.null(rp)) {
#   rp$wait()
#   rp <- NULL
# }

## -----------------------------------------------------------------------------
# con <- dbConnect(RPostgres::Postgres())
# dbExecute(con, "LISTEN grapevine")

## -----------------------------------------------------------------------------
# # Background R process that sends a notification on a separate connection.
# rp <- callr::r_bg(function() {
#   library(DBI)
#   Sys.sleep(0.3)
#   db_notify <- dbConnect(RPostgres::Postgres())
#   dbExecute(db_notify, "NOTIFY grapevine, 'psst'")
#   dbDisconnect(db_notify)
# })

## -----------------------------------------------------------------------------
# # Sleep until we get the message
# n <- NULL
# while (is.null(n)) {
#   n <- RPostgres::postgresWaitForNotify(con)
# }
# n$payload

## -----------------------------------------------------------------------------
# rs <- dbSendQuery(con, "
#     SELECT in_val
#       FROM sqroot_vignette_example
#      WHERE in_val = $1
#        FOR UPDATE
#       SKIP LOCKED
# ", params = list(99))

## ----echo = FALSE-------------------------------------------------------------
# # Release whatever the previous chunks left open.
# if (!is.null(rs)) {
#   dbClearResult(rs)
#   rs <- NULL
# }
# if (!is.null(con)) {
#   dbDisconnect(con)
#   con <- NULL
# }
# if (!is.null(rp)) {
#   rp$wait()
#   rp <- NULL
# }

## -----------------------------------------------------------------------------
# # Worker loop, run in a background R process.
# #
# # Opens its own connection, listens on the "sqroot" and "sqroot_shutdown"
# # channels, and for every "sqroot" notification tries to claim the row named
# # in the payload, compute its square root and store it in out_val.
# # Progress is reported with writeLines(). Takes no arguments; returns when a
# # "sqroot_shutdown" notification arrives.
# worker <- function() {
#   library(DBI)
#   db_worker <- dbConnect(RPostgres::Postgres())
#   on.exit(dbDisconnect(db_worker), add = TRUE)
#   dbExecute(db_worker, "LISTEN sqroot")
#   dbExecute(db_worker, "LISTEN sqroot_shutdown")
#
#   while (TRUE) {
#     # Wait for new work to do
#     n <- RPostgres::postgresWaitForNotify(db_worker, 60)
#     if (is.null(n)) {
#       # If nothing to do, send notifications of any not up-to-date work
#       dbExecute(db_worker, "
#           SELECT pg_notify('sqroot', in_val::TEXT)
#             FROM sqroot_vignette_example
#            WHERE out_val IS NULL
#       ")
#       next
#     }
#
#     # If we've been told to shutdown, stop right away
#     if (n$channel == "sqroot_shutdown") {
#       writeLines("Shutting down.")
#       break
#     }
#
#     in_val <- strtoi(n$payload)
#     tryCatch(
#       {
#         dbWithTransaction(db_worker, {
#           # Try and fetch the item we got notified about.
#           # dbGetQuery() releases the result set even if fetching fails.
#           claimed <- dbGetQuery(db_worker, "
#               SELECT in_val
#                 FROM sqroot_vignette_example
#                WHERE out_val IS NULL  -- if another worker already finished, don't reprocess
#                  AND in_val = $1
#                  FOR UPDATE SKIP LOCKED  -- Don't let another worker work on this at the same time
#           ", params = list(in_val))
#           # An empty result yields NA here, meaning there is nothing to claim.
#           in_val <- claimed[1, 1]
#
#           if (!is.na(in_val)) {
#             # Actually do the sqrt
#             writeLines(paste("Sqroot-ing", in_val, "... "))
#             Sys.sleep(in_val * 0.1)
#             out_val <- sqrt(in_val)
#
#             # Update the database with the result
#             dbExecute(db_worker, "
#                 UPDATE sqroot_vignette_example
#                    SET out_val = $1
#                  WHERE in_val = $2
#             ", params = list(out_val, in_val))
#           } else {
#             writeLines("Not sqroot-ing as another worker got there first")
#           }
#         })
#       },
#       error = function(e) {
#         # Something went wrong. Report error and carry on
#         writeLines(paste("Failed to sqroot:", conditionMessage(e)))
#       }
#     )
#   }
# }

## -----------------------------------------------------------------------------
# stdout_1 <- tempfile()
# stdout_2 <- tempfile()
# # Keep a separate handle per worker so that both can be waited on later.
# rp_1 <- callr::r_bg(worker, stdout = stdout_1, stderr = stdout_1)
# rp_2 <- callr::r_bg(worker, stdout = stdout_2, stderr = stdout_2)
# Sys.sleep(1)  # Give workers a chance to set themselves up

## -----------------------------------------------------------------------------
# con <- dbConnect(RPostgres::Postgres())
#
# # Queue one value: insert the row, then notify the "sqroot" channel about it.
# add_sqroot <- function(in_val) {
#   dbExecute(con, "
#       INSERT INTO sqroot_vignette_example (in_val) VALUES ($1)
#   ", params = list(in_val))
#   dbExecute(con, "
#       SELECT pg_notify('sqroot', $1)
#   ", params = list(in_val))
# }
#
# add_sqroot(7)
# add_sqroot(8)
# add_sqroot(9)

## -----------------------------------------------------------------------------
# Sys.sleep(3)
# rs <- dbSendQuery(con, "SELECT * FROM sqroot_vignette_example ORDER BY in_val")
# dbFetch(rs)
# dbClearResult(rs)
# rs <- NULL

## -----------------------------------------------------------------------------
# dbExecute(con, "NOTIFY sqroot_shutdown, ''")

## -----------------------------------------------------------------------------
# # Let both workers exit first so that their output files are complete.
# rp_1$wait()
# rp_2$wait()
#
# # We can't control which worker will process the first entry,
# # so we sort the results so the vignette output stays the same.
# outputs <- sort(c(
#   paste(readLines(con = stdout_1), collapse = "\n"),
#   paste(readLines(con = stdout_2), collapse = "\n")
# ))
#
# writeLines(outputs[[1]])
# writeLines(outputs[[2]])

## ----echo = FALSE, error = FALSE----------------------------------------------
# dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;")
# dbDisconnect(con)
#
# rp_1$wait()
# rp_2$wait()