Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #268 easier user experience for splitting datasets #272

Merged
merged 17 commits into from
Sep 27, 2024
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Suggests:
knitr,
labelled,
metacore,
pharmaverseadam,
bms63 marked this conversation as resolved.
Show resolved Hide resolved
readxl,
rmarkdown,
testthat (>= 3.0.0),
Expand Down
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# xportr 0.4.0

## xportr development version

* New argument in `xportr_write()` allows users to specify the maximum file size (in GB) of their exported xpt files. (#268)
bms63 marked this conversation as resolved.
Show resolved Hide resolved

## New Features

* All core functions can be run together by using new function `xportr()` (#137)
Expand Down
70 changes: 57 additions & 13 deletions R/write.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#' @param .df A data frame to write.
#' @param path Path where transport file will be written. File name sans will be
#' used as `xpt` name.
#' @param max_size_gb Maximum size in GB of the exported files.
bms63 marked this conversation as resolved.
Show resolved Hide resolved
#' @param label `r lifecycle::badge("deprecated")` Previously used to to set the Dataset label.
#' Use the `metadata` argument to set the dataset label.
#' @param strict_checks If TRUE, xpt validation will report errors and not write
Expand Down Expand Up @@ -47,6 +48,7 @@
#'
xportr_write <- function(.df,
path,
max_size_gb = NULL,
bms63 marked this conversation as resolved.
Show resolved Hide resolved
metadata = NULL,
domain = NULL,
strict_checks = FALSE,
Expand Down Expand Up @@ -107,22 +109,11 @@ xportr_write <- function(.df,
tryCatch(
{
# If data is not split, data is just written out
if (is.null(attr(data, "_xportr.split_by_"))) {
bms63 marked this conversation as resolved.
Show resolved Hide resolved
if (is.null(max_size_gb)) {
write_xpt(data, path = path, version = 5, name = name)
check_xpt_size(path)
} else {
# If data is split, perform the split and get an index for the for loop
split_data <- split(data, data[[attr(data, "_xportr.split_by_")]])
split_index <- unique(data[[attr(data, "_xportr.split_by_")]])
paths <- get_split_path(path, seq_along(split_index))
# Iterate on the unique values of the split
for (i in seq_along(split_index)) {
# Write out the split data, `get_split_path` will determine file name
write_xpt(split_data[[i]],
path = paths[i], version = 5, name = name
)
check_xpt_size(paths[i])
}
export_to_xpt(data, path = path, max_size_gb = max_size_gb, file_prefix = name)
}
},
error = function(err) {
Expand Down Expand Up @@ -156,3 +147,56 @@ get_split_path <- function(path, ind) {
tools::file_ext(path)
)
}

#' Function to export data frame to xpt files ensuring each file does not exceed the maximum specified size in GB
#'
#' @param df A data frame to write.
#' @param max_size_gb Maximum size in GB of the exported files.
#' @param file_prefix Name of each exported file.
#'
#' @noRd

export_to_xpt <- function(.df, path, max_size_gb, file_prefix) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the size of the xpt file doesn't exceed the maximum size, no counter should be appended to the filename.

The results of the function are correct. However, the performance could be improved. If the file size is smaller than the maximum size, the file is written log(nr_rows) times.

Maybe we could write the complete file first and if it exceeds the maximum size, an estimate for the number of rows is calculated based on the file size and the number of rows. If one of the parts exceeds the maximum size, the estimate is adjusted.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we should optimize the code. For testing I used an ADLB dataset which written as xpt file has 400MB. I wrote the dataset with different values for max_size_gb:

> system.time(xportr_write(adlb, "adlb.xpt"))
   user  system elapsed 
 25.730   2.258  27.857 
> system.time(xportr_write(adlb, "adlb.xpt", max_size_gb = 1))
Data frame exported to 1 xpt files.
   user  system elapsed 
 50.154  22.826  73.497 
> system.time(xportr_write(adlb, "adlb.xpt", max_size_gb = 0.3))
Data frame exported to 2 xpt files.
   user  system elapsed 
247.439  25.566 275.742 

@adchan11 , @bms63 , @rossfarrugia , what do you think?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Less of a concern from my side as most commonly would be ran with 4, 5, or 10 (so creating less splits than your examples above) and usually just a one time job at the end only impacting certain large datasets, so not like people will be running this often.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super worried about optimizing either. Hopefully, xpts go away in the next couple of years!

# Convert GB to bytes
max_size_bytes <- max_size_gb * 1000^3

total_rows <- nrow(.df)
chunk_counter <- 1
row_start <- 1
dir_path <- dirname(path)

while (row_start <= total_rows) {
# Binary search to find the maximum number of rows that fit within the size limit
low <- row_start
high <- total_rows
best_fit <- row_start

while (low <= high) {
mid <- floor((low + high) / 2)
temp_file <- tempfile()
write_xpt(.df[row_start:mid, ], temp_file)
file_size <- file.info(temp_file)$size

if (file_size <= max_size_bytes) {
best_fit <- mid
low <- mid + 1
} else {
high <- mid - 1
}

unlink(temp_file) # Clean up the temporary file
}

# Write the best fitting chunk to the final file
file_name <- sprintf("%s%d.xpt", file_prefix, chunk_counter)
path <- file.path(dir_path, file_name)

write_xpt(.df[row_start:best_fit, ], path = path, version = 5, name = file_prefix)

# Update counters
row_start <- best_fit + 1
chunk_counter <- chunk_counter + 1
}

cat("Data frame exported to", chunk_counter - 1, "xpt files.\n")
}
1 change: 1 addition & 0 deletions inst/WORDLIST
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ https
incompletedatetime
intervaldatetime
iso
lgl
magrittr
metacore
num
Expand Down
3 changes: 3 additions & 0 deletions man/xportr_write.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 28 additions & 30 deletions tests/testthat/test-write.R
Original file line number Diff line number Diff line change
Expand Up @@ -212,47 +212,45 @@ test_that("write Test 13: Capture errors by haven and report them as such", {
)
})

## Test 14: xportr_write: `split_by` attribute is used to split the data ----
test_that("write Test 14: `split_by` attribute is used to split the data", {
## Test 14: xportr_write: `max_size_gb` is used to split data frame into specified maximum file size ----
test_that("write Test 14: `max_size_gb` is used to split data frame into specified maximum file size", {
adlb <- pharmaverseadam::adlb

tmpdir <- tempdir()
tmp <- file.path(tmpdir, "xyz.xpt")

on.exit(unlink(tmpdir))
# 20 mb
max_size_gb <- 20 / 1000

dts <- data_to_save()
dts %>%
xportr_split(split_by = "X") %>%
xportr_write(path = tmp)
xportr_write(adlb,
path = paste0(tmpdir, "/adlb.xpt"),
domain = "adlb",
max_size_gb = max_size_gb,
strict_checks = FALSE
)

expect_true(
file.exists(file.path(tmpdir, "xyz1.xpt"))
file.exists(file.path(tmpdir, "adlb1.xpt")),
file.info(file.path(tmpdir, "adlb1.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = FALSE))
)

expect_true(
file.exists(file.path(tmpdir, "xyz2.xpt"))
file.exists(file.path(tmpdir, "adlb2.xpt")),
file.info(file.path(tmpdir, "adlb2.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = FALSE))
)

expect_true(
file.exists(file.path(tmpdir, "xyz3.xpt"))
file.exists(file.path(tmpdir, "adlb3.xpt")),
file.info(file.path(tmpdir, "adlb3.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = FALSE))
)
expect_equal(
read_xpt(file.path(tmpdir, "xyz1.xpt")) %>%
extract2("X") %>%
unique() %>%
length(),
1
)
expect_equal(
read_xpt(file.path(tmpdir, "xyz2.xpt")) %>%
extract2("X") %>%
unique() %>%
length(),
1

expect_true(
file.exists(file.path(tmpdir, "adlb4.xpt")),
file.info(file.path(tmpdir, "adlb4.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = FALSE))
)
expect_equal(
read_xpt(file.path(tmpdir, "xyz3.xpt")) %>%
extract2("X") %>%
unique() %>%
length(),
1

expect_true(
file.exists(file.path(tmpdir, "adlb5.xpt")),
file.info(file.path(tmpdir, "adlb5.xpt"))$size <= as.numeric(format(max_size_gb * 10^9, scientific = FALSE))
)
})

Expand Down
Loading