Skip to content

Commit

Permalink
Additional ExprConfig handling. Specified "in_memory" for R raster te…
Browse files Browse the repository at this point in the history
…sting.
  • Loading branch information
mjohns-databricks committed Sep 6, 2024
1 parent f987f70 commit da2a777
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
generate_singleband_raster_df <- function() {
generate_singleband_in_mem_raster_df <- function() {
read.df(
path = "sparkrMosaic/tests/testthat/data/MCD43A4.A2018185.h10v07.006.2018194033728_B04.TIF",
source = "gdal",
raster.read.strategy = "as_path" # <- changed to "as_path" strategy
raster.read.strategy = "in_memory"
)
}

test_that("mosaic can read single-band GeoTiff", {
sdf <- generate_singleband_raster_df()
sdf <- generate_singleband_in_mem_raster_df()
row <- first(sdf)
expect_equal(row$length, 1067862L)
expect_equal(row$x_size, 2400)
Expand All @@ -20,7 +20,7 @@ test_that("mosaic can read single-band GeoTiff", {
})

test_that("scalar raster functions behave as intended", {
sdf <- generate_singleband_raster_df()
sdf <- generate_singleband_in_mem_raster_df()
sdf <- withColumn(sdf, "rst_rastertogridavg", rst_rastertogridavg(column("tile"), lit(9L)))
sdf <- withColumn(sdf, "rst_rastertogridcount", rst_rastertogridcount(column("tile"), lit(9L)))
sdf <- withColumn(sdf, "rst_rastertogridmax", rst_rastertogridmax(column("tile"), lit(9L)))
Expand All @@ -45,33 +45,33 @@ test_that("scalar raster functions behave as intended", {
})

test_that("raster flatmap functions behave as intended", {
retiled_sdf <- generate_singleband_raster_df()
retiled_sdf <- generate_singleband_in_mem_raster_df()
retiled_sdf <- withColumn(retiled_sdf, "rst_retile", rst_retile(column("tile"), lit(1200L), lit(1200L)))

expect_no_error(write.df(retiled_sdf, source = "noop", mode = "overwrite"))
expect_equal(nrow(retiled_sdf), 4)

subdivide_sdf <- generate_singleband_raster_df()
subdivide_sdf <- generate_singleband_in_mem_raster_df()
subdivide_sdf <- withColumn(subdivide_sdf, "rst_subdivide", rst_subdivide(column("tile"), lit(1L)))

expect_no_error(write.df(subdivide_sdf, source = "noop", mode = "overwrite"))
expect_equal(nrow(subdivide_sdf), 4)

tessellate_sdf <- generate_singleband_raster_df()
tessellate_sdf <- generate_singleband_in_mem_raster_df()
tessellate_sdf <- withColumn(tessellate_sdf, "rst_tessellate", rst_tessellate(column("tile"), lit(3L)))

expect_no_error(write.df(tessellate_sdf, source = "noop", mode = "overwrite"))
expect_equal(nrow(tessellate_sdf), 63)

overlap_sdf <- generate_singleband_raster_df()
overlap_sdf <- generate_singleband_in_mem_raster_df()
overlap_sdf <- withColumn(overlap_sdf, "rst_tooverlappingtiles", rst_tooverlappingtiles(column("tile"), lit(200L), lit(200L), lit(10L)))

expect_no_error(write.df(overlap_sdf, source = "noop", mode = "overwrite"))
expect_equal(nrow(overlap_sdf), 87)
})

test_that("raster aggregation functions behave as intended", {
collection_sdf <- generate_singleband_raster_df()
collection_sdf <- generate_singleband_in_mem_raster_df()
collection_sdf <- withColumn(collection_sdf, "extent", st_astext(rst_boundingbox(column("tile"))))
collection_sdf <- withColumn(collection_sdf, "tile", rst_tooverlappingtiles(column("tile"), lit(200L), lit(200L), lit(10L)))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
generate_singleband_raster_df <- function() {
generate_singleband_in_mem_raster_df <- function() {
spark_read_source(
sc,
name = "raster",
source = "gdal",
path = "data/MCD43A4.A2018185.h10v07.006.2018194033728_B04.TIF",
options = list("raster.read.strategy" = "as_path") # <- changed to "as_path" strategy
options = list("raster.read.strategy" = "in_memory")
)
}


test_that("mosaic can read single-band GeoTiff", {
sdf <- generate_singleband_raster_df()
sdf <- generate_singleband_in_mem_raster_df()
row <- sdf %>% head(1) %>% sdf_collect
expect_equal(row$length, 1067862L)
expect_equal(row$x_size, 2400)
Expand All @@ -24,7 +24,7 @@ test_that("mosaic can read single-band GeoTiff", {


test_that("scalar raster functions behave as intended", {
sdf <- generate_singleband_raster_df() %>%
sdf <- generate_singleband_in_mem_raster_df() %>%
mutate(rst_bandmetadata = rst_bandmetadata(tile, 1L)) %>%
mutate(rst_boundingbox = rst_boundingbox(tile)) %>%
mutate(rst_boundingbox = st_buffer(rst_boundingbox, -0.001)) %>%
Expand All @@ -49,7 +49,7 @@ test_that("scalar raster functions behave as intended", {
# breaking the chain here to avoid memory issues
expect_no_error(spark_write_source(sdf, "noop", mode = "overwrite"))

sdf <- generate_singleband_raster_df() %>%
sdf <- generate_singleband_in_mem_raster_df() %>%
mutate(rst_rastertogridavg = rst_rastertogridavg(tile, 9L)) %>%
mutate(rst_rastertogridcount = rst_rastertogridcount(tile, 9L)) %>%
mutate(rst_rastertogridmax = rst_rastertogridmax(tile, 9L)) %>%
Expand All @@ -74,25 +74,25 @@ test_that("scalar raster functions behave as intended", {
})

test_that("raster flatmap functions behave as intended", {
retiled_sdf <- generate_singleband_raster_df() %>%
retiled_sdf <- generate_singleband_in_mem_raster_df() %>%
mutate(rst_retile = rst_retile(tile, 1200L, 1200L))

expect_no_error(spark_write_source(retiled_sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(retiled_sdf), 4)

subdivide_sdf <- generate_singleband_raster_df() %>%
subdivide_sdf <- generate_singleband_in_mem_raster_df() %>%
mutate(rst_subdivide = rst_subdivide(tile, 1L))

expect_no_error(spark_write_source(subdivide_sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(subdivide_sdf), 4)

tessellate_sdf <- generate_singleband_raster_df() %>%
tessellate_sdf <- generate_singleband_in_mem_raster_df() %>%
mutate(rst_tessellate = rst_tessellate(tile, 3L))

expect_no_error(spark_write_source(tessellate_sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(tessellate_sdf), 63)

overlap_sdf <- generate_singleband_raster_df() %>%
overlap_sdf <- generate_singleband_in_mem_raster_df() %>%
mutate(rst_tooverlappingtiles = rst_tooverlappingtiles(tile, 200L, 200L, 10L))

expect_no_error(spark_write_source(overlap_sdf, "noop", mode = "overwrite"))
Expand All @@ -101,7 +101,7 @@ test_that("raster flatmap functions behave as intended", {
})

test_that("raster aggregation functions behave as intended", {
collection_sdf <- generate_singleband_raster_df() %>%
collection_sdf <- generate_singleband_in_mem_raster_df() %>%
mutate(extent = st_astext(rst_boundingbox(tile))) %>%
mutate(tile = rst_tooverlappingtiles(tile, 200L, 200L, 10L))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ case class DatasetGDAL() {
* @param dataset
* [[Dataset]] to update.
* @param doUpdateDriver
* Whether to upate `driverName`, if dataset is null, falls back to [[NO_DRIVER]]
* Whether to update `driverName` or keep current
* @return
*/
def updateDataset(dataset: Dataset, doUpdateDriver: Boolean): DatasetGDAL = {
Expand All @@ -433,9 +433,6 @@ case class DatasetGDAL() {
if (this.isHydrated && doUpdateDriver) {
this.updateDriverName(
RasterIO.identifyDriverNameFromDataset(this.dataset))
} else if (doUpdateDriver) {
this.updateDriverName(NO_DRIVER)
this.dsErrFlag = true
}
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object OverlappingTiles {

/**
* Retiles a tile into overlapping tiles.
*
*
* @note
* The overlap percentage is a percentage of the tile size.
* @param tile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.databricks.labs.mosaic.MOSAIC_RASTER_READ_IN_MEMORY
import com.databricks.labs.mosaic.core.index.IndexSystemFactory
import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.functions.ExprConfig
import com.databricks.labs.mosaic.gdal.MosaicGDAL
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
Expand All @@ -19,14 +20,13 @@ import org.apache.spark.util.SerializableConfiguration
import java.net.URI
import java.sql.Timestamp
import java.util.Locale
import scala.util.Try

/** A file format for reading binary files using GDAL. */
class GDALFileFormat extends BinaryFileFormat {

import GDALFileFormat._

var firstRun = true

/**
* Infer schema for the tile file.
* @param sparkSession
Expand Down Expand Up @@ -120,40 +120,26 @@ class GDALFileFormat extends BinaryFileFormat {
options: Map[String, String],
hadoopConf: org.apache.hadoop.conf.Configuration
): PartitionedFile => Iterator[org.apache.spark.sql.catalyst.InternalRow] = {

val indexSystem = IndexSystemFactory.getIndexSystem(sparkSession)
val supportedExtensions = options.getOrElse("extensions", "*").split(";").map(_.trim.toLowerCase(Locale.ROOT))
val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val filterFuncs = filters.flatMap(createFilterFunction)
// Suitable on the driver
MosaicGDAL.enableGDAL(sparkSession)

// Identify the reader to use for the file format.
// GDAL supports multiple reading strategies.
val reader = ReadStrategy.getReader(options)

// handle expression config
// - this is a special pattern
// for readers vs expressions
// - explicitely setting use checkpoint to true
val exprConfig = ExprConfig(sparkSession)
GDAL.enable(exprConfig) // <- appropriate for workers (MosaicGDAL on driver)
reader match {
case r if r.getReadStrategy == MOSAIC_RASTER_READ_IN_MEMORY =>
// update for 'in_memory'
exprConfig.setRasterUseCheckpoint("false")
case _ =>
// update for 'as_path' and 'subdivide_on_read'
exprConfig.setRasterUseCheckpoint("true")
}
val indexSystem = IndexSystemFactory.getIndexSystem(sparkSession)
val supportedExtensions = options.getOrElse("extensions", "*").split(";").map(_.trim.toLowerCase(Locale.ROOT))
val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val filterFuncs = filters.flatMap(createFilterFunction)

file: PartitionedFile => {

val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)

if (supportedExtensions.contains("*") || supportedExtensions.exists(status.getPath.getName.toLowerCase(Locale.ROOT).endsWith)) {
if (filterFuncs.forall(_.apply(status)) && isAllowedExtension(status, options)) {
reader.read(status, fs, requiredSchema, options, indexSystem, Some(exprConfig))
reader.read(status, fs, requiredSchema, options, indexSystem)
} else {
Iterator.empty
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
package com.databricks.labs.mosaic.datasource.gdal

import com.databricks.labs.mosaic.{
MOSAIC_RASTER_READ_AS_PATH,
NO_DRIVER,
RASTER_DRIVER_KEY,
RASTER_PARENT_PATH_KEY,
RASTER_PATH_KEY,
RASTER_SUBDATASET_NAME_KEY
}
import com.databricks.labs.mosaic.{MOSAIC_RASTER_READ_AS_PATH, MOSAIC_URI_DEEP_CHECK, MOSAIC_URI_DEEP_CHECK_DEFAULT, NO_DRIVER, RASTER_DRIVER_KEY, RASTER_PARENT_PATH_KEY, RASTER_PATH_KEY, RASTER_SUBDATASET_NAME_KEY}
import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.core.raster.gdal.RasterGDAL
import com.databricks.labs.mosaic.core.raster.io.RasterIO.identifyDriverNameFromExtOpt
import com.databricks.labs.mosaic.core.types.RasterTileType
Expand Down Expand Up @@ -88,8 +82,6 @@ object ReadAsPath extends ReadStrategy {
* Options passed to the reader.
* @param indexSystem
* Index system.
* @param exprConfigOpt
* Option [[ExprConfig]].
* @return
* Iterator of internal rows.
*/
Expand All @@ -98,14 +90,24 @@ object ReadAsPath extends ReadStrategy {
fs: FileSystem,
requiredSchema: StructType,
options: Map[String, String],
indexSystem: IndexSystem,
exprConfigOpt: Option[ExprConfig]
indexSystem: IndexSystem
): Iterator[InternalRow] = {

// Expression Config
// - index system set
// - use checkpoint set to true
// - deep check set
// - GDAL enable called on worker
val exprConfigOpt = Some(new ExprConfig(Map.empty[String, String]))
exprConfigOpt.get.setIndexSystem(indexSystem.name)
exprConfigOpt.get.setRasterUseCheckpoint("true")
exprConfigOpt.get.setUriDeepCheck(options.getOrElse(MOSAIC_URI_DEEP_CHECK, MOSAIC_URI_DEEP_CHECK_DEFAULT))
GDAL.enable(exprConfigOpt.get)

val inPath = status.getPath.toString
val uuid = getUUID(status)
val tmpPath = PathUtils.copyToTmp(inPath, exprConfigOpt)
val uriDeepCheck = Try(exprConfigOpt.get.isUriDeepCheck).getOrElse(false)
val uriDeepCheck = exprConfigOpt.get.isUriDeepCheck
val uriGdalOpt = PathUtils.parseGdalUriOpt(inPath, uriDeepCheck)
val extOpt = PathUtils.getExtOptFromPath(inPath, uriGdalOpt)
val driverName = options.getOrElse("driverName", NO_DRIVER) match {
Expand All @@ -114,26 +116,28 @@ object ReadAsPath extends ReadStrategy {
}
// Allow subdataset for read as path
// - subdataset is important also for Zarr with groups
val subsetName = options.getOrElse(RASTER_SUBDATASET_NAME_KEY, "")
val raster = RasterGDAL(
Map(
RASTER_PATH_KEY -> tmpPath,
RASTER_PARENT_PATH_KEY -> inPath,
RASTER_DRIVER_KEY -> driverName,
RASTER_SUBDATASET_NAME_KEY -> options.getOrElse(RASTER_SUBDATASET_NAME_KEY, "")
RASTER_SUBDATASET_NAME_KEY -> subsetName
),
exprConfigOpt
).tryInitAndHydrate()

if (!raster.isEmptyRasterGDAL && exprConfigOpt.isDefined) {
)
if (!raster.isEmptyRasterGDAL) {
// explicitly set the checkpoint dir
// the reader doesn't always have the configured information
raster.setFuseDirOpt(Some(exprConfigOpt.get.getRasterCheckpoint))
val checkDir = exprConfigOpt.get.getRasterCheckpoint
raster.setFuseDirOpt(Some(checkDir))
}

val tile = RasterTile(null, raster, tileDataType)
// don't destroy the raster since we need to read from it...
// - raster will have the updated fuse path
val tileRow = tile
.formatCellId(indexSystem)
.serialize(tileDataType, doDestroy = true, exprConfigOpt)
.serialize(tileDataType, doDestroy = false, exprConfigOpt)
val trimmedSchema = StructType(requiredSchema.filter(field => field.name != TILE))
val fields = trimmedSchema.fieldNames.map {
case PATH => status.getPath.toString
Expand All @@ -148,6 +152,7 @@ object ReadAsPath extends ReadStrategy {
case LENGTH => raster.getMemSize
case other => throw new RuntimeException(s"Unsupported field name: $other")
}
raster.flushAndDestroy() // <- destroy after getting details
val row = Utils.createRow(fields ++ Seq(tileRow))
val rows = Seq(row)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.databricks.labs.mosaic.datasource.gdal

import com.databricks.labs.mosaic.{MOSAIC_RASTER_READ_IN_MEMORY, RASTER_DRIVER_KEY, RASTER_PARENT_PATH_KEY, RASTER_PATH_KEY, RASTER_SUBDATASET_NAME_KEY}
import com.databricks.labs.mosaic.{MOSAIC_RASTER_READ_IN_MEMORY, MOSAIC_URI_DEEP_CHECK, MOSAIC_URI_DEEP_CHECK_DEFAULT, RASTER_DRIVER_KEY, RASTER_PARENT_PATH_KEY, RASTER_PATH_KEY, RASTER_SUBDATASET_NAME_KEY}
import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.core.raster.gdal.RasterGDAL
import com.databricks.labs.mosaic.core.raster.io.RasterIO.identifyDriverNameFromRawPath
import com.databricks.labs.mosaic.core.types.RasterTileType
Expand Down Expand Up @@ -73,8 +74,6 @@ object ReadInMemory extends ReadStrategy {
* Options passed to the reader.
* @param indexSystem
* Index system.
* @param exprConfigOpt
* Option [[ExprConfig]].
* @return
* Iterator of internal rows.
*/
Expand All @@ -83,15 +82,22 @@ object ReadInMemory extends ReadStrategy {
fs: FileSystem,
requiredSchema: StructType,
options: Map[String, String],
indexSystem: IndexSystem,
exprConfigOpt: Option[ExprConfig]
indexSystem: IndexSystem
): Iterator[InternalRow] = {

// Expression Config
// - index system set
// - use checkpoint set to false (in-memory read)
// - deep check set
// - GDAL enable called on worker
val exprConfigOpt = Some(new ExprConfig(Map.empty[String, String]))
exprConfigOpt.get.setIndexSystem(indexSystem.name)
exprConfigOpt.get.setRasterUseCheckpoint("false")
exprConfigOpt.get.setUriDeepCheck(options.getOrElse(MOSAIC_URI_DEEP_CHECK, MOSAIC_URI_DEEP_CHECK_DEFAULT))
GDAL.enable(exprConfigOpt.get)

val inPath = status.getPath.toString
val uriDeepCheck = {
if (options.contains("uriDeepCheck")) options("uriDeepCheck").toBoolean
else Try(exprConfigOpt.get.isUriDeepCheck).getOrElse(false)
}
val uriDeepCheck = exprConfigOpt.get.isUriDeepCheck
val uriGdalOpt = PathUtils.parseGdalUriOpt(inPath, uriDeepCheck)
val driverName = options.get("driverName") match {
case Some(name) if name.nonEmpty => name
Expand Down
Loading

0 comments on commit da2a777

Please sign in to comment.