From 9301f39fad4a11ef3836ed04de41bd9bac52bd7b Mon Sep 17 00:00:00 2001 From: langbart Date: Thu, 29 Aug 2024 22:35:56 +0200 Subject: [PATCH] move to parquet format + document and fucntions keywording + organize R folder by modules --- .github/workflows/pkgdown.yaml | 8 +- .gitignore | 1 + DESCRIPTION | 6 +- Dockerfile | 6 +- Dockerfile.prod | 6 +- NAMESPACE | 25 +- NEWS.md | 10 + R/cloud-storage.R | 257 --------------- R/get-cloud-files.R | 58 ---- R/ingest-wcs-surveys.R | 61 ---- R/ingestion.R | 206 ++++++++++++ R/kepler-mapping.R | 1 + R/preprocess-wcs-surveys.R | 122 ------- R/preprocessing.R | 349 +++++++++++++++++++++ R/processing.R | 69 ---- R/pt_nest_survey.R | 204 ------------ R/retrieve-wcs-data.R | 128 -------- R/storage.R | 269 ++++++++++++++++ R/utils.R | 3 +- R/validate-surveys.R | 115 ------- R/{validation-functions.R => validation.R} | 152 +++++++-- _pkgdown.yml | 41 +++ man/add_version.Rd | 1 + man/alert_outlier.Rd | 3 +- man/cloud_object_name.Rd | 75 ++--- man/cloud_storage_authenticate.Rd | 38 +-- man/download_cloud_file.Rd | 42 +-- man/expand_taxa.Rd | 27 +- man/flatten_field.Rd | 20 ++ man/flatten_row.Rd | 18 ++ man/get_preprocessed_surveys.Rd | 18 +- man/get_validated_surveys.Rd | 19 +- man/ingest_wcs_surveys.Rd | 29 +- man/kepler_mapper.Rd | 1 + man/preprocess_wcs_surveys.Rd | 27 +- man/pt_nest_attachments.Rd | 32 +- man/pt_nest_catch.Rd | 17 +- man/pt_nest_length.Rd | 17 +- man/pt_nest_market.Rd | 17 +- man/pt_nest_trip.Rd | 15 +- man/read_config.Rd | 1 + man/rename_child.Rd | 22 ++ man/retrieve_wcs_surveys.Rd | 35 +-- man/upload_cloud_file.Rd | 49 ++- man/validate_catch.Rd | 3 +- man/validate_length.Rd | 3 +- man/validate_market.Rd | 4 +- man/validate_surveys_time.Rd | 24 +- man/validate_wcs_surveys.Rd | 18 +- 49 files changed, 1292 insertions(+), 1380 deletions(-) delete mode 100644 R/cloud-storage.R delete mode 100644 R/get-cloud-files.R delete mode 100644 R/ingest-wcs-surveys.R create mode 100644 R/ingestion.R delete mode 100644 R/preprocess-wcs-surveys.R create mode 100644 R/preprocessing.R delete mode 100644 R/processing.R delete mode 100644 R/pt_nest_survey.R delete mode 100644 R/retrieve-wcs-data.R create mode 100644 R/storage.R delete mode 100644 R/validate-surveys.R rename R/{validation-functions.R => validation.R} (68%) create mode 100644 man/flatten_field.Rd create mode 100644 man/flatten_row.Rd create mode 100644 man/rename_child.Rd diff --git a/.github/workflows/pkgdown.yaml b/.github/workflows/pkgdown.yaml index ed7650c..4bbce75 100644 --- a/.github/workflows/pkgdown.yaml +++ b/.github/workflows/pkgdown.yaml @@ -9,7 +9,9 @@ on: types: [published] workflow_dispatch: -name: pkgdown +name: pkgdown.yaml + +permissions: read-all jobs: pkgdown: @@ -22,7 +24,7 @@ jobs: permissions: contents: write steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: r-lib/actions/setup-pandoc@v2 @@ -41,7 +43,7 @@ jobs: - name: Deploy to GitHub pages 🚀 if: github.event_name != 'pull_request' - uses: JamesIves/github-pages-deploy-action@v4.4.1 + uses: JamesIves/github-pages-deploy-action@v4.5.0 with: clean: false branch: gh-pages diff --git a/.gitignore b/.gitignore index 8e5bf9e..141a1bf 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,4 @@ docs *.xlsx *.csv *.tsv +*.parquet \ No newline at end of file diff --git a/DESCRIPTION b/DESCRIPTION index 23ef422..4510f21 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: peskas.zanzibar.data.pipeline Title: Functions to Implement the Zanzibar Small Scale Fisheries Data Pipeline -Version: 
0.2.0 +Version: 1.0.0 Authors@R: c(person(given = "Lorenzo", family = "Longobardi", @@ -22,12 +22,12 @@ Imports: logger, magrittr, purrr, - readr, stringr, tidyr, rlang, lubridate, - taxize + taxize, + arrow Suggests: covr, pkgdown, diff --git a/Dockerfile b/Dockerfile index dcbeb54..9cbde94 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ RUN install2.r --error --skipinstalled \ logger \ magrittr \ purrr \ - readr \ + arrow \ stringr \ tidyr \ lubridate \ @@ -30,9 +30,7 @@ RUN install2.r --error --skipinstalled \ univOutl \ taxize \ reticulate \ - stringi \ - taxize + stringi -#RUN Rscript -e "devtools::install_version('glmmTMB', version = '1.1.5')" # Rstudio interface preferences COPY rstudio-prefs.json /home/rstudio/.config/rstudio/rstudio-prefs.json diff --git a/Dockerfile.prod b/Dockerfile.prod index 6255f03..6956b52 100644 --- a/Dockerfile.prod +++ b/Dockerfile.prod @@ -23,7 +23,7 @@ RUN install2.r --error --skipinstalled \ logger \ magrittr \ purrr \ - readr \ + arrow \ stringr \ tidyr \ lubridate \ @@ -41,10 +41,8 @@ RUN install2.r --error --skipinstalled \ tidytext \ KoboconnectR \ univOutl \ - taxize \ reticulate \ - stringi \ - taxize + stringi # Install local package COPY . /home diff --git a/NAMESPACE b/NAMESPACE index ce3745b..2f4f151 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -5,6 +5,7 @@ export("%||%") export(":=") export(.data) export(add_version) +export(alert_outlier) export(as_label) export(as_name) export(cloud_object_name) @@ -14,6 +15,8 @@ export(enquo) export(enquos) export(expand_taxa) export(expr) +export(flatten_field) +export(flatten_row) export(get_preprocessed_surveys) export(get_validated_surveys) export(ingest_wcs_surveys) @@ -21,10 +24,10 @@ export(kepler_mapper) export(preprocess_wcs_surveys) export(pt_nest_attachments) export(pt_nest_catch) -export(pt_nest_length) export(pt_nest_market) export(pt_nest_trip) export(read_config) +export(rename_child) export(retrieve_wcs_surveys) export(sym) export(syms) @@ -34,27 +37,9 @@ export(validate_length) export(validate_market) export(validate_surveys_time) export(validate_wcs_surveys) -import(dplyr) -import(purrr) -import(stringr) -import(taxize) -import(tidyr) -importFrom(dplyr,case_when) -importFrom(dplyr,coalesce) -importFrom(dplyr,group_by) -importFrom(dplyr,left_join) -importFrom(dplyr,mutate) -importFrom(dplyr,select) -importFrom(dplyr,ungroup) -importFrom(logger,log_info) -importFrom(logger,log_threshold) -importFrom(lubridate,with_tz) importFrom(magrittr,"%<>%") importFrom(magrittr,"%>%") importFrom(magrittr,"%T>%") -importFrom(purrr,map) -importFrom(purrr,reduce) -importFrom(readr,write_rds) importFrom(rlang,"%||%") importFrom(rlang,":=") importFrom(rlang,.data) @@ -66,6 +51,4 @@ importFrom(rlang,expr) importFrom(rlang,sym) importFrom(rlang,syms) importFrom(stats,mad) -importFrom(tidyr,nest) -importFrom(tidyr,unnest) importFrom(utils,globalVariables) diff --git a/NEWS.md b/NEWS.md index f41d365..a19d07c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,13 @@ +# peskas.zanzibar.data.pipeline 1.0.0 + +#### Improvements +- All the functions are now documented and indexed according to keywords +- Thin out the R folder gathering functions by modules + +#### Changes +- Move to parquet format rather than CSV/RDS + + # peskas.zanzibar.data.pipeline 0.2.0 #### New features diff --git a/R/cloud-storage.R b/R/cloud-storage.R deleted file mode 100644 index e378aeb..0000000 --- a/R/cloud-storage.R +++ /dev/null @@ -1,257 +0,0 @@ -#' Authenticate to a storage cloud provider -#' -#' Usually used internally by other 
functions -#' -#' @param provider cloud provider to use, either "gcs" or "aws" -#' @param options named list with cloud provider options, see details -#' -#' @details -#' -#' ### Google Cloud Services -#' -#' For Google Cloud Services ("gcs") options must be a list with the field -#' `service_account_key` with the contents of the authentication json file you -#' have downloaded from your Google Project. -#' -#' This function uses [googleCloudStorageR::gcs_auth] under the hood to -#' authenticate. -#' -#' @export -#' -#' @examples -#' -#' # Google Cloud Services -#' \dontrun{ -#' authentication_details <- readLines("location_of_json_file.json") -#' cloud_storage_authenticate( -#' provider = "gcs", -#' options = list( -#' service_account_key = authentication_details, -#' bucket = "my-bucket" -#' ) -#' ) -#' } -cloud_storage_authenticate <- function(provider, options) { - if ("gcs" %in% provider) { - # Only need to authenticate if there is no token for downstream requests - if (isFALSE(googleAuthR::gar_has_token())) { - service_account_key <- options$service_account_key - temp_auth_file <- tempfile(fileext = "json") - writeLines(service_account_key, temp_auth_file) - googleCloudStorageR::gcs_auth(json_file = temp_auth_file) - } - } -} - -#' Upload a local file to a cloud storage bucket -#' -#' @param file a file-path (character) to upload. A vector with multiple files -#' is also supported. -#' @param name What to call the file once uploaded. Default is the filepath -#' @inheritParams cloud_storage_authenticate -#' @details -#' -#' ### Google Cloud Services -#' -#' For Google Cloud Services ("gcs") options must be a list with two fields: -#' `bucket` with the bucketname (character) you are uploading to, and -#' `service_account_key` with the contents of the authentication json file you -#' have downloaded from your Google Project (if [cloud_storage_authenticate] has -#' not been called before). -#' -#' This function uses [googleCloudStorageR::gcs_upload] under the hood to upload -#' the file. -#' -#' @return If `provider` is "gcs" and if successful a list of medatada objects -#' @export -#' -#' @examples -#' -#' # Google Cloud Services -#' \dontrun{ -#' authentication_details <- readLines("location_of_json_file.json") -#' upload_cloud_file( -#' file = "table_to_upload.csv", -#' provider = "gcs", -#' options = list( -#' service_account_key = authentication_details, -#' bucket = "my-bucket" -#' ) -#' ) -#' } -#' -upload_cloud_file <- function(file, provider, options, name = file) { - cloud_storage_authenticate(provider, options) - - out <- list() - if ("gcs" %in% provider) { - # Iterate over multiple files (and names) - google_output <- purrr::map2( - file, name, - ~ googleCloudStorageR::gcs_upload( - file = .x, - bucket = options$bucket, - name = .y, - predefinedAcl = "bucketLevel" - ) - ) - - out <- c(out, google_output) - } - - out -} - -#' Get the full name of a versioned cloud object -#' -#' Obtain the full name (e.g. -#' `timor-landings-v2_metadata__20210326084600_54617b3__.json`) of a cloud -#' storage object. If there are more than one object matching the prefix, -#' version, and extension, a vector with all the names is returned. -#' -#' @param prefix string indicating the prefix of the object -#' @param version either "latest" or the specific version string generated by -#' [add_version] when the file was uploaded to the cloud provider -#' @param extension extension of the desired file. 
Use an empty string "" to -#' return all extensions founds -#' @param exact_match logical indicating whether the prefix should be matched -#' exactly -#' @inheritParams cloud_storage_authenticate -#' -#' -#' @details -#' -#' ### Google Cloud Services -#' -#' For Google Cloud Services ("gcs") options must be a list with two fields: -#' `bucket` with the bucketname (character) you are uploading to, and -#' `service_account_key` with the contents of the authentication json file you -#' have downloaded from your Google Project (if [cloud_storage_authenticate] has -#' not been called before). -#' -#' This function uses [googleCloudStorageR::gcs_upload] under the hood to upload -#' the file. -#' -#' @return A string vector with the object names in the cloud storage that match -#' the prefix, the version, and the extension indicated in the parameters -#' @export -#' -#' @examples -#' -#' #' # Google Cloud Services -#' \dontrun{ -#' authentication_details <- readLines("location_of_json_file.json") -#' # obtain the latest version of all files corresponding to timor-landings-v2 -#' cloud_object_name( -#' prefix = "timor-landings-v2", -#' version = "latest", -#' provider = "gcs", -#' options = list( -#' service_account_key = authentication_details, -#' bucket = "my-bucket" -#' ) -#' ) -#' -#' # obtain a specific version of the structured data from timor-landings-v2 -#' cloud_object_name( -#' prefix = "timor-landings-v2_raw", -#' version = "20210326084600_54617b", -#' extension = "csv", -#' provider = "gcs", -#' options = list( -#' service_account_key = authentication_details, -#' bucket = "my-bucket" -#' ) -#' ) -#' } -#' -cloud_object_name <- function(prefix, version = "latest", extension = "", - provider, exact_match = FALSE, options) { - cloud_storage_authenticate(provider, options) - - if ("gcs" %in% provider) { - gcs_files <- googleCloudStorageR::gcs_list_objects( - bucket = options$bucket, - prefix = prefix - ) - - if (nrow(gcs_files) == 0) { - return(character(0)) - } - - gcs_files_formatted <- gcs_files %>% - tidyr::separate( - col = .data$name, - into = c("base_name", "version", "ext"), - # Version is separated with the "__" string - sep = "__", - remove = FALSE - ) %>% - dplyr::filter(stringr::str_detect(.data$ext, paste0(extension, "$"))) %>% - dplyr::group_by(.data$base_name, .data$ext) - - if (isTRUE(exact_match)) { - selected_rows <- gcs_files_formatted %>% - dplyr::filter(.data$base_name == prefix) - } else { - selected_rows <- gcs_files_formatted - } - - if (version == "latest") { - selected_rows <- selected_rows %>% - dplyr::filter(max(.data$updated) == .data$updated) - } else { - this_version <- version - selected_rows <- selected_rows %>% - dplyr::filter(.data$version == this_version) - } - - selected_rows$name - } -} - - -#' Download an object from a cloud storage bucket to a local file -#' -#' Download object from the cloud storage to a local file -#' -#' @param name the name of the object in the storage bucket. -#' @param file a file-path (character) where the object will be saved. Default -#' is the object name. 
-#' @inheritParams cloud_storage_authenticate -#' -#' -#' @return the file path -#' @export -#' -#' @examples -#' -#' # Google Cloud Services -#' \dontrun{ -#' authentication_details <- readLines("location_of_json_file.json") -#' download_cloud_file( -#' name = "timor-landings-v2_metadata__20210326084600_54617b3__.json", -#' provider = "gcs", -#' options = list( -#' service_account_key = authentication_details, -#' bucket = "my-bucket" -#' ) -#' ) -#' } -download_cloud_file <- function(name, provider, options, file = name) { - cloud_storage_authenticate(provider, options) - - if ("gcs" %in% provider) { - purrr::map2( - name, file, - ~ googleCloudStorageR::gcs_get_object( - object_name = .x, - bucket = options$bucket, - saveToDisk = .y, - overwrite = ifelse(is.null(options$overwrite), TRUE, options$overwrite) - ) - ) - } - - file -} diff --git a/R/get-cloud-files.R b/R/get-cloud-files.R deleted file mode 100644 index e861ba3..0000000 --- a/R/get-cloud-files.R +++ /dev/null @@ -1,58 +0,0 @@ -#' Download WCS preprocessed surveys -#' -#' Download preprocessed WCS data from Google Cloud. -#' -#' @param pars The configuration file. -#' -#' @return A rds dataframe of preprocessed survey landings. -#' @export -#' -get_preprocessed_surveys <- function(pars) { - wcs_preprocessed_surveys <- - cloud_object_name( - prefix = pars$surveys$wcs_surveys$preprocessed_surveys$file_prefix, - provider = pars$storage$google$key, - extension = "rds", - version = pars$surveys$wcs_surveys$version$preprocess, - options = pars$storage$google$options - ) - - logger::log_info("Retrieving {wcs_preprocessed_surveys}") - download_cloud_file( - name = wcs_preprocessed_surveys, - provider = pars$storage$google$key, - options = pars$storage$google$options - ) - - readr::read_rds(wcs_preprocessed_surveys) -} - - -#' Download WCS validated surveys -#' -#' Download validated WCS data from Google Cloud. -#' -#' @param pars The configuration file. -#' -#' @return A rds dataframe of validated survey landings. -#' @export -#' -get_validated_surveys <- function(pars) { - wcs_validated_surveys <- - cloud_object_name( - prefix = pars$surveys$wcs_surveys$validated_surveys$file_prefix, - provider = pars$storage$google$key, - extension = "rds", - version = pars$surveys$wcs_surveys$version$preprocess, - options = pars$storage$google$options - ) - - logger::log_info("Retrieving {wcs_validated_surveys}") - download_cloud_file( - name = wcs_validated_surveys, - provider = pars$storage$google$key, - options = pars$storage$google$options - ) - - readr::read_rds(wcs_validated_surveys) -} diff --git a/R/ingest-wcs-surveys.R b/R/ingest-wcs-surveys.R deleted file mode 100644 index 8e32473..0000000 --- a/R/ingest-wcs-surveys.R +++ /dev/null @@ -1,61 +0,0 @@ -#' Ingest WCS catch sruvey data -#' -#' Downloads landings information that has been collected using Kobo Toolbox and -#' uploads it to cloud storage services. -#' -#' This function downloads the survey data and uploads this information to cloud -#' services. File names used contain a -#' versioning string that includes the date-time and, if available, the first 7 -#' digits of the git commit sha. This is acomplished using [add_version()] -#' -#' The parameters needed in `conf.yml` are: -#' -#' ``` -#' surveys: -#' wcs_surveys: -#' asset_id: -#' username: -#' password: -#' file_prefix: -#' storage: -#' storage_name: -#' key: -#' options: -#' project: -#' bucket: -#' service_account_key: -#' ``` -#' -#' Progress through the function is tracked using the package *logger*. 
-#' -#' -#' @param log_threshold The (standard Apache logj4) log level used as a -#' threshold for the logging infrastructure. See [logger::log_levels] for more -#' details -#' -#' @keywords workflow -#' -#' @return No output. This function is used for it's side effects -#' @export -#' -ingest_wcs_surveys <- function(log_threshold = logger::DEBUG) { - logger::log_threshold(log_threshold) - - pars <- read_config() - - file_list <- retrieve_wcs_surveys( - prefix = pars$surveys$wcs_surveys$file_prefix, - file_format = "csv", - append_version = TRUE, - url = "kf.kobotoolbox.org", - project_id = pars$surveys$wcs_surveys$asset_id, - username = pars$surveys$wcs_surveys$username, - psswd = pars$surveys$wcs_surveys$password, - encoding = "UTF-8" - ) - - logger::log_info("Uploading files to cloud...") - # Iterate over multiple storage providers if there are more than one - purrr::map(pars$storage, ~ upload_cloud_file(file_list, .$key, .$options)) - logger::log_success("Files upload succeded") -} diff --git a/R/ingestion.R b/R/ingestion.R new file mode 100644 index 0000000..53afc41 --- /dev/null +++ b/R/ingestion.R @@ -0,0 +1,206 @@ +#' Ingest WCS Catch Survey Data +#' +#' This function automates the downloading of WCS fish catch survey data collected through Kobo Toolbox and uploads it to cloud storage services. The filenames are versioned to include date-time stamps and, if available, the first 7 digits of the Git commit SHA. +#' +#' Configuration parameters required from `conf.yml` include: +#' +#' ``` +#' surveys: +#' wcs_surveys: +#' asset_id: +#' username: +#' password: +#' file_prefix: +#' storage: +#' storage_name: +#' key: +#' options: +#' project: +#' bucket: +#' service_account_key: +#' ``` +#' +#' Progress through the function is tracked using the package *logger*. +#' +#' +#' Logging progress is managed using the `logger` package. +#' +#' @param log_threshold Log level used as the threshold for logging (see [logger::log_levels]). +#' @return None; the function is used for its side effects. +#' @export +#' @keywords workflow ingestion +#' @examples +#' \dontrun{ +#' ingest_wcs_surveys(logger::DEBUG) +#' } +#' +ingest_wcs_surveys <- function(log_threshold = logger::DEBUG) { + logger::log_threshold(log_threshold) + + pars <- read_config() + + file_list <- retrieve_wcs_surveys( + prefix = pars$surveys$wcs_surveys$file_prefix, + append_version = TRUE, + url = "kf.kobotoolbox.org", + project_id = pars$surveys$wcs_surveys$asset_id, + username = pars$surveys$wcs_surveys$username, + psswd = pars$surveys$wcs_surveys$password, + encoding = "UTF-8" + ) + + logger::log_info("Uploading files to cloud...") + # Iterate over multiple storage providers if there are more than one + purrr::map(pars$storage, ~ upload_cloud_file(file_list, .$key, .$options)) + logger::log_success("Files upload succeded") +} + + +#' Retrieve WCS Surveys from Kobotoolbox +#' +#' Downloads survey data from Kobotoolbox for a specified project and uploads the data in Parquet format. File naming can include versioning details. +#' +#' @param prefix Filename prefix or path for downloaded files. +#' @param append_version Boolean indicating whether to append versioning info to filenames. +#' @param url URL of the Kobotoolbox instance. +#' @param project_id Project asset ID for data download. +#' @param username Kobotoolbox account username. +#' @param psswd Kobotoolbox account password. +#' @param encoding Character encoding for the downloaded data; defaults to "UTF-8". +#' +#' @return Vector of paths for the downloaded Parquet files. 
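+#'
+#' @details The downloaded submissions are flattened into a single table and
+#'   written to one Parquet file. A minimal sketch of reading the result back
+#'   into R with the `arrow` package (the file name below is illustrative; real
+#'   names are generated by [add_version()]):
+#'
+#' ```
+#' surveys <- arrow::read_parquet("my_data_raw__20240829220000_9301f39__.parquet")
+#' ```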
+#' @export +#' @keywords ingestion +#' @examples +#' \dontrun{ +#' file_list <- retrieve_wcs_surveys( +#' prefix = "my_data", +#' append_version = TRUE, +#' url = "kf.kobotoolbox.org", +#' project_id = "my_project_id", +#' username = "admin", +#' psswd = "admin", +#' encoding = "UTF-8" +#' ) +#' } +#' +retrieve_wcs_surveys <- function( + prefix = NULL, + append_version = NULL, + url = NULL, + project_id = NULL, + username = NULL, + psswd = NULL, + encoding = NULL) { + logger::log_info("Downloading WCS Fish Catch Survey Kobo data...") + data_raw <- + KoboconnectR::kobotools_kpi_data( + url = url, + assetid = project_id, + uname = username, + pwd = psswd, + encoding = encoding + )$results + + # Check that submissions are unique in case there is overlap in the pagination + if (dplyr::n_distinct(purrr::map_dbl(data_raw, ~ .$`_id`)) != length(data_raw)) { + stop("Number of submission ids not the same as number of records") + } + + logger::log_info("Converting WCS Fish Catch Survey Kobo data to tabular format...") + tabular_data <- purrr::map_dfr(data_raw, flatten_row) + data_filename <- paste(prefix, "raw", sep = "_") + + if (isTRUE(append_version)) { + parquet_filename <- add_version(data_filename, "parquet") + } else { + parquet_filename <- paste0(data_filename, ".parquet") + } + + logger::log_info("Converting json data to Parquet as {parquet_filename}...") + + # Convert tabular_data to Arrow Table + arrow_table <- arrow::as_arrow_table(tabular_data) + + # Write to Parquet format + arrow::write_parquet( + arrow_table, + sink = parquet_filename, + compression = "lz4", + compression_level = 12 + ) + + return(parquet_filename) +} + +#' Flatten Survey Data Rows +#' +#' Transforms each row of nested survey data into a flat tabular format using a mapping and flattening process. +#' +#' @param x A list representing a row of data, potentially containing nested lists or vectors. +#' @return A tibble with each row representing flattened survey data. +#' @keywords internal +#' @export +flatten_row <- function(x) { + x %>% + # Each row is composed of several fields + purrr::imap(flatten_field) %>% + rlang::squash() %>% + tibble::as_tibble() +} + +#' Flatten Survey Data Fields +#' +#' Processes each field within a row of survey data, handling both simple vectors and nested lists. For lists with named elements, renames and unlists them for flat structure preparation. +#' +#' @param x A vector or list representing a field in the data. +#' @param p The prefix or name associated with the field, used for naming during the flattening process. +#' @return Modified field, either unchanged, unnested, or appropriately renamed. +#' @keywords internal +#' @export +flatten_field <- function(x, p) { + # If the field is a simple vector do nothing but if the field is a list we + # need more logic + if (inherits(x, "list")) { + if (length(x) > 0) { + if (purrr::vec_depth(x) == 2) { + # If the field-list has named elements is we just need to rename the list + x <- list(x) %>% + rlang::set_names(p) %>% + unlist() %>% + as.list() + } else { + # If the field-list is an "array" we need to iterate over its children + x <- purrr::imap(x, rename_child, p = p) + } + } + } else { + if (is.null(x)) x <- NA + } + x +} + +#' Rename Nested Survey Data Elements +#' +#' Appends a parent name or index to child elements within a nested list, assisting in creating a coherent and traceable data structure during the flattening process. +#' +#' @param x A list element, possibly nested, to be renamed. 
+#' @param i The index or key of the element within the parent list. +#' @param p The parent name to prepend to the element's existing name for context. +#' @return A renamed list element, structured to maintain contextual relevance in a flattened dataset. +#' @keywords internal +#' @export +rename_child <- function(x, i, p) { + if (length(x) == 0) { + if (is.null(x)) x <- NA + x <- list(x) + x <- rlang::set_names(x, paste(p, i - 1, sep = ".")) + } else { + if (inherits(i, "character")) { + x <- rlang::set_names(x, paste(p, i, sep = ".")) + } else if (inherits(i, "integer")) { + x <- rlang::set_names(x, paste(p, i - 1, names(x), sep = ".")) + } + } + x +} diff --git a/R/kepler-mapping.R b/R/kepler-mapping.R index 4ebfdcf..bf45d33 100644 --- a/R/kepler-mapping.R +++ b/R/kepler-mapping.R @@ -6,6 +6,7 @@ #' #' @param data_path Data to add to map. #' +#' @keywords visualization #' @return A self-contained map in html. #' @export #' diff --git a/R/preprocess-wcs-surveys.R b/R/preprocess-wcs-surveys.R deleted file mode 100644 index 128cb27..0000000 --- a/R/preprocess-wcs-surveys.R +++ /dev/null @@ -1,122 +0,0 @@ -#' Pre-process Zanzibar WCS surveys -#' -#' Downloads raw structured data from cloud storage services and pre-processes -#' into a binary format that is easier to deal with in R. During the pre-processing -#' phase, multiple columns in the survey data, which can become very wide due to -#' multiple recordings of similar information (e.g., species information), are nested -#' using a set of utility functions (`pt_nest_trip`, `pt_nest_catch`, -#' `pt_nest_length`, `pt_nest_market`, `pt_nest_attachments`). -#' -#' Nesting these columns helps in reducing the width of the dataframe and organizes -#' related columns into a single nested tibble column, thus simplifying subsequent -#' analysis and visualization tasks.#' -#' -#' This function downloads the landings data from a given version (specified in -#' the config file `conf.yml`.The parameters needed are: -#' -#' ``` -#' surveys: -#' wcs_surveys: -#' asset_id: -#' username: -#' password: -#' file_prefix: -#' version: -#' preprocess: -#' storage: -#' storage_name: -#' key: -#' options: -#' project: -#' bucket: -#' service_account_key: -#' ``` -#' -#' Progress through the function is tracked using the package *logger*. -#' -#' @inheritParams ingest_wcs_surveys -#' @keywords workflow -#' @return no outputs. 
This function is used for it's side effects -#' @seealso \code{\link[=pt_nest_trip]{pt_nest_trip}}, \code{\link[=pt_nest_catch]{pt_nest_catch}}, -#' \code{\link[=pt_nest_length]{pt_nest_length}}, \code{\link[=pt_nest_market]{pt_nest_market}}, -#' \code{\link[=pt_nest_attachments]{pt_nest_attachments}} -#' @export -#' -preprocess_wcs_surveys <- function(log_threshold = logger::DEBUG) { - logger::log_threshold(log_threshold) - - pars <- read_config() - - wcs_surveys_csv <- cloud_object_name( - prefix = pars$surveys$wcs_surveys$file_prefix, - provider = pars$storage$google$key, - extension = "csv", - version = pars$surveys$wcs_surveys$version$preprocess, - options = pars$storage$google$options - ) - - logger::log_info("Retrieving {wcs_surveys_csv}") - download_cloud_file( - name = wcs_surveys_csv, - provider = pars$storage$google$key, - options = pars$storage$google$options - ) - - catch_surveys_raw <- readr::read_csv( - file = wcs_surveys_csv, - col_types = readr::cols(.default = readr::col_character()) - ) - - other_info <- - catch_surveys_raw %>% - tidyr::separate(.data$gps, - into = c("lat", "lon", "drop1", "drop2"), - sep = " " - ) %>% - dplyr::select( - .data$`_id`, - .data$today, - .data$start, - .data$end, - .data$survey_real, - .data$survey_type, - .data$landing_site, - .data$lat, - .data$lon, - .data$trip_info, - .data$people, - .data$boats_landed - ) - - logger::log_info("Nesting survey groups' fields") - group_surveys <- - list( - survey_trip = pt_nest_trip(catch_surveys_raw), - other_info = other_info, - survey_catch = pt_nest_catch(catch_surveys_raw), - survey_length = pt_nest_length(catch_surveys_raw), - survey_market = pt_nest_market(catch_surveys_raw), - survey_attachments = pt_nest_attachments(catch_surveys_raw) - ) - - wcs_surveys_nested <- purrr::reduce( - group_surveys, - ~ dplyr::left_join(.x, .y, by = "_id") - ) - - preprocessed_filename <- pars$surveys$wcs_surveys$preprocessed_surveys$file_prefix %>% - add_version(extension = "rds") - - readr::write_rds( - x = wcs_surveys_nested, - file = preprocessed_filename, - compress = "gz" - ) - - logger::log_info("Uploading {preprocessed_filename} to cloud storage") - upload_cloud_file( - file = preprocessed_filename, - provider = pars$storage$google$key, - options = pars$storage$google$options - ) -} diff --git a/R/preprocessing.R b/R/preprocessing.R new file mode 100644 index 0000000..01045ef --- /dev/null +++ b/R/preprocessing.R @@ -0,0 +1,349 @@ +#' Pre-process Zanzibar WCS Surveys +#' +#' Downloads and preprocesses raw structured WCS survey data from cloud storage into a binary format. The process includes nesting multiple columns related to species information into single columns within a dataframe, which helps reduce its width and organize data efficiently for analysis. +#' +#' Configurations are read from `conf.yml` with the following necessary parameters: +#' +#' ``` +#' surveys: +#' wcs_surveys: +#' asset_id: +#' username: +#' password: +#' file_prefix: +#' version: +#' preprocess: +#' storage: +#' storage_name: +#' key: +#' options: +#' project: +#' bucket: +#' service_account_key: +#' ``` +#' +#' The function uses logging to track progress. +#' +#' @inheritParams ingest_wcs_surveys +#' @return None; the function is used for its side effects. 
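+#'
+#' @examples
+#' \dontrun{
+#' # A minimal usage sketch, assuming a valid `conf.yml` and cloud storage
+#' # credentials are available in the working environment:
+#' preprocess_wcs_surveys(log_threshold = logger::DEBUG)
+#' }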
+#' @export +#' @keywords workflow preprocessing +#' @seealso \code{\link[=pt_nest_trip]{pt_nest_trip}}, \code{\link[=pt_nest_catch]{pt_nest_catch}}, +#' \code{\link[=pt_nest_length]{pt_nest_length}}, \code{\link[=pt_nest_market]{pt_nest_market}}, +#' \code{\link[=pt_nest_attachments]{pt_nest_attachments}} +#' +preprocess_wcs_surveys <- function(log_threshold = logger::DEBUG) { + logger::log_threshold(log_threshold) + + pars <- read_config() + + wcs_surveys_parquet <- cloud_object_name( + prefix = pars$surveys$wcs_surveys$file_prefix, + provider = pars$storage$google$key, + extension = "parquet", + version = pars$surveys$wcs_surveys$version$preprocess, + options = pars$storage$google$options + ) + + logger::log_info("Retrieving {wcs_surveys_parquet}") + download_cloud_file( + name = wcs_surveys_parquet, + provider = pars$storage$google$key, + options = pars$storage$google$options + ) + + catch_surveys_raw <- arrow::read_parquet( + file = wcs_surveys_parquet, + ) + + other_info <- + catch_surveys_raw %>% + tidyr::separate(.data$gps, + into = c("lat", "lon", "drop1", "drop2"), + sep = " " + ) %>% + dplyr::select( + .data$`_id`, + .data$today, + .data$start, + .data$end, + .data$survey_real, + .data$survey_type, + .data$landing_site, + .data$lat, + .data$lon, + .data$trip_info, + .data$people, + .data$boats_landed + ) + + logger::log_info("Nesting survey groups' fields") + group_surveys <- + list( + survey_trip = pt_nest_trip(catch_surveys_raw), + other_info = other_info, + survey_catch = pt_nest_catch(catch_surveys_raw), + survey_length = pt_nest_length(catch_surveys_raw), + survey_market = pt_nest_market(catch_surveys_raw), + survey_attachments = pt_nest_attachments(catch_surveys_raw) + ) + + wcs_surveys_nested <- purrr::reduce( + group_surveys, + ~ dplyr::full_join(.x, .y, by = "_id") + ) + + preprocessed_filename <- pars$surveys$wcs_surveys$preprocessed_surveys$file_prefix %>% + add_version(extension = "parquet") + + arrow::write_parquet( + x = wcs_surveys_nested, + sink = preprocessed_filename, + compression = "lz4", + compression_level = 12 + ) + + logger::log_info("Uploading {preprocessed_filename} to cloud storage") + upload_cloud_file( + file = preprocessed_filename, + provider = pars$storage$google$key, + options = pars$storage$google$options + ) +} + + +#' Nest Length Group Columns +#' +#' Nests length group columns obtained from the structured data of WCS landings surveys. +#' This reduces the width of data by converting multiple related columns into a single nested column. +#' +#' @param x Data frame of WCS survey data in tabular format. +#' @return A data frame with length data nested into a single 'length' column, +#' which contains a tibble for each row with multiple measurements. 
+#' @keywords internal +#' +pt_nest_length <- function(x) { + x %>% + dplyr::select(.data$`_id`, dplyr::starts_with("Length_Frequency_Survey")) %>% + tidyr::pivot_longer(-c(.data$`_id`)) %>% + dplyr::mutate( + n = stringr::str_extract(.data$name, "(\\d+)"), + name = stringr::str_remove(.data$name, "(\\d+)"), + name = stringr::str_remove(.data$name, pattern = "Length_Frequency_Survey/catch_length..Length_Frequency_Survey/catch_length/") + ) %>% + tidyr::pivot_wider(names_from = .data$name, values_from = .data$value) %>% + dplyr::mutate(content = dplyr::coalesce(!!!.[3:ncol(.)])) %>% + dplyr::filter(.data$n == 0 | !is.na(.data$content)) %>% + dplyr::select(-.data$content) %>% + tidyr::nest( + "length" = c( + .data$family, .data$species, + .data$sex, .data$total_length + ), + .by = .data$`_id` + ) +} + +#' Nest Market Group Columns +#' +#' Nests market group columns from structured WCS landings survey data. This method organizes +#' multiple related market data points into a single nested 'market' column per row. +#' +#' @param x Data frame of WCS survey data in tabular format. +#' @return A data frame with market data nested into a 'market' column, containing a tibble +#' for each row with various market-related attributes. +#' @keywords internal +#' @export +#' +pt_nest_market <- function(x) { + x %>% + dplyr::select(.data$`_id`, dplyr::starts_with("Market_Catch_Survey")) %>% + tidyr::pivot_longer(-c(.data$`_id`)) %>% + dplyr::mutate( + n = stringr::str_extract(.data$name, "(\\d+)"), + name = stringr::str_remove(.data$name, "(\\d+)"), + name = stringr::str_remove(.data$name, pattern = "Market_Catch_Survey/catch_market..Market_Catch_Survey/catch_market/") + ) %>% + tidyr::pivot_wider(names_from = .data$name, values_from = .data$value) %>% + dplyr::mutate(content = dplyr::coalesce(!!!.[3:ncol(.)])) %>% + dplyr::filter(.data$n == 0 | !is.na(.data$content)) %>% + dplyr::select(-.data$content) %>% + tidyr::nest( + "market" = c( + .data$group_market, .data$species_market, + .data$weight_market, .data$price_sold_for + ), + .by = .data$`_id` + ) +} + +#' Nest Catch Group Columns +#' +#' Nests catch group columns from WCS structured survey data to organize multiple +#' related catch data points into a single nested 'catch' column per row. +#' +#' @param x Data frame of WCS survey data in tabular format. +#' @return A data frame with catch data nested into a 'catch' column, containing a tibble +#' for each row with various catch-related attributes. 
+#' @keywords internal +#' @export +#' +pt_nest_catch <- function(x) { + x %>% + dplyr::select(.data$`_id`, dplyr::starts_with("Total_Catch_Survey")) %>% + tidyr::pivot_longer(-c(.data$`_id`)) %>% + dplyr::mutate( + n = stringr::str_extract(.data$name, "(\\d+)"), + name = stringr::str_remove(.data$name, "(\\d+)"), + name = stringr::str_remove(.data$name, pattern = "Total_Catch_Survey/catch_catch..Total_Catch_Survey/catch_catch/") + ) %>% + tidyr::pivot_wider(names_from = .data$name, values_from = .data$value) %>% + dplyr::mutate(content = dplyr::coalesce(!!!.[3:ncol(.)])) %>% + dplyr::filter(.data$n == 0 | !is.na(.data$content)) %>% + dplyr::mutate( + weight_kg = dplyr::coalesce(.data$weight_catch, .data$wgt_ind_catch, .data$wgt_buckets_catch), + nb_elements = dplyr::coalesce(.data$nb_ind_catch, .data$nb_buckets_catch) + ) %>% + dplyr::select(-c( + .data$content, .data$weight_catch, .data$wgt_ind_catch, .data$wgt_buckets_catch, + .data$nb_ind_catch, .data$nb_buckets_catch + )) %>% + tidyr::nest( + "catch" = c( + .data$type_measure, .data$All_catch_in_boat, .data$group_catch, + .data$species_catch, .data$nb_elements, .data$weight_kg + ), + .by = .data$`_id` + ) +} + +#' Nest Trip Group Columns +#' +#' Processes and nests trip-related columns from structured WCS landings survey data into a single 'trip' column. This approach consolidates trip information into nested tibbles within the dataframe, simplifying the structure for analysis. +#' +#' @param x A data frame containing structured survey data in tabular format. +#' @return A data frame with trip data nested into a single 'trip' column containing a tibble for each row, corresponding to the various trip details. +#' @keywords internal +#' @export +#' +pt_nest_trip <- function(x) { + x %>% + dplyr::select(.data$`_id`, dplyr::starts_with("Fishing_Trip")) %>% + tidyr::pivot_longer(-c(.data$`_id`)) %>% + dplyr::mutate( + n = stringr::str_extract(.data$name, "(\\d+)"), + name = stringr::str_remove(.data$name, "(\\d+)"), + name = stringr::str_remove(.data$name, pattern = "Fishing_Trip/") + ) %>% + tidyr::pivot_wider(names_from = .data$name, values_from = .data$value) %>% + dplyr::mutate(content = dplyr::coalesce(!!!.[3:ncol(.)])) %>% + # dplyr::filter(!is.na(.data$content)) %>% + dplyr::select(-.data$content, -.data$n) +} + +#' Nest Attachment Columns +#' +#' Nests attachment-related columns from structured WCS survey data, organizing multiple attachment entries into a single nested column. This function addresses the challenge of handling wide data tables by converting them into more manageable nested data frames. +#' +#' @param x A data frame containing raw survey data, potentially with multiple attachments per survey entry. +#' @return A data frame with attachment information nested into a single '_attachments' column, containing a tibble for each row. 
+#' @keywords internal +#' @export +#' +#' @examples +#' \dontrun{ +#' dummy_landings <- tidyr::tibble( +#' `_id` = "123", +#' `_attachments.0.download_url` = "http://url-1.com", +#' `_attachments.0.id` = "01", +#' `_attachments.1.download_url` = "http://url-2.com", +#' `_attachments.1.id` = "02", +#' `_attachments.2.download_url` = NA, +#' `_attachments.2.id` = NA +#' ) +#' pt_nest_attachments(dummy_landings) +#' } +#' +pt_nest_attachments <- function(x) { + x %>% + # Using the .data pronoun to avoid RMD check notes + dplyr::select(.data$`_id`, dplyr::starts_with("_attachments")) %>% + dplyr::mutate_all(as.character) %>% + # Column names follow the form "_attachments.0.download_large_url" + tidyr::pivot_longer( + cols = -.data$`_id`, + names_to = c("n", "field"), + names_prefix = "_attachments.", + names_sep = "\\." + ) %>% + # We want one attachment per row and fields as columns + tidyr::pivot_wider(names_from = "field", values_from = "value") %>% + # Attachments already have id and this column is superfluous + dplyr::select(-.data$n) %>% + dplyr::group_by(.data$`_id`) %>% + tidyr::nest() %>% + dplyr::ungroup() %>% + dplyr::rename("_attachments" = "data") %>% + # If there are no attachments empty the nested data frames + dplyr::mutate( + `_id` = as.integer(.data$`_id`), + `_attachments` = purrr::map( + .data$`_attachments`, + ~ dplyr::filter(., !is.na(.data$id)) + ) + ) +} + + +#' Expand Taxonomic Vectors into a Data Frame +#' +#' Converts a vector of species identifiers into a detailed data frame containing taxonomic classification. Each identifier should follow the format 'family_genus_species', which is expanded to include comprehensive taxonomic details. +#' +#' @param data A vector of species identifiers formatted as 'family_genus_species'. If not provided, the function will return an error. +#' @return A data frame where each row corresponds to a species, enriched with taxonomic classification information including family, genus, species, and additional taxonomic ranks. +#' @keywords data transformation +#' @export +#' @examples +#' \dontrun{ +#' species_vector <- c("lutjanidae_lutjanus_spp", "scaridae_spp", "acanthuridae_naso_hexacanthus") +#' expanded_data <- expand_taxa(species_vector) +#' } +#' @details This function splits each species identifier into its constituent parts, replaces underscores with spaces for readability, and retrieves taxonomic classification from the GBIF database using the `taxize` package. +#' @note Requires internet access to fetch data from the GBIF database. The accuracy of results depends on the correct formatting of input data and the availability of taxonomic data in the GBIF database. 
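+#'
+#' @details Note that, as implemented, the input is handled with `dplyr` and
+#'   `tidyr` verbs that reference a `species_catch` column, so `data` is
+#'   expected to be a data frame (or tibble) containing that column rather than
+#'   a bare character vector. A minimal sketch with illustrative values:
+#'
+#' ```
+#' catches <- dplyr::tibble(species_catch = "lutjanidae_lutjanus_spp scaridae_spp")
+#' expanded <- expand_taxa(catches)
+#' ```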
+#' +expand_taxa <- function(data = NULL) { + taxa_expanded <- + data %>% + dplyr::mutate(species_list = stringr::str_split(.data$species_catch, pattern = " ")) %>% + tidyr::unnest(.data$species_list) %>% + dplyr::mutate( + species_list = stringr::str_replace(.data$species_list, pattern = "_", replacement = " "), + species_list = stringr::str_replace(.data$species_list, pattern = "_", replacement = " "), + words = stringi::stri_count_words(.data$species_list), + genus_species = dplyr::case_when( + .data$words == 3 ~ stringr::str_extract(.data$species_list, "\\S+\\s+\\S+$"), + TRUE ~ NA_character_ + ), + species_list = ifelse(.data$words == 3, NA_character_, .data$species_list), + catch_group = dplyr::coalesce(.data$species_list, .data$genus_species), + catch_group = stringr::str_replace(.data$catch_group, pattern = " spp.", replacement = ""), + catch_group = stringr::str_replace(.data$catch_group, pattern = " spp", replacement = ""), + catch_group = stringr::str_replace(.data$catch_group, pattern = "_spp", replacement = ""), + catch_group = ifelse(.data$catch_group == "acanthocybium solandiri", "acanthocybium solandri", .data$catch_group), + catch_group = ifelse(.data$catch_group == "panaeidae", "penaeidae", .data$catch_group), + catch_group = ifelse(.data$catch_group == "mulidae", "mullidae", .data$catch_group), + catch_group = ifelse(.data$catch_group == "casio xanthonotus", "caesio xanthonotus", .data$catch_group), + ) %>% + dplyr::select(-c(.data$species_list, .data$genus_species, .data$words)) + + groups_rank <- + taxize::classification(unique(taxa_expanded$catch_group), db = "gbif", rows = 1) %>% + purrr::imap(~ .x %>% + dplyr::as_tibble() %>% + dplyr::mutate(catch_group = .y)) %>% + dplyr::bind_rows() %>% + tidyr::pivot_wider(id_cols = .data$catch_group, names_from = .data$rank, values_from = .data$name) %>% + dplyr::select(-c(.data$class, .data$`NA`)) + + dplyr::left_join(taxa_expanded, groups_rank, by = "catch_group") +} diff --git a/R/processing.R b/R/processing.R deleted file mode 100644 index a8b061f..0000000 --- a/R/processing.R +++ /dev/null @@ -1,69 +0,0 @@ -#' Expand Taxonomic Vectors into a Data Frame -#' -#' @description This function takes a vector of species identifiers and expands it into a -#' data frame that includes taxonomic classification information for each species. -#' Each species identifier is expected to follow the format 'family_genus_species'. -#' -#' @param data A vector of species identifiers where each entry is a string with -#' the format 'family_genus_species'. If not supplied, the function will throw an error. -#' -#' @return A data frame where each row corresponds to a unique species from the input data. -#' The data frame includes columns for species catch, number of elements, weight, and taxonomic -#' classification including catch group, kingdom, phylum, order, family, genus and species. -#' -#' @examples -#' \dontrun{ -#' species_vector <- c("lutjanidae_lutjanus_spp", "scaridae_spp", "acanthuridae_naso_hexacanthus") -#' expanded_data <- expand_taxa(species_vector) -#' } -#' @export -#' -#' @details The function first splits the species identifier into a list of species, -#' replaces underscores with spaces, and counts the number of words to help with further -#' classification. It then uses conditional logic to extract genus and species names, -#' handle special cases, and clean up group names. The cleaned data is then used to fetch -#' taxonomic classification from the GBIF database using the `taxize` package. 
The final -#' data frame is constructed by joining the expanded species data with the taxonomic ranks. -#' -#' @import dplyr tidyr stringr taxize purrr -#' -#' @note The function requires internet access to fetch data from the GBIF database. -#' It also assumes that the input vector is properly formatted. -#' If there are any formatting issues or the GBIF database does not recognize a species, -#' the function may return unexpected results or throw an error. -expand_taxa <- function(data = NULL) { - taxa_expanded <- - data %>% - dplyr::mutate(species_list = stringr::str_split(.data$species_catch, pattern = " ")) %>% - tidyr::unnest(.data$species_list) %>% - dplyr::mutate( - species_list = stringr::str_replace(.data$species_list, pattern = "_", replacement = " "), - species_list = stringr::str_replace(.data$species_list, pattern = "_", replacement = " "), - words = stringi::stri_count_words(.data$species_list), - genus_species = dplyr::case_when( - .data$words == 3 ~ stringr::str_extract(.data$species_list, "\\S+\\s+\\S+$"), - TRUE ~ NA_character_ - ), - species_list = ifelse(.data$words == 3, NA_character_, .data$species_list), - catch_group = dplyr::coalesce(.data$species_list, .data$genus_species), - catch_group = stringr::str_replace(.data$catch_group, pattern = " spp.", replacement = ""), - catch_group = stringr::str_replace(.data$catch_group, pattern = " spp", replacement = ""), - catch_group = stringr::str_replace(.data$catch_group, pattern = "_spp", replacement = ""), - catch_group = ifelse(.data$catch_group == "acanthocybium solandiri", "acanthocybium solandri", .data$catch_group), - catch_group = ifelse(.data$catch_group == "panaeidae", "penaeidae", .data$catch_group), - catch_group = ifelse(.data$catch_group == "mulidae", "mullidae", .data$catch_group), - catch_group = ifelse(.data$catch_group == "casio xanthonotus", "caesio xanthonotus", .data$catch_group), - ) %>% - dplyr::select(-c(.data$species_list, .data$genus_species, .data$words)) - - groups_rank <- - taxize::classification(unique(taxa_expanded$catch_group), db = "gbif", rows = 1) %>% - purrr::imap(~ .x %>% - dplyr::as_tibble() %>% - dplyr::mutate(catch_group = .y)) %>% - dplyr::bind_rows() %>% - tidyr::pivot_wider(id_cols = .data$catch_group, names_from = .data$rank, values_from = .data$name) %>% - dplyr::select(-c(.data$class, .data$`NA`)) - - dplyr::left_join(taxa_expanded, groups_rank, by = "catch_group") -} diff --git a/R/pt_nest_survey.R b/R/pt_nest_survey.R deleted file mode 100644 index e793571..0000000 --- a/R/pt_nest_survey.R +++ /dev/null @@ -1,204 +0,0 @@ -#' Nest length group columns -#' -#' Nests length group columns obtained when reading structured data from the kobo -#' landings survey. -#' -#' @param x Kobo survey in tabular format. -#' -#' @return Nested Landings data in which the information about multiple length information has -#' been nested into a single column (`length`) this column contains a -#' tibble for every row. 
This, attachment tibble has as many rows as there are -#' length information -#' @export -#' -pt_nest_length <- function(x) { - x %>% - dplyr::select(.data$`_id`, dplyr::starts_with("Length_Frequency_Survey")) %>% - tidyr::pivot_longer(-c(.data$`_id`)) %>% - dplyr::mutate( - n = stringr::str_extract(.data$name, "(\\d+)"), - name = stringr::str_remove(.data$name, "(\\d+)"), - name = stringr::str_remove(.data$name, pattern = "Length_Frequency_Survey/catch_length..Length_Frequency_Survey/catch_length/") - ) %>% - tidyr::pivot_wider(names_from = .data$name, values_from = .data$value) %>% - dplyr::mutate(content = dplyr::coalesce(!!!.[3:ncol(.)])) %>% - dplyr::filter(.data$n == 0 | !is.na(.data$content)) %>% - dplyr::select(-.data$content) %>% - tidyr::nest( - "length" = c( - .data$family, .data$species, - .data$sex, .data$total_length - ), - .by = .data$`_id` - ) -} - -#' Nest market group columns -#' -#' Nests market group columns obtained when reading structured data from the kobo -#' landings survey. -#' -#' @param x Kobo survey in tabular format. -#' -#' @return Nested Landings data in which the information about multiple market information has -#' been nested into a single column (`market`) this column contains a -#' tibble for every row. This, attachment tibble has as many rows as there are -#' market information. -#' @export -#' -pt_nest_market <- function(x) { - x %>% - dplyr::select(.data$`_id`, dplyr::starts_with("Market_Catch_Survey")) %>% - tidyr::pivot_longer(-c(.data$`_id`)) %>% - dplyr::mutate( - n = stringr::str_extract(.data$name, "(\\d+)"), - name = stringr::str_remove(.data$name, "(\\d+)"), - name = stringr::str_remove(.data$name, pattern = "Market_Catch_Survey/catch_market..Market_Catch_Survey/catch_market/") - ) %>% - tidyr::pivot_wider(names_from = .data$name, values_from = .data$value) %>% - dplyr::mutate(content = dplyr::coalesce(!!!.[3:ncol(.)])) %>% - dplyr::filter(.data$n == 0 | !is.na(.data$content)) %>% - dplyr::select(-.data$content) %>% - tidyr::nest( - "market" = c( - .data$group_market, .data$species_market, - .data$weight_market, .data$price_sold_for - ), - .by = .data$`_id` - ) -} - -#' Nest catch catch columns -#' -#' Nests catch group columns obtained when reading structured data from the kobo -#' landings survey. -#' -#' @param x Kobo survey in tabular format. -#' -#' @return Nested Landings data in which the information about multiple catch information has -#' been nested into a single column (`catch`) this column contains a -#' tibble for every row. This, attachment tibble has as many rows as there are -#' catch information. 
-#' @export -#' -pt_nest_catch <- function(x) { - x %>% - dplyr::select(.data$`_id`, dplyr::starts_with("Total_Catch_Survey")) %>% - tidyr::pivot_longer(-c(.data$`_id`)) %>% - dplyr::mutate( - n = stringr::str_extract(.data$name, "(\\d+)"), - name = stringr::str_remove(.data$name, "(\\d+)"), - name = stringr::str_remove(.data$name, pattern = "Total_Catch_Survey/catch_catch..Total_Catch_Survey/catch_catch/") - ) %>% - tidyr::pivot_wider(names_from = .data$name, values_from = .data$value) %>% - dplyr::mutate(content = dplyr::coalesce(!!!.[3:ncol(.)])) %>% - dplyr::filter(.data$n == 0 | !is.na(.data$content)) %>% - dplyr::mutate( - weight_kg = dplyr::coalesce(.data$weight_catch, .data$wgt_ind_catch, .data$wgt_buckets_catch), - nb_elements = dplyr::coalesce(.data$nb_ind_catch, .data$nb_buckets_catch) - ) %>% - dplyr::select(-c( - .data$content, .data$weight_catch, .data$wgt_ind_catch, .data$wgt_buckets_catch, - .data$nb_ind_catch, .data$nb_buckets_catch - )) %>% - tidyr::nest( - "catch" = c( - .data$type_measure, .data$All_catch_in_boat, .data$group_catch, - .data$species_catch, .data$nb_elements, .data$weight_kg - ), - .by = .data$`_id` - ) -} - -#' Nest trip catch columns -#' -#' Nests trip group columns obtained when reading structured data from the kobo -#' landings survey. -#' -#' @param x Kobo survey in tabular format. -#' -#' @return Nested Landings data in which the information about multiple trip information has -#' been nested into a single column (`trip`) this column contains a -#' tibble for every row. This, attachment tibble has as many rows as there are -#' trip information. -#' @export -pt_nest_trip <- function(x) { - x %>% - dplyr::select(.data$`_id`, dplyr::starts_with("Fishing_Trip")) %>% - tidyr::pivot_longer(-c(.data$`_id`)) %>% - dplyr::mutate( - n = stringr::str_extract(.data$name, "(\\d+)"), - name = stringr::str_remove(.data$name, "(\\d+)"), - name = stringr::str_remove(.data$name, pattern = "Fishing_Trip/") - ) %>% - tidyr::pivot_wider(names_from = .data$name, values_from = .data$value) %>% - dplyr::mutate(content = dplyr::coalesce(!!!.[3:ncol(.)])) %>% - # dplyr::filter(!is.na(.data$content)) %>% - dplyr::select(-.data$content, -.data$n) -} - -#' Nest attachment columns -#' -#' Nests attachment columns obtained when reading structured data from the kobo -#' landings survey -#' -#' One of the disadvantages of using structured survey data is that the tables -#' can become very wide (many columns). This happens when question groups or -#' other fields can be recorded multiple times. For example in the landings -#' survey, for each species captured, about 17 questions are recorded. There is -#' no limit to the number of species that can be recorded in the trip. If, for -#' example a survey records seven species we will have over a hundred columns in -#' the data corresponding to species information. -#' -#' To improve that situation an avoid using multiple tables we use **nested data -#' frames** (see [tidyr::nest]). In nested data frames columns can be lists and -#' can contain arbitrary information, like other data frames, lists, vectors, or -#' models. -#' -#' @param x A data frame containing raw landings data from the Timor operations. -#' -#' @return Landings data in which the information about multiple attachments has -#' been nested into a single column (`_attachments`) this column contains a -#' tibble for every row. This, attachment tibble has as many rows as there are -#' attachments. 
-#' -#' @export -#' @importFrom rlang .data -#' -#' @examples -#' dummy_landings <- tidyr::tibble( -#' `_id` = "123", -#' `_attachments.0.download_url` = "http://url-1.com", -#' `_attachments.0.id` = "01", -#' `_attachments.1.download_url` = "http://url-2.com", -#' `_attachments.1.id` = "02", -#' `_attachments.2.download_url` = NA, -#' `_attachments.2.id` = NA -#' ) -#' pt_nest_attachments(dummy_landings) -pt_nest_attachments <- function(x) { - x %>% - # Using the .data pronoun to avoid RMD check notes - dplyr::select(.data$`_id`, dplyr::starts_with("_attachments")) %>% - dplyr::mutate_all(as.character) %>% - # Column names follow the form "_attachments.0.download_large_url" - tidyr::pivot_longer( - cols = -.data$`_id`, - names_to = c("n", "field"), - names_prefix = "_attachments.", - names_sep = "\\." - ) %>% - # We want one attachment per row and fields as columns - tidyr::pivot_wider(names_from = "field", values_from = "value") %>% - # Attachments already have id and this column is superfluous - dplyr::select(-.data$n) %>% - dplyr::group_by(.data$`_id`) %>% - tidyr::nest() %>% - dplyr::ungroup() %>% - dplyr::rename("_attachments" = "data") %>% - # If there are no attachments empty the nested data frames - dplyr::mutate(`_attachments` = purrr::map( - .data$`_attachments`, - ~ dplyr::filter(., !is.na(.data$id)) - )) -} diff --git a/R/retrieve-wcs-data.R b/R/retrieve-wcs-data.R deleted file mode 100644 index 703222b..0000000 --- a/R/retrieve-wcs-data.R +++ /dev/null @@ -1,128 +0,0 @@ -#' Download WCS Surveys from Kobotoolbox -#' -#' This function retrieves survey data from Kobotoolbox for a specific project. -#' It allows users to customize the filename using a prefix, choose between CSV or RDS formats, -#' and decide whether to append versioning information to the filename. -#' The resulting files are downloaded to the working directory or specified path, -#' with paths returned as a character vector. -#' -#' @param prefix Name to be used as the prefix of the file names to be -#' downloaded. Can be a path. -#' @param file_format Either "csv" or "rds", -#' @param append_version Whether to append versioning information to the -#' filename using \link{add_version}.. -#' @param url The URL of kobotoolbox (often referred to as 'kpi-url'). -#' @param project_id Is the asset id of the asset for which the data is -#' to be downloaded. -#' @param username Username of your kobotoolbox account. -#' @param psswd Password of the account. -#' @param encoding Encoding to be used. Default is "UTF-8". -#' -#' @return A character vector with paths of the downloaded files. 
-#' @export -#' @examples -#' \dontrun{ -#' file_list <- retrieve_wcs_surveys( -#' prefix = "my_data", -#' file_format = "csv", -#' append_version = TRUE, -#' url = "kf.kobotoolbox.org", -#' project_id = "my_project_id", -#' username = "admin", -#' psswd = "admin", -#' encoding = "UTF-8" -#' ) -#' } -retrieve_wcs_surveys <- function(prefix = NULL, - file_format = NULL, - append_version = NULL, - url = NULL, - project_id = NULL, - username = NULL, - psswd = NULL, - encoding = NULL) { - logger::log_info("Downloading WCS Fish Catch Survey Kobo data...") - data_raw <- - KoboconnectR::kobotools_kpi_data( - url = url, - assetid = project_id, - uname = username, - pwd = psswd, - encoding = encoding - )$results - - # Check that submissions are unique in case there is overlap in the pagination - if (dplyr::n_distinct(purrr::map_dbl(data_raw, ~ .$`_id`)) != length(data_raw)) { - stop("Number of submission ids not the same as number of records") - } - - logger::log_info("Converting WCS Fish Catch Survey Kobo data to tabular format...") - tabular_data <- purrr::map_dfr(data_raw, flatten_row) - data_filename <- paste(prefix, "raw", sep = "_") - - if (isTRUE(append_version)) { - csv_filename <- add_version(data_filename, "csv") - rds_filename <- add_version(data_filename, "rds") - } - - filenames <- character() - if ("csv" %in% file_format) { - logger::log_info("Converting json data to CSV as {csv_filename}...") - readr::write_csv(tabular_data, csv_filename) - filenames <- c(filenames, csv_filename) - } - - if ("rds" %in% file_format) { - logger::log_info("Converting json data to RDS as {rds_filename}...") - readr::write_rds(tabular_data, rds_filename) - filenames <- c(filenames, rds_filename) - } - filenames -} - - -flatten_row <- function(x) { - x %>% - # Each row is composed of several fields - purrr::imap(flatten_field) %>% - rlang::squash() %>% - tibble::as_tibble() -} - -flatten_field <- function(x, p) { - # If the field is a simple vector do nothing but if the field is a list we - # need more logic - if (inherits(x, "list")) { - if (length(x) > 0) { - if (purrr::vec_depth(x) == 2) { - # If the field-list has named elements is we just need to rename the list - x <- list(x) %>% - rlang::set_names(p) %>% - unlist() %>% - as.list() - } else { - # If the field-list is an "array" we need to iterate over its children - x <- purrr::imap(x, rename_child, p = p) - } - } - } else { - if (is.null(x)) x <- NA - } - x -} - -# Appends parent name or number to element -rename_child <- function(x, i, p) { - if (length(x) == 0) { - if (is.null(x)) x <- NA - x <- list(x) - x <- rlang::set_names(x, paste(p, i - 1, sep = ".")) - } else { - if (inherits(i, "character")) { - x <- rlang::set_names(x, paste(p, i, sep = ".")) - } else if (inherits(i, "integer")) { - x <- rlang::set_names(x, paste(p, i - 1, names(x), sep = ".")) - } - } - x -} diff --git a/R/storage.R b/R/storage.R new file mode 100644 index 0000000..f9e3f91 --- /dev/null +++ b/R/storage.R @@ -0,0 +1,269 @@ +#' Authenticate to a Cloud Storage Provider +#' +#' This function is primarily used internally by other functions to establish authentication +#' with specified cloud providers such as Google Cloud Services (GCS) or Amazon Web Services (AWS). +#' +#' @param provider A character string specifying the cloud provider ("gcs" or "aws"). +#' @param options A named list of options specific to the cloud provider (see details). 
+#' +#' @details For GCS, the options list must include: +#' - `service_account_key`: The contents of the authentication JSON file from your Google Project. +#' +#' This function wraps [googleCloudStorageR::gcs_auth()] to handle GCS authentication. +#' +#' @export +#' @keywords storage +#' @examples +#' \dontrun{ +#' authentication_details <- readLines("path/to/json_file.json") +#' cloud_storage_authenticate("gcs", list(service_account_key = authentication_details)) +#' #' +#' } +cloud_storage_authenticate <- function(provider, options) { + if ("gcs" %in% provider) { + # Only need to authenticate if there is no token for downstream requests + if (isFALSE(googleAuthR::gar_has_token())) { + service_account_key <- options$service_account_key + temp_auth_file <- tempfile(fileext = "json") + writeLines(service_account_key, temp_auth_file) + googleCloudStorageR::gcs_auth(json_file = temp_auth_file) + } + } +} + +#' Upload File to Cloud Storage +#' +#' Uploads a local file to a specified cloud storage bucket, supporting both single and multiple files. +#' +#' @param file A character vector specifying the path(s) of the file(s) to upload. +#' @param provider A character string specifying the cloud provider ("gcs" or "aws"). +#' @param options A named list of provider-specific options including the bucket and authentication details. +#' @param name (Optional) The name to assign to the file in the cloud. If not specified, the local file name is used. +#' +#' @details For GCS, the options list must include: +#' - `bucket`: The name of the bucket to which files are uploaded. +#' - `service_account_key`: The authentication JSON contents, if not previously authenticated. +#' +#' This function utilizes [googleCloudStorageR::gcs_upload()] for file uploads to GCS. +#' +#' @return A list of metadata objects for the uploaded files if successful. +#' @export +#' @keywords storage +#' @examples +#' \dontrun{ +#' authentication_details <- readLines("path/to/json_file.json") +#' upload_cloud_file("path/to/local_file.csv", +#' "gcs", +#' list(service_account_key = authentication_details, bucket = "my-bucket")) +#' } +#' +upload_cloud_file <- function(file, provider, options, name = file) { + cloud_storage_authenticate(provider, options) + + out <- list() + if ("gcs" %in% provider) { + # Iterate over multiple files (and names) + google_output <- purrr::map2( + file, name, + ~ googleCloudStorageR::gcs_upload( + file = .x, + bucket = options$bucket, + name = .y, + predefinedAcl = "bucketLevel" + ) + ) + + out <- c(out, google_output) + } + + out +} + +#' Retrieve Full Name of Versioned Cloud Object +#' +#' Gets the full name(s) of object(s) in cloud storage matching the specified prefix, version, and file extension. +#' +#' @param prefix A string indicating the object's prefix. +#' @param version A string specifying the version ("latest" or a specific version string). +#' @param extension The file extension to filter by. An empty string ("") includes all extensions. +#' @param provider A character string specifying the cloud provider ("gcs" or "aws"). +#' @param exact_match A logical indicating whether to match the prefix exactly. +#' @param options A named list of provider-specific options including the bucket and authentication details. +#' +#' @details For GCS, the options list should include: +#' - `bucket`: The bucket name. +#' - `service_account_key`: The authentication JSON contents, if not previously authenticated. +#' +#' @return A vector of names of objects matching the criteria. 
+#' @export +#' @keywords storage +#' @examples +#' \dontrun{ +#' authentication_details <- readLines("path/to/json_file.json") +#' cloud_object_name("prefix", +#' "latest", +#' "json", +#' "gcs", +#' list(service_account_key = authentication_details, bucket = "my-bucket")) +#' #' +#' } +cloud_object_name <- function(prefix, version = "latest", extension = "", + provider, exact_match = FALSE, options) { + cloud_storage_authenticate(provider, options) + + if ("gcs" %in% provider) { + gcs_files <- googleCloudStorageR::gcs_list_objects( + bucket = options$bucket, + prefix = prefix + ) + + if (nrow(gcs_files) == 0) { + return(character(0)) + } + + gcs_files_formatted <- gcs_files %>% + tidyr::separate( + col = .data$name, + into = c("base_name", "version", "ext"), + # Version is separated with the "__" string + sep = "__", + remove = FALSE + ) %>% + dplyr::filter(stringr::str_detect(.data$ext, paste0(extension, "$"))) %>% + dplyr::group_by(.data$base_name, .data$ext) + + if (isTRUE(exact_match)) { + selected_rows <- gcs_files_formatted %>% + dplyr::filter(.data$base_name == prefix) + } else { + selected_rows <- gcs_files_formatted + } + + if (version == "latest") { + selected_rows <- selected_rows %>% + dplyr::filter(max(.data$updated) == .data$updated) + } else { + this_version <- version + selected_rows <- selected_rows %>% + dplyr::filter(.data$version == this_version) + } + + selected_rows$name + } +} + + +#' Download Object from Cloud Storage +#' +#' Downloads an object from cloud storage to a local file. +#' +#' @param name The name of the object in the storage bucket. +#' @param provider A character string specifying the cloud provider ("gcs" or "aws"). +#' @param options A named list of provider-specific options including the bucket and authentication details. +#' @param file (Optional) The local path to save the downloaded object. If not specified, the object name is used. +#' +#' @details For GCS, the options list should include: +#' - `bucket`: The name of the bucket from which the object is downloaded. +#' - `service_account_key`: The authentication JSON contents, if not previously authenticated. +#' +#' @return The path to the downloaded file. +#' @export +#' @keywords storage +#' @examples +#' \dontrun{ +#' authentication_details <- readLines("path/to/json_file.json") +#' download_cloud_file("object_name.json", +#' "gcs", +#' list(service_account_key = authentication_details, bucket = "my-bucket"), +#' "local_path/to/save/object.json") +#' } +#' +download_cloud_file <- function(name, provider, options, file = name) { + cloud_storage_authenticate(provider, options) + + if ("gcs" %in% provider) { + purrr::map2( + name, file, + ~ googleCloudStorageR::gcs_get_object( + object_name = .x, + bucket = options$bucket, + saveToDisk = .y, + overwrite = ifelse(is.null(options$overwrite), TRUE, options$overwrite) + ) + ) + } + + file +} + +#' Download WCS Preprocessed Surveys +#' +#' Retrieves preprocessed survey data from Google Cloud Storage, specifically configured for WCS (Wildlife Conservation Society) datasets. This function fetches data stored in Parquet format. +#' +#' @param pars A list representing the configuration settings, typically obtained from a YAML configuration file. +#' +#' @return A dataframe of preprocessed survey landings, loaded from Parquet files. 
+#' @keywords storage
+#' @export
+#' @examples
+#' \dontrun{
+#' config <- peskas.zanzibar.data.pipeline::read_config()
+#' df_preprocessed <- get_preprocessed_surveys(config)
+#' }
+#'
+get_preprocessed_surveys <- function(pars) {
+  wcs_preprocessed_surveys <-
+    cloud_object_name(
+      prefix = pars$surveys$wcs_surveys$preprocessed_surveys$file_prefix,
+      provider = pars$storage$google$key,
+      extension = "parquet",
+      version = pars$surveys$wcs_surveys$version$preprocess,
+      options = pars$storage$google$options
+    )
+
+  logger::log_info("Retrieving {wcs_preprocessed_surveys}")
+  download_cloud_file(
+    name = wcs_preprocessed_surveys,
+    provider = pars$storage$google$key,
+    options = pars$storage$google$options
+  )
+
+  arrow::read_parquet(wcs_preprocessed_surveys)
+}
+
+
+#' Download WCS Validated Surveys
+#'
+#' Retrieves validated survey data from Google Cloud Storage, tailored for WCS (Wildlife Conservation Society) datasets. This function fetches data stored in Parquet format.
+#'
+#' @param pars A list representing the configuration settings, typically obtained from a YAML configuration file.
+#'
+#' @return A dataframe of validated survey landings, loaded from Parquet files.
+#' @keywords storage
+#' @export
+#' @examples
+#' \dontrun{
+#' config <- peskas.zanzibar.data.pipeline::read_config()
+#' df_validated <- get_validated_surveys(config)
+#' }
+#'
+get_validated_surveys <- function(pars) {
+  wcs_validated_surveys <-
+    cloud_object_name(
+      prefix = pars$surveys$wcs_surveys$validated_surveys$file_prefix,
+      provider = pars$storage$google$key,
+      extension = "parquet",
+      version = pars$surveys$wcs_surveys$version$preprocess,
+      options = pars$storage$google$options
+    )
+
+  logger::log_info("Retrieving {wcs_validated_surveys}")
+  download_cloud_file(
+    name = wcs_validated_surveys,
+    provider = pars$storage$google$key,
+    options = pars$storage$google$options
+  )
+
+  arrow::read_parquet(wcs_validated_surveys)
+}
diff --git a/R/utils.R b/R/utils.R
index f836e39..734f4a3 100644
--- a/R/utils.R
+++ b/R/utils.R
@@ -21,7 +21,7 @@
 #' running inside a container) then this function attempts to get the sha from
 #' the environment variable `GITHUB_SHA`. If both of these methods fail, no sha
 #' versioning is added.
-#'
+#' @keywords helper
 #' @examples
 #' if (git2r::in_repository()) {
 #'   add_version("my_file", "csv")
@@ -57,6 +57,7 @@ add_version <- function(filename, extension = "", sha_nchar = 7, sep = "__") {
 #' for convenience
 #'
 #' @return the environment parameters
+#' @keywords helper
 #' @export
 #'
 read_config <- function() {
diff --git a/R/validate-surveys.R b/R/validate-surveys.R
deleted file mode 100644
index 7833c09..0000000
--- a/R/validate-surveys.R
+++ /dev/null
@@ -1,115 +0,0 @@
-#' Validate WCS Surveys Data
-#'
-#' This function validates Wildlife Conservation Society (WCS) survey data. It reads
-#' configuration parameters, preprocesses surveys,
-#' and performs various validations on survey duration, catches, lengths, and market data.
-#' It also logs the process at specified
-#' log thresholds. The function consolidates validated data and saves it as an RDS file,
-#' which is then uploaded to cloud storage.
-#'
-#' @param log_threshold A log level threshold from the `logger` package, used to
-#' set the minimum level of log messages to be captured.
-#' @importFrom logger log_threshold log_info -#' @importFrom dplyr select left_join mutate -#' @importFrom purrr map reduce -#' @importFrom lubridate with_tz -#' @importFrom readr write_rds -#' -#' @return No return value; this function is called for its side effects, -#' including data validation, file creation, and cloud uploading. -#' @export -#' -#' @examples -#' # Assuming necessary configuration and data are available: -#' \dontrun{ -#' validate_wcs_surveys(log_threshold = logger::INFO) -#' } -#' @seealso \code{\link{validate_catch}}, \code{\link{validate_length}}, \code{\link{validate_market}} -validate_wcs_surveys <- function(log_threshold = logger::DEBUG) { - logger::log_threshold(log_threshold) - - pars <- read_config() - preprocessed_surveys <- get_preprocessed_surveys(pars) - - # define validation parameters - k_max_nb <- pars$surveys$wcs_surveys$validation$K_nb_elements_max - k_max_weight <- pars$surveys$wcs_surveys$validation$K_weight_max - k_max_length <- pars$surveys$wcs_surveys$validation$K_length_max - k_max_price <- pars$surveys$wcs_surveys$validation$K_price_max - - logger::log_info("Validating catches groups") - surveys_catch_alerts <- validate_catch(data = preprocessed_surveys, k_max_nb = k_max_nb, k_max_weight = k_max_weight) - logger::log_info("Validating lengths group") - surveys_length_alerts <- validate_length(data = preprocessed_surveys, k_max_length = k_max_length) - logger::log_info("Validating markets group") - surveys_market_alerts <- validate_market(data = preprocessed_surveys, k_max_price = k_max_price) - - logger::log_info("Renaming data fields") - validated_groups <- - list( - surveys_catch_alerts, - surveys_length_alerts, - surveys_market_alerts - ) %>% - purrr::map(~ dplyr::select(.x, -alert_number)) %>% - purrr::reduce(dplyr::left_join, by = "submission_id") - - trips_info <- - preprocessed_surveys %>% - dplyr::mutate( - submission_id = as.integer(.data$`_id`), - date = lubridate::with_tz(.data$today, "Africa/Dar_es_Salaam"), - date = as.Date(date), - landing_site = stringr::str_to_title(.data$landing_site) - ) %>% - # convert fields - dplyr::mutate( - dplyr::across(.cols = c( - .data$lat, - .data$lon, - .data$engine, - .data$people, - .data$boats_landed - ), ~ as.numeric(.x)) - ) %>% - dplyr::select( - .data$submission_id, - .data$date, - .data$survey_real, - .data$survey_type, - .data$landing_site, - .data$lat, - .data$lon, - trip_duration_days = .data$fishing_duration, - .data$fishing_location, - .data$fishing_ground_name, - .data$fishing_ground_type, - .data$fishing_ground_depth, - .data$gear_type, - .data$boat_type, - .data$engine_yn, - .data$engine, - .data$people, - .data$boats_landed - ) - - validated_surveys <- - dplyr::left_join(trips_info, validated_groups, by = "submission_id") - - validated_filename <- - pars$surveys$wcs_surveys$validated_surveys$file_prefix %>% - add_version(extension = "rds") - - readr::write_rds( - x = validated_surveys, - file = validated_filename, - compress = "gz" - ) - - logger::log_info("Uploading {validated_filename} to cloud sorage") - upload_cloud_file( - file = validated_filename, - provider = pars$storage$google$key, - options = pars$storage$google$options - ) -} diff --git a/R/validation-functions.R b/R/validation.R similarity index 68% rename from R/validation-functions.R rename to R/validation.R index 9ee6bb4..3da2714 100644 --- a/R/validation-functions.R +++ b/R/validation.R @@ -1,3 +1,110 @@ +#' Validate WCS Surveys Data +#' +#' Validates Wildlife Conservation Society (WCS) survey data by checking for 
inconsistencies in survey duration, catches, lengths, and market data. The function preprocesses surveys, performs validations, logs the process, and uploads the validated data to cloud storage.
+#'
+#' @param log_threshold The logging level used as a threshold for the `logger` package, which controls the verbosity of logging output.
+#' @return None; the function is used for its side effects, which include data validation and uploading validated data to cloud storage.
+#' @keywords workflow validation
+#' @export
+#' @examples
+#' \dontrun{
+#' validate_wcs_surveys(log_threshold = logger::INFO)
+#' }
+#' @seealso \code{\link{validate_catch}}, \code{\link{validate_length}}, \code{\link{validate_market}}
+#'
+#'
+validate_wcs_surveys <- function(log_threshold = logger::DEBUG) {
+  logger::log_threshold(log_threshold)
+
+  pars <- read_config()
+  preprocessed_surveys <- get_preprocessed_surveys(pars)
+
+  # define validation parameters
+  k_max_nb <- pars$surveys$wcs_surveys$validation$K_nb_elements_max
+  k_max_weight <- pars$surveys$wcs_surveys$validation$K_weight_max
+  k_max_length <- pars$surveys$wcs_surveys$validation$K_length_max
+  k_max_price <- pars$surveys$wcs_surveys$validation$K_price_max
+
+  logger::log_info("Validating catches groups")
+  surveys_catch_alerts <- validate_catch(data = preprocessed_surveys, k_max_nb = k_max_nb, k_max_weight = k_max_weight)
+  logger::log_info("Validating lengths group")
+  surveys_length_alerts <- validate_length(data = preprocessed_surveys, k_max_length = k_max_length)
+  logger::log_info("Validating markets group")
+  surveys_market_alerts <- validate_market(data = preprocessed_surveys, k_max_price = k_max_price)
+
+  logger::log_info("Renaming data fields")
+  validated_groups <-
+    list(
+      surveys_catch_alerts,
+      surveys_length_alerts,
+      surveys_market_alerts
+    ) %>%
+    purrr::map(~ dplyr::select(.x, -alert_number)) %>%
+    purrr::reduce(dplyr::left_join, by = "submission_id")
+
+  trips_info <-
+    preprocessed_surveys %>%
+    dplyr::mutate(
+      submission_id = as.integer(.data$`_id`),
+      date = lubridate::with_tz(.data$today, "Africa/Dar_es_Salaam"),
+      date = as.Date(date),
+      landing_site = stringr::str_to_title(.data$landing_site)
+    ) %>%
+    # convert fields
+    dplyr::mutate(
+      dplyr::across(.cols = c(
+        .data$lat,
+        .data$lon,
+        .data$engine,
+        .data$people,
+        .data$boats_landed
+      ), ~ as.numeric(.x))
+    ) %>%
+    dplyr::select(
+      .data$submission_id,
+      .data$date,
+      .data$survey_real,
+      .data$survey_type,
+      .data$landing_site,
+      .data$lat,
+      .data$lon,
+      trip_duration_days = .data$fishing_duration,
+      .data$fishing_location,
+      .data$fishing_ground_name,
+      .data$fishing_ground_type,
+      .data$fishing_ground_depth,
+      .data$gear_type,
+      .data$boat_type,
+      .data$engine_yn,
+      .data$engine,
+      .data$people,
+      .data$boats_landed
+    )
+
+  validated_surveys <-
+    dplyr::left_join(trips_info, validated_groups, by = "submission_id")
+
+  validated_filename <-
+    pars$surveys$wcs_surveys$validated_surveys$file_prefix %>%
+    add_version(extension = "parquet")
+
+  arrow::write_parquet(
+    x = validated_surveys,
+    sink = validated_filename,
+    compression = "lz4",
+    compression_level = 12
+  )
+
+
+  logger::log_info("Uploading {validated_filename} to cloud storage")
+  upload_cloud_file(
+    file = validated_filename,
+    provider = pars$storage$google$key,
+    options = pars$storage$google$options
+  )
+}
+
+
 #' Generate an alert vector based on the `univOutl::LocScaleB()` function
 #'
 #' @param x numeric vector where outliers will be checked
@@ -8,6 +115,10 @@
 #'
 #' @return a vector of the same lenght
as x #' @importFrom stats mad +#' +#' @keywords validation +#' @export +#' alert_outlier <- function(x, no_alert_value = NA_real_, alert_if_larger = no_alert_value, @@ -48,27 +159,17 @@ alert_outlier <- function(x, } -#' Validate surveys' fishing duration +#' Validate Fishing Duration in WCS Surveys #' -#' This function takes a preprocessed landings' matrix and validate fishing trip -#' duration associated to each survey. +#' Checks fishing durations reported in WCS surveys against specified maximum and minimum hour thresholds, identifying and flagging any durations outside these bounds. #' -#' @param data A preprocessed data frame -#' @param hrs_max Upper threshold of fishing trip duration. -#' @param hrs_min Lower threshold of fishing trip duration. -#' -#' @return A list containing data frames with validated catch -#' duration. -#' -#' @importFrom rlang .data +#' @param data Data frame containing preprocessed survey data. +#' @param hrs_max Maximum allowable duration in hours. +#' @param hrs_min Minimum allowable duration in hours. +#' @return Data frame with validation results, including flags for surveys that do not meet duration criteria. +#' @keywords validation #' @export #' -#' @examples -#' \dontrun{ -#' pars <- read_config() -#' landings <- get_preprocessed_surveys(pars) -#' validate_surveys_time(landings, hrs_max = 72, hrs_min = 1) -#' } validate_surveys_time <- function(data = NULL, hrs_max = NULL, hrs_min = NULL) { data %>% dplyr::select(.data$`_id`, .data$fishing_duration) %>% @@ -111,15 +212,14 @@ validate_surveys_time <- function(data = NULL, hrs_max = NULL, hrs_min = NULL) { #' detected outliers, and nested catch data. #' @export #' +#' @keywords validation +#' @export #' @examples #' \dontrun{ #' # Assuming you have a data frame `catch_data` with the necessary structure: #' validated_catch <- validate_catch(data = catch_data, k_max_nb = 10, k_max_weight = 100) #' } #' -#' @importFrom dplyr select mutate group_by ungroup case_when coalesce -#' @importFrom tidyr unnest nest -#' @importFrom magrittr %>% validate_catch <- function(data = NULL, k_max_nb = NULL, k_max_weight = NULL) { data %>% dplyr::select("_id", "gear_type", "catch") %>% @@ -182,15 +282,14 @@ validate_catch <- function(data = NULL, k_max_nb = NULL, k_max_weight = NULL) { #' detected outliers, and nested length data. #' @export #' +#' @keywords validation +#' @export #' @examples #' \dontrun{ #' # Assuming you have a data frame `length_data` with the necessary structure: #' validated_length <- validate_length(data = length_data, k_max_length = 200) #' } #' -#' @importFrom dplyr select mutate group_by ungroup case_when -#' @importFrom tidyr unnest nest -#' @importFrom magrittr %>% validate_length <- function(data = NULL, k_max_length = NULL) { data %>% dplyr::select("_id", "gear_type", "length") %>% @@ -244,17 +343,14 @@ validate_length <- function(data = NULL, k_max_length = NULL) { #' #' @return A data frame with the original market data, additional columns for #' detected outliers, and nested market data. 
-#' @export #' +#' @keywords validation +#' @export #' @examples #' \dontrun{ #' # Assuming you have a data frame `market_data` with the necessary structure: #' validated_market <- validate_market(data = market_data, k_max_price = 100) #' } -#' -#' @importFrom dplyr select mutate group_by ungroup case_when -#' @importFrom tidyr unnest nest -#' @importFrom magrittr %>% validate_market <- function(data = NULL, k_max_price = NULL) { data %>% dplyr::select("_id", "gear_type", "market") %>% diff --git a/_pkgdown.yml b/_pkgdown.yml index 99c110f..66c3f50 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -12,3 +12,44 @@ template: family: IBM Plex Sans code_font: google: Fira Mono + + +reference: +- title: Cloud Storage + desc: Functions that interact with cloud storage providers. + contents: + - has_keyword("storage") + +- title: Workflow + desc: > + These are arguably the most important functions in the package. Each of + these functions executes a step in the data pipeline. +- contents: + - has_keyword("workflow") + +- title: Ingestion + desc: Functions dedicated to data pulling and ingestion from servers +- contents: + - has_keyword("ingestion") + +- title: Preprocessing + desc: Functions dedicated to the preprocessing module and general data processing + contents: + - has_keyword("preprocessing") + - expand_taxa + +- title: Validation + desc: Functions dedicated to the validation module + contents: + - has_keyword("validation") + +- title: Helper functions + desc: Functions dedicated to data processing. + contents: + - has_keyword("helper") + +- title: Export + desc: Functions dedicated dissemination of processed and validated data and visualizations + contents: + - has_keyword("export") + - kepler_mapper diff --git a/man/add_version.Rd b/man/add_version.Rd index bf528f4..df69e9f 100644 --- a/man/add_version.Rd +++ b/man/add_version.Rd @@ -37,3 +37,4 @@ if (git2r::in_repository()) { add_version("my_file", "csv") } } +\keyword{helper} diff --git a/man/alert_outlier.Rd b/man/alert_outlier.Rd index 5cd91a0..f3dc5f2 100644 --- a/man/alert_outlier.Rd +++ b/man/alert_outlier.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/validation-functions.R +% Please edit documentation in R/validation.R \name{alert_outlier} \alias{alert_outlier} \title{Generate an alert vector based on the \code{univOutl::LocScaleB()} function} @@ -29,3 +29,4 @@ a vector of the same lenght as x \description{ Generate an alert vector based on the \code{univOutl::LocScaleB()} function } +\keyword{validation} diff --git a/man/cloud_object_name.Rd b/man/cloud_object_name.Rd index cc187aa..e3f54b9 100644 --- a/man/cloud_object_name.Rd +++ b/man/cloud_object_name.Rd @@ -1,8 +1,8 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/cloud-storage.R +% Please edit documentation in R/storage.R \name{cloud_object_name} \alias{cloud_object_name} -\title{Get the full name of a versioned cloud object} +\title{Retrieve Full Name of Versioned Cloud Object} \usage{ cloud_object_name( prefix, @@ -14,71 +14,40 @@ cloud_object_name( ) } \arguments{ -\item{prefix}{string indicating the prefix of the object} +\item{prefix}{A string indicating the object's prefix.} -\item{version}{either "latest" or the specific version string generated by -\link{add_version} when the file was uploaded to the cloud provider} +\item{version}{A string specifying the version ("latest" or a specific version string).} -\item{extension}{extension of the desired file. 
Use an empty string "" to -return all extensions founds} +\item{extension}{The file extension to filter by. An empty string ("") includes all extensions.} -\item{provider}{cloud provider to use, either "gcs" or "aws"} +\item{provider}{A character string specifying the cloud provider ("gcs" or "aws").} -\item{exact_match}{logical indicating whether the prefix should be matched -exactly} +\item{exact_match}{A logical indicating whether to match the prefix exactly.} -\item{options}{named list with cloud provider options, see details} +\item{options}{A named list of provider-specific options including the bucket and authentication details.} } \value{ -A string vector with the object names in the cloud storage that match -the prefix, the version, and the extension indicated in the parameters +A vector of names of objects matching the criteria. } \description{ -Obtain the full name (e.g. -\code{timor-landings-v2_metadata__20210326084600_54617b3__.json}) of a cloud -storage object. If there are more than one object matching the prefix, -version, and extension, a vector with all the names is returned. +Gets the full name(s) of object(s) in cloud storage matching the specified prefix, version, and file extension. } \details{ -\subsection{Google Cloud Services}{ - -For Google Cloud Services ("gcs") options must be a list with two fields: -\code{bucket} with the bucketname (character) you are uploading to, and -\code{service_account_key} with the contents of the authentication json file you -have downloaded from your Google Project (if \link{cloud_storage_authenticate} has -not been called before). - -This function uses \link[googleCloudStorageR:gcs_upload]{googleCloudStorageR::gcs_upload} under the hood to upload -the file. +For GCS, the options list should include: +\itemize{ +\item \code{bucket}: The bucket name. +\item \code{service_account_key}: The authentication JSON contents, if not previously authenticated. 
} } \examples{ - -#' # Google Cloud Services \dontrun{ -authentication_details <- readLines("location_of_json_file.json") -# obtain the latest version of all files corresponding to timor-landings-v2 -cloud_object_name( - prefix = "timor-landings-v2", - version = "latest", - provider = "gcs", - options = list( - service_account_key = authentication_details, - bucket = "my-bucket" - ) -) - -# obtain a specific version of the structured data from timor-landings-v2 -cloud_object_name( - prefix = "timor-landings-v2_raw", - version = "20210326084600_54617b", - extension = "csv", - provider = "gcs", - options = list( - service_account_key = authentication_details, - bucket = "my-bucket" - ) -) +authentication_details <- readLines("path/to/json_file.json") +cloud_object_name("prefix", + "latest", + "json", + "gcs", + list(service_account_key = authentication_details, bucket = "my-bucket")) +#' } - } +\keyword{storage} diff --git a/man/cloud_storage_authenticate.Rd b/man/cloud_storage_authenticate.Rd index 02d6aab..4286b54 100644 --- a/man/cloud_storage_authenticate.Rd +++ b/man/cloud_storage_authenticate.Rd @@ -1,41 +1,33 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/cloud-storage.R +% Please edit documentation in R/storage.R \name{cloud_storage_authenticate} \alias{cloud_storage_authenticate} -\title{Authenticate to a storage cloud provider} +\title{Authenticate to a Cloud Storage Provider} \usage{ cloud_storage_authenticate(provider, options) } \arguments{ -\item{provider}{cloud provider to use, either "gcs" or "aws"} +\item{provider}{A character string specifying the cloud provider ("gcs" or "aws").} -\item{options}{named list with cloud provider options, see details} +\item{options}{A named list of options specific to the cloud provider (see details).} } \description{ -Usually used internally by other functions +This function is primarily used internally by other functions to establish authentication +with specified cloud providers such as Google Cloud Services (GCS) or Amazon Web Services (AWS). } \details{ -\subsection{Google Cloud Services}{ - -For Google Cloud Services ("gcs") options must be a list with the field -\code{service_account_key} with the contents of the authentication json file you -have downloaded from your Google Project. - -This function uses \link[googleCloudStorageR:gcs_auth]{googleCloudStorageR::gcs_auth} under the hood to -authenticate. +For GCS, the options list must include: +\itemize{ +\item \code{service_account_key}: The contents of the authentication JSON file from your Google Project. } + +This function wraps \code{\link[googleCloudStorageR:gcs_auth]{googleCloudStorageR::gcs_auth()}} to handle GCS authentication. 
} \examples{ - -# Google Cloud Services \dontrun{ -authentication_details <- readLines("location_of_json_file.json") -cloud_storage_authenticate( - provider = "gcs", - options = list( - service_account_key = authentication_details, - bucket = "my-bucket" - ) -) +authentication_details <- readLines("path/to/json_file.json") +cloud_storage_authenticate("gcs", list(service_account_key = authentication_details)) +#' } } +\keyword{storage} diff --git a/man/download_cloud_file.Rd b/man/download_cloud_file.Rd index d22c26f..9ac365f 100644 --- a/man/download_cloud_file.Rd +++ b/man/download_cloud_file.Rd @@ -1,39 +1,41 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/cloud-storage.R +% Please edit documentation in R/storage.R \name{download_cloud_file} \alias{download_cloud_file} -\title{Download an object from a cloud storage bucket to a local file} +\title{Download Object from Cloud Storage} \usage{ download_cloud_file(name, provider, options, file = name) } \arguments{ -\item{name}{the name of the object in the storage bucket.} +\item{name}{The name of the object in the storage bucket.} -\item{provider}{cloud provider to use, either "gcs" or "aws"} +\item{provider}{A character string specifying the cloud provider ("gcs" or "aws").} -\item{options}{named list with cloud provider options, see details} +\item{options}{A named list of provider-specific options including the bucket and authentication details.} -\item{file}{a file-path (character) where the object will be saved. Default -is the object name.} +\item{file}{(Optional) The local path to save the downloaded object. If not specified, the object name is used.} } \value{ -the file path +The path to the downloaded file. } \description{ -Download object from the cloud storage to a local file +Downloads an object from cloud storage to a local file. +} +\details{ +For GCS, the options list should include: +\itemize{ +\item \code{bucket}: The name of the bucket from which the object is downloaded. +\item \code{service_account_key}: The authentication JSON contents, if not previously authenticated. +} } \examples{ - -# Google Cloud Services \dontrun{ -authentication_details <- readLines("location_of_json_file.json") -download_cloud_file( - name = "timor-landings-v2_metadata__20210326084600_54617b3__.json", - provider = "gcs", - options = list( - service_account_key = authentication_details, - bucket = "my-bucket" - ) -) +authentication_details <- readLines("path/to/json_file.json") +download_cloud_file("object_name.json", + "gcs", + list(service_account_key = authentication_details, bucket = "my-bucket"), + "local_path/to/save/object.json") } + } +\keyword{storage} diff --git a/man/expand_taxa.Rd b/man/expand_taxa.Rd index decffab..ee9b669 100644 --- a/man/expand_taxa.Rd +++ b/man/expand_taxa.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/processing.R +% Please edit documentation in R/preprocessing.R \name{expand_taxa} \alias{expand_taxa} \title{Expand Taxonomic Vectors into a Data Frame} @@ -7,32 +7,19 @@ expand_taxa(data = NULL) } \arguments{ -\item{data}{A vector of species identifiers where each entry is a string with -the format 'family_genus_species'. If not supplied, the function will throw an error.} +\item{data}{A vector of species identifiers formatted as 'family_genus_species'. If not provided, the function will return an error.} } \value{ -A data frame where each row corresponds to a unique species from the input data. 
-The data frame includes columns for species catch, number of elements, weight, and taxonomic -classification including catch group, kingdom, phylum, order, family, genus and species. +A data frame where each row corresponds to a species, enriched with taxonomic classification information including family, genus, species, and additional taxonomic ranks. } \description{ -This function takes a vector of species identifiers and expands it into a -data frame that includes taxonomic classification information for each species. -Each species identifier is expected to follow the format 'family_genus_species'. +Converts a vector of species identifiers into a detailed data frame containing taxonomic classification. Each identifier should follow the format 'family_genus_species', which is expanded to include comprehensive taxonomic details. } \details{ -The function first splits the species identifier into a list of species, -replaces underscores with spaces, and counts the number of words to help with further -classification. It then uses conditional logic to extract genus and species names, -handle special cases, and clean up group names. The cleaned data is then used to fetch -taxonomic classification from the GBIF database using the \code{taxize} package. The final -data frame is constructed by joining the expanded species data with the taxonomic ranks. +This function splits each species identifier into its constituent parts, replaces underscores with spaces for readability, and retrieves taxonomic classification from the GBIF database using the \code{taxize} package. } \note{ -The function requires internet access to fetch data from the GBIF database. -It also assumes that the input vector is properly formatted. -If there are any formatting issues or the GBIF database does not recognize a species, -the function may return unexpected results or throw an error. +Requires internet access to fetch data from the GBIF database. The accuracy of results depends on the correct formatting of input data and the availability of taxonomic data in the GBIF database. } \examples{ \dontrun{ @@ -40,3 +27,5 @@ species_vector <- c("lutjanidae_lutjanus_spp", "scaridae_spp", "acanthuridae_nas expanded_data <- expand_taxa(species_vector) } } +\keyword{data} +\keyword{transformation} diff --git a/man/flatten_field.Rd b/man/flatten_field.Rd new file mode 100644 index 0000000..42aa9b3 --- /dev/null +++ b/man/flatten_field.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/ingestion.R +\name{flatten_field} +\alias{flatten_field} +\title{Flatten Survey Data Fields} +\usage{ +flatten_field(x, p) +} +\arguments{ +\item{x}{A vector or list representing a field in the data.} + +\item{p}{The prefix or name associated with the field, used for naming during the flattening process.} +} +\value{ +Modified field, either unchanged, unnested, or appropriately renamed. +} +\description{ +Processes each field within a row of survey data, handling both simple vectors and nested lists. For lists with named elements, renames and unlists them for flat structure preparation. 
+} +\keyword{internal} diff --git a/man/flatten_row.Rd b/man/flatten_row.Rd new file mode 100644 index 0000000..81edf04 --- /dev/null +++ b/man/flatten_row.Rd @@ -0,0 +1,18 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/ingestion.R +\name{flatten_row} +\alias{flatten_row} +\title{Flatten Survey Data Rows} +\usage{ +flatten_row(x) +} +\arguments{ +\item{x}{A list representing a row of data, potentially containing nested lists or vectors.} +} +\value{ +A tibble with each row representing flattened survey data. +} +\description{ +Transforms each row of nested survey data into a flat tabular format using a mapping and flattening process. +} +\keyword{internal} diff --git a/man/get_preprocessed_surveys.Rd b/man/get_preprocessed_surveys.Rd index 5003369..1948bac 100644 --- a/man/get_preprocessed_surveys.Rd +++ b/man/get_preprocessed_surveys.Rd @@ -1,17 +1,25 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/get-cloud-files.R +% Please edit documentation in R/storage.R \name{get_preprocessed_surveys} \alias{get_preprocessed_surveys} -\title{Download WCS preprocessed surveys} +\title{Download WCS Preprocessed Surveys} \usage{ get_preprocessed_surveys(pars) } \arguments{ -\item{pars}{The configuration file.} +\item{pars}{A list representing the configuration settings, typically obtained from a YAML configuration file.} } \value{ -A rds dataframe of preprocessed survey landings. +A dataframe of preprocessed survey landings, loaded from Parquet files. } \description{ -Download preprocessed WCS data from Google Cloud. +Retrieves preprocessed survey data from Google Cloud Storage, specifically configured for WCS (Wildlife Conservation Society) datasets. This function fetches data stored in Parquet format. } +\examples{ +\dontrun{ +config <- peskas.zanzibar.pipeline::read_config() +df_preprocessed <- get_preprocessed_surveys(config) +} + +} +\keyword{storage} diff --git a/man/get_validated_surveys.Rd b/man/get_validated_surveys.Rd index 8e919ae..2801e53 100644 --- a/man/get_validated_surveys.Rd +++ b/man/get_validated_surveys.Rd @@ -1,17 +1,26 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/get-cloud-files.R +% Please edit documentation in R/storage.R \name{get_validated_surveys} \alias{get_validated_surveys} -\title{Download WCS validated surveys} +\title{Download WCS Validated Surveys} \usage{ get_validated_surveys(pars) } \arguments{ -\item{pars}{The configuration file.} +\item{pars}{A list representing the configuration settings, typically obtained from a YAML configuration file.} } \value{ -A rds dataframe of validated survey landings. +A dataframe of validated survey landings, loaded from Parquet files. } \description{ -Download validated WCS data from Google Cloud. +Retrieves validated survey data from Google Cloud Storage, tailored for WCS (Wildlife Conservation Society) datasets. This function fetches data stored in Parquet format. 
} +\examples{ +\dontrun{ +config <- peskas.zanzibar.pipeline::read_config() +df_validated <- get_validated_surveys(config) +#' +} + +} +\keyword{storage} diff --git a/man/ingest_wcs_surveys.Rd b/man/ingest_wcs_surveys.Rd index 4d372dd..af77948 100644 --- a/man/ingest_wcs_surveys.Rd +++ b/man/ingest_wcs_surveys.Rd @@ -1,30 +1,22 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/ingest-wcs-surveys.R +% Please edit documentation in R/ingestion.R \name{ingest_wcs_surveys} \alias{ingest_wcs_surveys} -\title{Ingest WCS catch sruvey data} +\title{Ingest WCS Catch Survey Data} \usage{ ingest_wcs_surveys(log_threshold = logger::DEBUG) } \arguments{ -\item{log_threshold}{The (standard Apache logj4) log level used as a -threshold for the logging infrastructure. See \link[logger:log_levels]{logger::log_levels} for more -details} +\item{log_threshold}{Log level used as the threshold for logging (see \link[logger:log_levels]{logger::log_levels}).} } \value{ -No output. This function is used for it's side effects +None; the function is used for its side effects. } \description{ -Downloads landings information that has been collected using Kobo Toolbox and -uploads it to cloud storage services. +This function automates the downloading of WCS fish catch survey data collected through Kobo Toolbox and uploads it to cloud storage services. The filenames are versioned to include date-time stamps and, if available, the first 7 digits of the Git commit SHA. } \details{ -This function downloads the survey data and uploads this information to cloud -services. File names used contain a -versioning string that includes the date-time and, if available, the first 7 -digits of the git commit sha. This is acomplished using \code{\link[=add_version]{add_version()}} - -The parameters needed in \code{conf.yml} are: +Configuration parameters required from \code{conf.yml} include: \if{html}{\out{
}}\preformatted{surveys: wcs_surveys: @@ -42,5 +34,14 @@ storage: }\if{html}{\out{
}} Progress through the function is tracked using the package \emph{logger}. + +Logging progress is managed using the \code{logger} package. +} +\examples{ +\dontrun{ +ingest_wcs_surveys(logger::DEBUG) +} + } +\keyword{ingestion} \keyword{workflow} diff --git a/man/kepler_mapper.Rd b/man/kepler_mapper.Rd index 1515f4d..fbafc6e 100644 --- a/man/kepler_mapper.Rd +++ b/man/kepler_mapper.Rd @@ -17,3 +17,4 @@ This function is a R wrapper of \code{kepler_wcs_mapper_py}, a python script fun aimed to elaborate produce a self-contained map (in html) using the Kepler.gl python library \url{https://docs.kepler.gl/docs/keplergl-jupyter}. } +\keyword{visualization} diff --git a/man/preprocess_wcs_surveys.Rd b/man/preprocess_wcs_surveys.Rd index 454f269..6ea9d97 100644 --- a/man/preprocess_wcs_surveys.Rd +++ b/man/preprocess_wcs_surveys.Rd @@ -1,34 +1,22 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/preprocess-wcs-surveys.R +% Please edit documentation in R/preprocessing.R \name{preprocess_wcs_surveys} \alias{preprocess_wcs_surveys} -\title{Pre-process Zanzibar WCS surveys} +\title{Pre-process Zanzibar WCS Surveys} \usage{ preprocess_wcs_surveys(log_threshold = logger::DEBUG) } \arguments{ -\item{log_threshold}{The (standard Apache logj4) log level used as a -threshold for the logging infrastructure. See \link[logger:log_levels]{logger::log_levels} for more -details} +\item{log_threshold}{Log level used as the threshold for logging (see \link[logger:log_levels]{logger::log_levels}).} } \value{ -no outputs. This function is used for it's side effects +None; the function is used for its side effects. } \description{ -Downloads raw structured data from cloud storage services and pre-processes -into a binary format that is easier to deal with in R. During the pre-processing -phase, multiple columns in the survey data, which can become very wide due to -multiple recordings of similar information (e.g., species information), are nested -using a set of utility functions (\code{pt_nest_trip}, \code{pt_nest_catch}, -\code{pt_nest_length}, \code{pt_nest_market}, \code{pt_nest_attachments}). +Downloads and preprocesses raw structured WCS survey data from cloud storage into a binary format. The process includes nesting multiple columns related to species information into single columns within a dataframe, which helps reduce its width and organize data efficiently for analysis. } \details{ -Nesting these columns helps in reducing the width of the dataframe and organizes -related columns into a single nested tibble column, thus simplifying subsequent -analysis and visualization tasks.#' - -This function downloads the landings data from a given version (specified in -the config file \code{conf.yml}.The parameters needed are: +Configurations are read from \code{conf.yml} with the following necessary parameters: \if{html}{\out{
}}\preformatted{surveys: wcs_surveys: @@ -47,11 +35,12 @@ storage: service_account_key: }\if{html}{\out{
}} -Progress through the function is tracked using the package \emph{logger}. +The function uses logging to track progress. } \seealso{ \code{\link[=pt_nest_trip]{pt_nest_trip}}, \code{\link[=pt_nest_catch]{pt_nest_catch}}, \code{\link[=pt_nest_length]{pt_nest_length}}, \code{\link[=pt_nest_market]{pt_nest_market}}, \code{\link[=pt_nest_attachments]{pt_nest_attachments}} } +\keyword{preprocessing} \keyword{workflow} diff --git a/man/pt_nest_attachments.Rd b/man/pt_nest_attachments.Rd index a090618..da01316 100644 --- a/man/pt_nest_attachments.Rd +++ b/man/pt_nest_attachments.Rd @@ -1,39 +1,22 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/pt_nest_survey.R +% Please edit documentation in R/preprocessing.R \name{pt_nest_attachments} \alias{pt_nest_attachments} -\title{Nest attachment columns} +\title{Nest Attachment Columns} \usage{ pt_nest_attachments(x) } \arguments{ -\item{x}{A data frame containing raw landings data from the Timor operations.} +\item{x}{A data frame containing raw survey data, potentially with multiple attachments per survey entry.} } \value{ -Landings data in which the information about multiple attachments has -been nested into a single column (\verb{_attachments}) this column contains a -tibble for every row. This, attachment tibble has as many rows as there are -attachments. +A data frame with attachment information nested into a single '_attachments' column, containing a tibble for each row. } \description{ -Nests attachment columns obtained when reading structured data from the kobo -landings survey -} -\details{ -One of the disadvantages of using structured survey data is that the tables -can become very wide (many columns). This happens when question groups or -other fields can be recorded multiple times. For example in the landings -survey, for each species captured, about 17 questions are recorded. There is -no limit to the number of species that can be recorded in the trip. If, for -example a survey records seven species we will have over a hundred columns in -the data corresponding to species information. - -To improve that situation an avoid using multiple tables we use \strong{nested data -frames} (see \link[tidyr:nest]{tidyr::nest}). In nested data frames columns can be lists and -can contain arbitrary information, like other data frames, lists, vectors, or -models. +Nests attachment-related columns from structured WCS survey data, organizing multiple attachment entries into a single nested column. This function addresses the challenge of handling wide data tables by converting them into more manageable nested data frames. 
} \examples{ +\dontrun{ dummy_landings <- tidyr::tibble( `_id` = "123", `_attachments.0.download_url` = "http://url-1.com", @@ -45,3 +28,6 @@ dummy_landings <- tidyr::tibble( ) pt_nest_attachments(dummy_landings) } + +} +\keyword{internal} diff --git a/man/pt_nest_catch.Rd b/man/pt_nest_catch.Rd index e886cbd..99f9cc7 100644 --- a/man/pt_nest_catch.Rd +++ b/man/pt_nest_catch.Rd @@ -1,21 +1,20 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/pt_nest_survey.R +% Please edit documentation in R/preprocessing.R \name{pt_nest_catch} \alias{pt_nest_catch} -\title{Nest catch catch columns} +\title{Nest Catch Group Columns} \usage{ pt_nest_catch(x) } \arguments{ -\item{x}{Kobo survey in tabular format.} +\item{x}{Data frame of WCS survey data in tabular format.} } \value{ -Nested Landings data in which the information about multiple catch information has -been nested into a single column (\code{catch}) this column contains a -tibble for every row. This, attachment tibble has as many rows as there are -catch information. +A data frame with catch data nested into a 'catch' column, containing a tibble +for each row with various catch-related attributes. } \description{ -Nests catch group columns obtained when reading structured data from the kobo -landings survey. +Nests catch group columns from WCS structured survey data to organize multiple +related catch data points into a single nested 'catch' column per row. } +\keyword{internal} diff --git a/man/pt_nest_length.Rd b/man/pt_nest_length.Rd index 5c509c8..db38f10 100644 --- a/man/pt_nest_length.Rd +++ b/man/pt_nest_length.Rd @@ -1,21 +1,20 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/pt_nest_survey.R +% Please edit documentation in R/preprocessing.R \name{pt_nest_length} \alias{pt_nest_length} -\title{Nest length group columns} +\title{Nest Length Group Columns} \usage{ pt_nest_length(x) } \arguments{ -\item{x}{Kobo survey in tabular format.} +\item{x}{Data frame of WCS survey data in tabular format.} } \value{ -Nested Landings data in which the information about multiple length information has -been nested into a single column (\code{length}) this column contains a -tibble for every row. This, attachment tibble has as many rows as there are -length information +A data frame with length data nested into a single 'length' column, +which contains a tibble for each row with multiple measurements. } \description{ -Nests length group columns obtained when reading structured data from the kobo -landings survey. +Nests length group columns obtained from the structured data of WCS landings surveys. +This reduces the width of data by converting multiple related columns into a single nested column. } +\keyword{internal} diff --git a/man/pt_nest_market.Rd b/man/pt_nest_market.Rd index 953fd65..49f3319 100644 --- a/man/pt_nest_market.Rd +++ b/man/pt_nest_market.Rd @@ -1,21 +1,20 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/pt_nest_survey.R +% Please edit documentation in R/preprocessing.R \name{pt_nest_market} \alias{pt_nest_market} -\title{Nest market group columns} +\title{Nest Market Group Columns} \usage{ pt_nest_market(x) } \arguments{ -\item{x}{Kobo survey in tabular format.} +\item{x}{Data frame of WCS survey data in tabular format.} } \value{ -Nested Landings data in which the information about multiple market information has -been nested into a single column (\code{market}) this column contains a -tibble for every row. 
This, attachment tibble has as many rows as there are -market information. +A data frame with market data nested into a 'market' column, containing a tibble +for each row with various market-related attributes. } \description{ -Nests market group columns obtained when reading structured data from the kobo -landings survey. +Nests market group columns from structured WCS landings survey data. This method organizes +multiple related market data points into a single nested 'market' column per row. } +\keyword{internal} diff --git a/man/pt_nest_trip.Rd b/man/pt_nest_trip.Rd index 690a011..84746eb 100644 --- a/man/pt_nest_trip.Rd +++ b/man/pt_nest_trip.Rd @@ -1,21 +1,18 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/pt_nest_survey.R +% Please edit documentation in R/preprocessing.R \name{pt_nest_trip} \alias{pt_nest_trip} -\title{Nest trip catch columns} +\title{Nest Trip Group Columns} \usage{ pt_nest_trip(x) } \arguments{ -\item{x}{Kobo survey in tabular format.} +\item{x}{A data frame containing structured survey data in tabular format.} } \value{ -Nested Landings data in which the information about multiple trip information has -been nested into a single column (\code{trip}) this column contains a -tibble for every row. This, attachment tibble has as many rows as there are -trip information. +A data frame with trip data nested into a single 'trip' column containing a tibble for each row, corresponding to the various trip details. } \description{ -Nests trip group columns obtained when reading structured data from the kobo -landings survey. +Processes and nests trip-related columns from structured WCS landings survey data into a single 'trip' column. This approach consolidates trip information into nested tibbles within the dataframe, simplifying the structure for analysis. } +\keyword{internal} diff --git a/man/read_config.Rd b/man/read_config.Rd index f138eac..9a218cc 100644 --- a/man/read_config.Rd +++ b/man/read_config.Rd @@ -13,3 +13,4 @@ the environment parameters Reads configuration file in \code{conf.yml} and adds some logging lines. Wrapped for convenience } +\keyword{helper} diff --git a/man/rename_child.Rd b/man/rename_child.Rd new file mode 100644 index 0000000..4d28bf2 --- /dev/null +++ b/man/rename_child.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/ingestion.R +\name{rename_child} +\alias{rename_child} +\title{Rename Nested Survey Data Elements} +\usage{ +rename_child(x, i, p) +} +\arguments{ +\item{x}{A list element, possibly nested, to be renamed.} + +\item{i}{The index or key of the element within the parent list.} + +\item{p}{The parent name to prepend to the element's existing name for context.} +} +\value{ +A renamed list element, structured to maintain contextual relevance in a flattened dataset. +} +\description{ +Appends a parent name or index to child elements within a nested list, assisting in creating a coherent and traceable data structure during the flattening process. 
+} +\keyword{internal} diff --git a/man/retrieve_wcs_surveys.Rd b/man/retrieve_wcs_surveys.Rd index acb572e..6bdc132 100644 --- a/man/retrieve_wcs_surveys.Rd +++ b/man/retrieve_wcs_surveys.Rd @@ -1,12 +1,11 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/retrieve-wcs-data.R +% Please edit documentation in R/ingestion.R \name{retrieve_wcs_surveys} \alias{retrieve_wcs_surveys} -\title{Download WCS Surveys from Kobotoolbox} +\title{Retrieve WCS Surveys from Kobotoolbox} \usage{ retrieve_wcs_surveys( prefix = NULL, - file_format = NULL, append_version = NULL, url = NULL, project_id = NULL, @@ -16,40 +15,30 @@ retrieve_wcs_surveys( ) } \arguments{ -\item{prefix}{Name to be used as the prefix of the file names to be -downloaded. Can be a path.} +\item{prefix}{Filename prefix or path for downloaded files.} -\item{file_format}{Either "csv" or "rds",} +\item{append_version}{Boolean indicating whether to append versioning info to filenames.} -\item{append_version}{Whether to append versioning information to the -filename using \link{add_version}..} +\item{url}{URL of the Kobotoolbox instance.} -\item{url}{The URL of kobotoolbox (often referred to as 'kpi-url').} +\item{project_id}{Project asset ID for data download.} -\item{project_id}{Is the asset id of the asset for which the data is -to be downloaded.} +\item{username}{Kobotoolbox account username.} -\item{username}{Username of your kobotoolbox account.} +\item{psswd}{Kobotoolbox account password.} -\item{psswd}{Password of the account.} - -\item{encoding}{Encoding to be used. Default is "UTF-8".} +\item{encoding}{Character encoding for the downloaded data; defaults to "UTF-8".} } \value{ -A character vector with paths of the downloaded files. +Vector of paths for the downloaded Parquet files. } \description{ -This function retrieves survey data from Kobotoolbox for a specific project. -It allows users to customize the filename using a prefix, choose between CSV or RDS formats, -and decide whether to append versioning information to the filename. -The resulting files are downloaded to the working directory or specified path, -with paths returned as a character vector. +Downloads survey data from Kobotoolbox for a specified project and uploads the data in Parquet format. File naming can include versioning details. } \examples{ \dontrun{ file_list <- retrieve_wcs_surveys( prefix = "my_data", - file_format = "csv", append_version = TRUE, url = "kf.kobotoolbox.org", project_id = "my_project_id", @@ -58,4 +47,6 @@ file_list <- retrieve_wcs_surveys( encoding = "UTF-8" ) } + } +\keyword{ingestion} diff --git a/man/upload_cloud_file.Rd b/man/upload_cloud_file.Rd index 7a44efe..c486777 100644 --- a/man/upload_cloud_file.Rd +++ b/man/upload_cloud_file.Rd @@ -1,53 +1,42 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/cloud-storage.R +% Please edit documentation in R/storage.R \name{upload_cloud_file} \alias{upload_cloud_file} -\title{Upload a local file to a cloud storage bucket} +\title{Upload File to Cloud Storage} \usage{ upload_cloud_file(file, provider, options, name = file) } \arguments{ -\item{file}{a file-path (character) to upload. 
A vector with multiple files -is also supported.} +\item{file}{A character vector specifying the path(s) of the file(s) to upload.} -\item{provider}{cloud provider to use, either "gcs" or "aws"} +\item{provider}{A character string specifying the cloud provider ("gcs" or "aws").} -\item{options}{named list with cloud provider options, see details} +\item{options}{A named list of provider-specific options including the bucket and authentication details.} -\item{name}{What to call the file once uploaded. Default is the filepath} +\item{name}{(Optional) The name to assign to the file in the cloud. If not specified, the local file name is used.} } \value{ -If \code{provider} is "gcs" and if successful a list of medatada objects +A list of metadata objects for the uploaded files if successful. } \description{ -Upload a local file to a cloud storage bucket +Uploads a local file to a specified cloud storage bucket, supporting both single and multiple files. } \details{ -\subsection{Google Cloud Services}{ - -For Google Cloud Services ("gcs") options must be a list with two fields: -\code{bucket} with the bucketname (character) you are uploading to, and -\code{service_account_key} with the contents of the authentication json file you -have downloaded from your Google Project (if \link{cloud_storage_authenticate} has -not been called before). - -This function uses \link[googleCloudStorageR:gcs_upload]{googleCloudStorageR::gcs_upload} under the hood to upload -the file. +For GCS, the options list must include: +\itemize{ +\item \code{bucket}: The name of the bucket to which files are uploaded. +\item \code{service_account_key}: The authentication JSON contents, if not previously authenticated. } + +This function utilizes \code{\link[googleCloudStorageR:gcs_upload]{googleCloudStorageR::gcs_upload()}} for file uploads to GCS. 
} \examples{ - -# Google Cloud Services \dontrun{ -authentication_details <- readLines("location_of_json_file.json") -upload_cloud_file( - file = "table_to_upload.csv", - provider = "gcs", - options = list( - service_account_key = authentication_details, - bucket = "my-bucket" - ) -) +authentication_details <- readLines("path/to/json_file.json") +upload_cloud_file("path/to/local_file.csv", + "gcs", + list(service_account_key = authentication_details, bucket = "my-bucket")) } } +\keyword{storage} diff --git a/man/validate_catch.Rd b/man/validate_catch.Rd index 9183c4a..8afbea9 100644 --- a/man/validate_catch.Rd +++ b/man/validate_catch.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/validation-functions.R +% Please edit documentation in R/validation.R \name{validate_catch} \alias{validate_catch} \title{Validate Catch Data and Detect Outliers} @@ -38,3 +38,4 @@ validated_catch <- validate_catch(data = catch_data, k_max_nb = 10, k_max_weight } } +\keyword{validation} diff --git a/man/validate_length.Rd b/man/validate_length.Rd index e67cbe3..cc76e4e 100644 --- a/man/validate_length.Rd +++ b/man/validate_length.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/validation-functions.R +% Please edit documentation in R/validation.R \name{validate_length} \alias{validate_length} \title{Validate Length Data and Detect Outliers} @@ -32,3 +32,4 @@ validated_length <- validate_length(data = length_data, k_max_length = 200) } } +\keyword{validation} diff --git a/man/validate_market.Rd b/man/validate_market.Rd index a271b1f..0c8a53d 100644 --- a/man/validate_market.Rd +++ b/man/validate_market.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/validation-functions.R +% Please edit documentation in R/validation.R \name{validate_market} \alias{validate_market} \title{Validate Market Data and Detect Outliers in Price Per Kilogram} @@ -35,5 +35,5 @@ method to determine the extension of bounds for outlier detection. # Assuming you have a data frame `market_data` with the necessary structure: validated_market <- validate_market(data = market_data, k_max_price = 100) } - } +\keyword{validation} diff --git a/man/validate_surveys_time.Rd b/man/validate_surveys_time.Rd index 1d45147..e5aa1ad 100644 --- a/man/validate_surveys_time.Rd +++ b/man/validate_surveys_time.Rd @@ -1,30 +1,22 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/validation-functions.R +% Please edit documentation in R/validation.R \name{validate_surveys_time} \alias{validate_surveys_time} -\title{Validate surveys' fishing duration} +\title{Validate Fishing Duration in WCS Surveys} \usage{ validate_surveys_time(data = NULL, hrs_max = NULL, hrs_min = NULL) } \arguments{ -\item{data}{A preprocessed data frame} +\item{data}{Data frame containing preprocessed survey data.} -\item{hrs_max}{Upper threshold of fishing trip duration.} +\item{hrs_max}{Maximum allowable duration in hours.} -\item{hrs_min}{Lower threshold of fishing trip duration.} +\item{hrs_min}{Minimum allowable duration in hours.} } \value{ -A list containing data frames with validated catch -duration. +Data frame with validation results, including flags for surveys that do not meet duration criteria. } \description{ -This function takes a preprocessed landings' matrix and validate fishing trip -duration associated to each survey. 
-}
-\examples{
-\dontrun{
-pars <- read_config()
-landings <- get_preprocessed_surveys(pars)
-validate_surveys_time(landings, hrs_max = 72, hrs_min = 1)
-}
+Checks fishing durations reported in WCS surveys against specified maximum and minimum hour thresholds, identifying and flagging any durations outside these bounds.
 }
+\keyword{validation}
diff --git a/man/validate_wcs_surveys.Rd b/man/validate_wcs_surveys.Rd
index 0a0f71c..3d69c7b 100644
--- a/man/validate_wcs_surveys.Rd
+++ b/man/validate_wcs_surveys.Rd
@@ -1,5 +1,5 @@
 % Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/validate-surveys.R
+% Please edit documentation in R/validation.R
 \name{validate_wcs_surveys}
 \alias{validate_wcs_surveys}
 \title{Validate WCS Surveys Data}
@@ -7,23 +7,15 @@
 validate_wcs_surveys(log_threshold = logger::DEBUG)
 }
 \arguments{
-\item{log_threshold}{A log level threshold from the \code{logger} package, used to
-set the minimum level of log messages to be captured.}
+\item{log_threshold}{The logging level used as a threshold for the \code{logger} package, which controls the verbosity of logging output.}
 }
 \value{
-No return value; this function is called for its side effects,
-including data validation, file creation, and cloud uploading.
+None; the function is used for its side effects, which include data validation and uploading validated data to cloud storage.
 }
 \description{
-This function validates Wildlife Conservation Society (WCS) survey data. It reads
-configuration parameters, preprocesses surveys,
-and performs various validations on survey duration, catches, lengths, and market data.
-It also logs the process at specified
-log thresholds. The function consolidates validated data and saves it as an RDS file,
-which is then uploaded to cloud storage.
+Validates Wildlife Conservation Society (WCS) survey data by checking for inconsistencies in survey duration, catches, lengths, and market data. The function preprocesses surveys, performs validations, logs the process, and uploads the validated data to cloud storage.
 }
 \examples{
-# Assuming necessary configuration and data are available:
 \dontrun{
 validate_wcs_surveys(log_threshold = logger::INFO)
 }
@@ -31,3 +23,5 @@ validate_wcs_surveys(log_threshold = logger::INFO)
 \seealso{
 \code{\link{validate_catch}}, \code{\link{validate_length}}, \code{\link{validate_market}}
 }
+\keyword{validation}
+\keyword{workflow}
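Taken together, the man pages touched by this patch describe an ingestion, storage, and validation API. The sketch below shows one way the documented functions can be chained end to end; it is a minimal, hypothetical usage example built only from the signatures documented above, with the file prefix, Kobotoolbox project ID, credentials, service account key path, and bucket name all being placeholders.

# Hypothetical end-to-end run of the documented pipeline functions.
library(peskas.zanzibar.data.pipeline)

# Ingestion: download raw WCS submissions from Kobotoolbox as Parquet files.
parquet_files <- retrieve_wcs_surveys(
  prefix = "wcs-surveys-raw",      # placeholder file prefix
  append_version = TRUE,
  url = "kf.kobotoolbox.org",
  project_id = "my_project_id",    # placeholder asset id
  username = "my_username",        # placeholder credentials
  psswd = "my_password",
  encoding = "UTF-8"
)

# Storage: push the downloaded Parquet files to a GCS bucket, using the
# options structure documented for upload_cloud_file().
gcs_options <- list(
  service_account_key = readLines("path/to/service_account_key.json"),  # placeholder key file
  bucket = "my-bucket"                                                   # placeholder bucket
)
upload_cloud_file(file = parquet_files, provider = "gcs", options = gcs_options)

# Validation: run the validation workflow, which reads the preprocessed
# surveys and uploads the validated output to cloud storage itself.
validate_wcs_surveys(log_threshold = logger::INFO)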