Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read/write in parallel #91

Merged
merged 11 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ Suggests:
rmarkdown,
crul,
Rarr,
vcr (>= 0.6.0)
vcr (>= 0.6.0),
pbapply,
parallel,
future,
bench
8 changes: 1 addition & 7 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export(is_key_error)
export(is_scalar)
export(is_slice)
export(obj_list)
export(pizzarr_option_defaults)
export(pizzarr_sample)
export(slice)
export(zarr_create)
Expand All @@ -39,10 +40,3 @@ export(zarr_open_array)
export(zarr_open_group)
export(zarr_save_array)
export(zb_slice)
importFrom(R6,R6Class)
importFrom(memoise,memoise)
importFrom(memoise,timeout)
importFrom(qs,lz4_compress_raw)
importFrom(qs,lz4_decompress_raw)
importFrom(qs,zstd_compress_raw)
importFrom(qs,zstd_decompress_raw)
10 changes: 4 additions & 6 deletions R/numcodecs.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ Codec <- R6::R6Class("Codec",
#' ZSTD compressor for Zarr
#' @title ZstdCodec Class
#' @docType class
#' @importFrom qs zstd_compress_raw zstd_decompress_raw
#' @description
#' Class representing a ZSTD compressor

Expand All @@ -62,7 +61,7 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' @return Compressed data.
encode = function(buf, zarr_arr) {
# Reference: https://github.com/traversc/qs/blob/84e30f4/R/RcppExports.R#L16
result <- zstd_compress_raw(buf, self$level)
result <- qs::zstd_compress_raw(buf, self$level)
return(result)
},
#' @description
Expand All @@ -71,7 +70,7 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' @param zarr_arr The ZarrArray instance.
#' @return Un-compressed data.
decode = function(buf, zarr_arr) {
result <- zstd_decompress_raw(buf)
result <- qs::zstd_decompress_raw(buf)
return(result)
},
#' @description
Expand All @@ -89,7 +88,6 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' LZ4 compressor for Zarr
#' @title Lz4Codec Class
#' @docType class
#' @importFrom qs lz4_compress_raw lz4_decompress_raw
#' @description
#' Class representing a LZ4 compressor
#'
Expand All @@ -115,7 +113,7 @@ Lz4Codec <- R6::R6Class("Lz4Codec",
#' @return Compressed data.
encode = function(buf, zarr_arr) {
# Reference: https://github.com/traversc/qs/blob/84e30f4/R/RcppExports.R#L24
body <- lz4_compress_raw(buf, self$acceleration)
body <- qs::lz4_compress_raw(buf, self$acceleration)

# The compressed output includes a 4-byte header storing the original size
# of the decompressed data as a little-endian 32-bit integer.
Expand All @@ -135,7 +133,7 @@ Lz4Codec <- R6::R6Class("Lz4Codec",
decode = function(buf, zarr_arr) {
body <- buf[5:length(buf)]

result <- lz4_decompress_raw(body)
result <- qs::lz4_decompress_raw(body)
return(result)
},
#' @description
Expand Down
4 changes: 4 additions & 0 deletions R/onload.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#' Package load hook.
#'
#' Calls `init_options()` so that every option listed in
#' `pizzarr_option_defaults` that is not already set receives a value.
#'
#' @param libname Not used in the body; defaults to `NULL`.
#' @param pkgname Not used in the body; defaults to `NULL`.
#' @keywords internal
.onLoad <- function(libname = NULL, pkgname = NULL) {
# Seed unset pizzarr.* options from environment variables or defaults.
init_options()
}
61 changes: 61 additions & 0 deletions R/options.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Adapted from https://github.com/IRkernel/IRkernel/blob/master/R/options.r

#' pizzarr_option_defaults
#'
#' Named list of default values for the package options. `init_options()`
#' applies each entry whose option has not already been set, optionally
#' overriding the default with a value taken from an environment variable.
#'
#' @export
pizzarr_option_defaults <- list(
# Read by HttpStore at construction as its cache lifetime in seconds;
# HttpStore enables its request cache only when this value is > 0.
pizzarr.http_store_cache_time_seconds = 3600,
# Read by HttpStore's make_request to choose between the regular client and
# the crul::HttpRequest / crul::AsyncVaried request path.
pizzarr.parallel_read_enabled = FALSE,
# NOTE(review): no reader of this option is visible here; presumably it gates
# parallel writes in the same way -- confirm against the array code.
pizzarr.parallel_write_enabled = FALSE
)

#' Parse the string value of a parallel-option environment variable.
#'
#' @param val A single string, e.g. as returned by `Sys.getenv()`.
#' @return `"future"` when `val` is exactly `"future"`; an integer when `val`
#'   parses to a whole number greater than 1; `TRUE`/`FALSE` when `val` parses
#'   to a number <= 1 (via `as.logical()`, so `0` is `FALSE` and any other
#'   such number is `TRUE`) or to a logical such as `"TRUE"`/`"false"`.
#'   Anything else (including an empty string, `NA` or a non-scalar) yields
#'   `FALSE`; unrecognised strings additionally raise a warning.
#' @keywords internal
parse_parallel_option <- function(val) {
  # A missing or non-scalar value cannot enable anything.
  if (length(val) != 1L || is.na(val)) {
    return(FALSE)
  }
  if (val == "future") {
    return("future")
  }

  # "TRUE"/"FALSE" are legitimate inputs that are not integers, so the
  # "NAs introduced by coercion" warning from as.integer() is expected noise.
  integer_val <- suppressWarnings(as.integer(val))

  if (is.na(integer_val)) {
    logical_val <- as.logical(val)
    if (is.na(logical_val)) {
      # Storing NA in the option would make later `if` checks fail, so fall
      # back to "disabled" and tell the user why.
      warning(
        "Unrecognised parallel option value '", val, "'; using FALSE.",
        call. = FALSE
      )
      return(FALSE)
    }
    return(logical_val)
  }
  if (integer_val <= 1L) {
    return(as.logical(integer_val))
  }
  integer_val
}

# Maps each supported environment variable name to the function that converts
# the variable's string value into an option value. init_options() looks up
# the converter by the name produced by opt_to_env().
#' @keywords internal
from_env <- list(
# Cache lifetime is converted with as.integer.
PIZZARR_HTTP_STORE_CACHE_TIME_SECONDS = as.integer,
# Parallel flags go through parse_parallel_option(), which yields "future",
# a logical, or an integer.
PIZZARR_PARALLEL_READ_ENABLED = parse_parallel_option,
PIZZARR_PARALLEL_WRITE_ENABLED = parse_parallel_option
)

#' Derive environment variable names from option names.
#'
#' Upper-cases each name and replaces every `.` with `_`, e.g.
#' `pizzarr.parallel_read_enabled` becomes `PIZZARR_PARALLEL_READ_ENABLED`.
#'
#' @param nms Character vector of option names.
#' @return Character vector of the corresponding environment variable names.
#' @keywords internal
opt_to_env <- function(nms) {
  upper_names <- toupper(nms)
  # fixed = TRUE: the dot is a literal character, not a regex wildcard.
  gsub(pattern = ".", replacement = "_", x = upper_names, fixed = TRUE)
}

#' Initialise the package options (called in `.onLoad`).
#'
#' For every entry of `pizzarr_option_defaults` whose option is not already
#' set (e.g. in the Rprofile), sets the option to the value of the matching
#' environment variable when one is available and converts cleanly, and to
#' the default otherwise.
#'
#' @return `NULL`, invisibly; called for its side effect on `options()`.
#' @keywords internal
init_options <- function() {
  for (opt_name in names(pizzarr_option_defaults)) {
    # Skip the option if it is already set, e.g. in the Rprofile.
    if (!is.null(getOption(opt_name))) {
      next
    }

    # Prepare the `options` call from the default.
    call_arg <- pizzarr_option_defaults[opt_name] # single [] preserves names

    # If a usable env var is set, take the value from it instead.
    env_name <- opt_to_env(opt_name)
    convert <- from_env[[env_name]]
    env_val <- Sys.getenv(env_name, unset = NA)
    # An empty variable (`VAR=`) is treated the same as an unset one.
    if (!is.null(convert) && !is.na(env_val) && nzchar(env_val)) {
      converted <- convert(env_val)
      # A failed conversion (NA) must not replace the default: an NA option
      # would break the scalar `if` checks that later read it.
      if (length(converted) == 1L && !is.na(converted)) {
        call_arg[[opt_name]] <- converted
      }
    }

    do.call(options, call_arg)
  }
}
64 changes: 48 additions & 16 deletions R/stores.R
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ MemoryStore <- R6::R6Class("MemoryStore",
#' HttpStore for Zarr
#' @title HttpStore Class
#' @docType class
#' @importFrom memoise memoise timeout
#' @description
#' Store class that uses HTTP requests.
#' Read-only. Depends on the `crul` package.
Expand All @@ -363,16 +362,43 @@ HttpStore <- R6::R6Class("HttpStore",
headers = NULL,
client = NULL,
zmetadata = NULL,
mem_get = NULL,
cache_time_seconds = 3600,
make_request_memoized = NULL,
cache_enabled = NULL,
cache_time_seconds = NULL,
make_request = function(item) {
key <- item_to_key(item)

# mem_get caches in memory on a per-session basis.
res <- private$mem_get(private$client,
paste(private$base_path, key, sep="/"))

return(res)
path <- paste(private$base_path, key, sep="/")

parallel_option <- getOption("pizzarr.parallel_read_enabled")
is_parallel <- is_truthy_parallel_option(parallel_option)

if(is_parallel) {
# For some reason, the crul::HttpClient fails in parallel settings
# This alternative
# with HttpRequest and AsyncVaried seems to work.
# Reference: https://docs.ropensci.org/crul/articles/async.html
req <- crul::HttpRequest$new(
url = private$domain,
opts = private$options,
headers = private$headers
)
req$get(path = path)
res <- crul::AsyncVaried$new(req)
res$request()
return(unclass(res$responses())[[1]])
} else {
return(private$client$get(path = path))
}
},
memoize_make_request = function() {
  # (Re)builds private$make_request_memoized from the current cache settings.
  if(!private$cache_enabled) {
    # Caching is off: expose the plain request function unchanged.
    private$make_request_memoized <- private$make_request
  } else {
    # Caching is on: wrap the request function so that results are reused
    # until the configured timeout expires.
    private$make_request_memoized <- memoise::memoise(
      function(key) private$make_request(key),
      ~memoise::timeout(private$cache_time_seconds)
    )
  }
},
get_zmetadata = function() {
res <- private$make_request(".zmetadata")
Expand Down Expand Up @@ -405,16 +431,20 @@ HttpStore <- R6::R6Class("HttpStore",
private$domain <- paste(segments[1:3], collapse="/")
private$base_path <- paste(segments[4:length(segments)], collapse="/")

if(!requireNamespace("crul", quietly = TRUE)) stop("HttpStore requires the crul package")

if(!requireNamespace("crul", quietly = TRUE)) {
stop("HttpStore requires the crul package")
}

private$client <- crul::HttpClient$new(
url = private$domain,
opts = private$options,
headers = private$headers
)

private$mem_get <- memoise(function(client, path) client$get(path),
~timeout(private$cache_time_seconds))

private$cache_time_seconds <- getOption("pizzarr.http_store_cache_time_seconds")
private$cache_enabled <- private$cache_time_seconds > 0

private$memoize_make_request()

private$zmetadata <- private$get_zmetadata()
},
Expand All @@ -423,7 +453,7 @@ HttpStore <- R6::R6Class("HttpStore",
#' @param item The item key.
#' @return The item data in a vector of type raw.
get_item = function(item) {
res <- private$make_request(item)
res <- private$make_request_memoized(item)
return(res$content)
},
#' @description
Expand All @@ -438,7 +468,7 @@ HttpStore <- R6::R6Class("HttpStore",
} else if(!is.null(self$get_consolidated_metadata())) {
return(FALSE)
} else {
res <- private$make_request(item)
res <- private$make_request_memoized(item)

return(res$status_code == 200)
}
Expand Down Expand Up @@ -475,6 +505,8 @@ HttpStore <- R6::R6Class("HttpStore",
#' @param seconds number of seconds until cache is invalid -- 0 for no cache
set_cache_time_seconds = function(seconds) {
  private$cache_time_seconds <- seconds
  # The enabled flag is derived from the lifetime (> 0 means cache on) and is
  # what memoize_make_request() branches on, so it has to be refreshed here;
  # otherwise passing 0 would leave the cache active, and a positive value
  # could never turn it on after it had been created disabled.
  private$cache_enabled <- seconds > 0
  # We need to re-memoize so the new settings take effect.
  private$memoize_make_request()
}
)
)
8 changes: 8 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,14 @@ item_to_key <- function(item) {
key
}

#' Decide whether a parallel option value means "parallelism enabled".
#'
#' @param val The option value: `"future"`, a logical, a number, or a string
#'   holding a number or a logical. May be `NULL` when the option is unset.
#' @return A single non-`NA` logical. `TRUE` for `"future"`, for `TRUE`, and
#'   for any value whose integer conversion is non-zero; `FALSE` otherwise,
#'   including for `NULL`, `NA`, non-scalar and unparseable values.
#' @keywords internal
is_truthy_parallel_option <- function(val) {
  # The result feeds a scalar `if`, so never let NULL/NA/vectors through.
  if (is.null(val) || length(val) != 1L || is.na(val)) {
    return(FALSE)
  }
  if (identical(as.character(val), "future")) {
    return(TRUE)
  }
  if (is.logical(val)) {
    return(isTRUE(val))
  }
  # Strings such as "TRUE" are not integers; the coercion warning is expected.
  integer_val <- suppressWarnings(as.integer(val))
  if (is.na(integer_val)) {
    # Fall back to logical parsing ("TRUE", "false", ...); anything that
    # still cannot be parsed counts as disabled.
    return(isTRUE(as.logical(val)))
  }
  as.logical(integer_val)
}

# Look up `key` in the store's consolidated metadata.
# Returns the entry stored under `key` in the `metadata` element of
# `store$get_consolidated_metadata()`, or NULL when there is none.
try_from_zmeta <- function(key, store) {
  consolidated <- store$get_consolidated_metadata()
  entries <- consolidated$metadata
  entries[[key]]
}
Expand Down
Loading
Loading