Commit

"raster_to_grid" [iterim] "toTif" option.
mjohns-databricks committed Aug 30, 2024
1 parent c5bac6d commit 83268eb
Showing 20 changed files with 878 additions and 397 deletions.
29 changes: 17 additions & 12 deletions CONTRIBUTING.md
@@ -98,18 +98,23 @@ The packaged JAR should be available in `target/`.

### Python bindings

The python bindings can be tested using [unittest](https://docs.python.org/3/library/unittest.html).
- Build the Scala project and copy the packaged JAR to the `python/mosaic/lib/` directory.
- Move to the `python/` directory and install the project and its dependencies:
`pip install . && pip install pyspark==<project_spark_version>`
(where 'project_spark_version' corresponds to the version of Spark
used for the target Databricks Runtime, e.g. `3.4.1` for DBR 13.3 LTS).
- Run the tests using `unittest`: `python -m unittest`

The project wheel file can be built with [build](https://pypa-build.readthedocs.io/en/stable/).
- Install the build requirements: `pip install build wheel`.
- Build the wheel using `python -m build`.
- Collect the .whl file from `python/dist/`
1. Testing - You can run the tests (recommended within the docker container) using
   [unittest](https://docs.python.org/3/library/unittest.html), e.g. `python -m unittest`.
2. Install the build requirements: `pip install build wheel`.
3. [Option] Build with script - If you are within the docker container, you can build the WHL file by running the
   following from the project root (`mosaic`) dir: `sh scripts/docker/python-local-build.sh` (it packages the JAR and builds the WHL).
4. [Option] If building more manually (recommended within the docker container using its init scripts):
   - Build the Scala project, e.g. `mvn package -DskipTests=true` if you have already tested successfully; that call
     will copy the packaged JAR to the `python/mosaic/lib/` directory (or you can do so manually). Be sure to verify
     that no older JARs are lingering.
   - Move to the `python/` directory and install the project and its dependencies:
     `pip install . && pip install pyspark==<project_spark_version>`
     (where 'project_spark_version' corresponds to the version of Spark
     used for the target Databricks Runtime, e.g. `3.4.1` for DBR 13.3 LTS).
   - The project wheel file can be built with [build](https://pypa-build.readthedocs.io/en/stable/), e.g. `python -m build`.
5. Collect the .whl file from `python/dist/`:
   - The WHL contains the JAR.
   - It is all that is needed for installing on a cluster (the same holds for the deployed PyPI version).

### Documentation

13 changes: 13 additions & 0 deletions scripts/docker/python-local-build.sh
@@ -0,0 +1,13 @@
#!/bin/bash

# run from within docker container
# run from the repo root ('mosaic') level

# [1] delete existing jars in python dir
rm python/mosaic/lib/*.jar

# [2] package
mvn package -DskipTests=true

# [3] build
cd python && python3 -m build
RasterGDAL.scala
@@ -40,7 +40,7 @@ case class RasterGDAL(
exprConfigOpt: Option[ExprConfig]
) extends RasterIO {

val DIR_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmm") // yyyyMMddHHmmss
val DIR_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

// Factory for creating CRS objects
protected val crsFactory: CRSFactory = new CRSFactory
@@ -343,8 +343,13 @@ case class RasterGDAL(
this.getDatasetOpt() match {
case Some(dataset) =>
// (2) srs from srid
val srs = new osr.SpatialReference()
srs.ImportFromEPSG(srid)
var srs: SpatialReference = null
if (srid == 0 || srid == 4326) {
srs = MosaicGDAL.WSG84
} else {
srs = new osr.SpatialReference()
srs.ImportFromEPSG(srid)
}

// (3) set srs on internal datasource
// - see (4) as well
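The block above reuses a shared WGS84 `SpatialReference` (Mosaic keeps one on `MosaicGDAL.WSG84`) for the common srid 0 / 4326 case instead of importing from EPSG each time. A minimal standalone sketch of that pattern follows; the `SrsHelper` object and its lazy field are illustrative stand-ins, not Mosaic API:

```scala
import org.gdal.osr.SpatialReference

object SrsHelper {
  // shared WGS84 SRS, built once (stand-in for MosaicGDAL.WSG84)
  lazy val WGS84: SpatialReference = {
    val srs = new SpatialReference()
    srs.ImportFromEPSG(4326)
    srs
  }

  // reuse the shared instance for srid 0 / 4326, otherwise import from EPSG
  def srsForSrid(srid: Int): SpatialReference =
    if (srid == 0 || srid == 4326) WGS84
    else {
      val srs = new SpatialReference()
      srs.ImportFromEPSG(srid)
      srs
    }
}
```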
@@ -700,6 +705,9 @@

/**
* Get a particular subdataset by name.
* - This does not generate a new file.
* - It hydrates the dataset with the subset.
* - It also updates the path to include the subset.
* @param subsetName
* The name of the subdataset to get.
* @return
@@ -862,6 +870,7 @@

/** @return new fuse dir underneath the base fuse dir (checkpoint or override) */
def makeNewFuseDir(ext: String, uuidOpt: Option[String]): String = {

// (1) uuid used in dir
// - may be provided (for filename consistency)
val uuid = uuidOpt match {
@@ -873,7 +882,7 @@ case class RasterGDAL(
val timePrefix = LocalDateTime.now().format(DIR_TIME_FORMATTER)
val newDir = s"${timePrefix}_${ext}_${uuid}"
val dir = s"$rootDir/$newDir"
Files.createDirectories(Paths.get(dir)) // <- create the directories
Files.createDirectories(Paths.get(dir))
dir
}
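`makeNewFuseDir` now formats the directory prefix at second resolution (`yyyyMMddHHmmss`), which reduces name collisions when several directories are created within the same minute. A small self-contained sketch of the naming scheme, with `rootDir` and `ext` as placeholder values:

```scala
import java.nio.file.{Files, Paths}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.UUID

object FuseDirNamingSketch extends App {
  val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss") // second resolution, as above
  val rootDir = "/tmp/mosaic_fuse"                        // placeholder for the fuse/checkpoint root
  val ext = "tif"
  val uuid = UUID.randomUUID().toString
  val dir = s"$rootDir/${LocalDateTime.now().format(fmt)}_${ext}_$uuid"
  Files.createDirectories(Paths.get(dir))                 // create the nested directories
  println(dir)
}
```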

RasterIO.scala
@@ -5,14 +5,15 @@ import com.databricks.labs.mosaic.core.raster.api.{FormatLookup, GDAL}
import com.databricks.labs.mosaic.core.raster.gdal.{DatasetGDAL, PathGDAL, RasterBandGDAL, RasterGDAL}
import com.databricks.labs.mosaic.core.raster.io.RasterIO.{identifyDriverNameFromDataset, identifyDriverNameFromRawPath, identifyExtFromDriver}
import com.databricks.labs.mosaic.functions.ExprConfig
import com.databricks.labs.mosaic.gdal.MosaicGDAL
import com.databricks.labs.mosaic.utils.{PathUtils, SysUtils}
import org.gdal.gdal.{Dataset, Driver, gdal}
import org.gdal.gdalconst.gdalconstConstants.GA_ReadOnly
import org.gdal.ogr.DataSource
import org.gdal.osr

import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.{Vector => JVector}
import java.util.{Locale, Vector => JVector}
import scala.util.Try

/**
@@ -310,13 +311,12 @@ object RasterIO {
Try {
extOpt match {
case Some(ext) if ext != NO_EXT =>
val driver = gdal.IdentifyDriverEx(ext)
val driver = gdal.IdentifyDriverEx(ext.toLowerCase(Locale.ROOT))
try {
driver.getShortName
} finally {
driver.delete()
}

case _ => NO_DRIVER
}
}.getOrElse {
@@ -457,11 +457,7 @@

if (dsOpt.isDefined && Try(dsOpt.get.GetSpatialRef()).isFailure || dsOpt.get.GetSpatialRef() == null) {
// if SRS not set, try to set it to WGS84
Try{
val srs = new osr.SpatialReference()
srs.ImportFromEPSG(4326)
dsOpt.get.SetSpatialRef(srs)
}
Try(dsOpt.get.SetSpatialRef(MosaicGDAL.WSG84))
}

dsOpt
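Driver identification by extension earlier in this file now lowercases the extension with `Locale.ROOT` before calling `gdal.IdentifyDriverEx`, so mixed-case extensions such as `TIF` resolve the same way as `tif`. A hedged standalone sketch of that lookup (the object and function names are illustrative):

```scala
import java.util.Locale
import org.gdal.gdal.gdal
import scala.util.Try

object DriverLookupSketch {
  // returns the GDAL driver short name for a file extension, case-insensitively;
  // None if GDAL cannot identify a driver for it
  def driverShortNameForExt(ext: String): Option[String] =
    Try {
      val driver = gdal.IdentifyDriverEx(ext.toLowerCase(Locale.ROOT))
      try driver.getShortName
      finally driver.delete()
    }.toOption
}
```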
TranslateToGTiff.scala (new file)
@@ -0,0 +1,56 @@
package com.databricks.labs.mosaic.core.raster.operator

import com.databricks.labs.mosaic.core.raster.gdal.{RasterGDAL, RasterWriteOptions}
import com.databricks.labs.mosaic.core.raster.operator.gdal.GDALTranslate
import com.databricks.labs.mosaic.functions.ExprConfig
import com.databricks.labs.mosaic.gdal.MosaicGDAL
import com.databricks.labs.mosaic.utils.PathUtils

import scala.util.Try

object TranslateToGTiff {

/**
* Translate a RasterGDAL [[org.gdal.gdal.Dataset]] to GeoTiff.
*
* @param inRaster
* [[RasterGDAL]] to translate.
* @param exprConfigOpt
* Option [[ExprConfig]]
* @return
* New RasterGDAL translated to GeoTiff.
*/
def compute(inRaster: RasterGDAL, exprConfigOpt: Option[ExprConfig]): RasterGDAL = {

// try to hydrate the provided raster
inRaster.getDatasetOpt() match {
case Some(dataset) =>
if (Try(dataset.GetSpatialRef()).isFailure || dataset.GetSpatialRef() == null) {
// if SRS not set, try to set it to WGS84
Try(dataset.SetSpatialRef(MosaicGDAL.WSG84))
}
val tifPath = PathUtils.createTmpFilePath("tif", exprConfigOpt)
// Modify defaults
// - essentially `RasterWriteOptions.GTiff`
// with the SRS set.
val outOptions = RasterWriteOptions(
crs = dataset.GetSpatialRef() // default - MosaicGDAL.WSG84
)

GDALTranslate.executeTranslate(
tifPath,
inRaster,
command = s"""gdal_translate""",
outOptions,
exprConfigOpt
)
case _ =>
val result = RasterGDAL() // <- empty raster
result.updateLastCmd("'gdal' format -> option 'toTif'")
result.updateError("Dataset is invalid (prior to tif convert)")
result
}

}

}
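Under the hood the new operator boils down to a `gdal.Translate` call with GeoTIFF as the output format; `GDALTranslate.executeTranslate` layers createInfo bookkeeping, error capture, and tmp-path handling on top of it. A minimal sketch of the raw GDAL call (paths and the object name are placeholders):

```scala
import java.util.{Vector => JVector}
import org.gdal.gdal.{Dataset, TranslateOptions, gdal}

object RawTranslateSketch {
  // translate any GDAL-readable source into a GeoTIFF at outTifPath;
  // returns None if the source cannot be opened or the translate fails
  def translateToGTiff(inPath: String, outTifPath: String): Option[Dataset] = {
    gdal.AllRegister() // register GDAL drivers
    Option(gdal.Open(inPath)).flatMap { src =>
      val args = new JVector[String]()
      args.add("-of"); args.add("GTiff") // output format: GeoTIFF
      val out = gdal.Translate(outTifPath, src, new TranslateOptions(args))
      src.delete() // release the source dataset
      Option(out)  // null from GDAL means the translate failed
    }
  }
}
```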
GDALTranslate.scala
@@ -1,20 +1,10 @@
package com.databricks.labs.mosaic.core.raster.operator.gdal

import com.databricks.labs.mosaic.{
NO_PATH_STRING,
RASTER_ALL_PARENTS_KEY,
RASTER_BAND_INDEX_KEY,
RASTER_DRIVER_KEY,
RASTER_LAST_CMD_KEY,
RASTER_LAST_ERR_KEY,
RASTER_PARENT_PATH_KEY,
RASTER_PATH_KEY,
RASTER_SUBDATASET_NAME_KEY
}
import com.databricks.labs.mosaic.{NO_PATH_STRING, RASTER_ALL_PARENTS_KEY, RASTER_BAND_INDEX_KEY, RASTER_DRIVER_KEY, RASTER_LAST_CMD_KEY, RASTER_LAST_ERR_KEY, RASTER_PARENT_PATH_KEY, RASTER_PATH_KEY, RASTER_SUBDATASET_NAME_KEY}
import com.databricks.labs.mosaic.core.raster.gdal.{RasterGDAL, RasterWriteOptions}
import com.databricks.labs.mosaic.core.raster.io.RasterIO.flushAndDestroy
import com.databricks.labs.mosaic.functions.ExprConfig
import org.gdal.gdal.{TranslateOptions, gdal}
import org.gdal.gdal.{Dataset, TranslateOptions, gdal}

import scala.util.Try

@@ -60,7 +50,7 @@ object GDALTranslate {
RASTER_PATH_KEY -> outputPath,
RASTER_PARENT_PATH_KEY -> raster.identifyPseudoPathOpt().getOrElse(NO_PATH_STRING),
RASTER_DRIVER_KEY -> writeOptions.format,
RASTER_SUBDATASET_NAME_KEY -> raster.getSubsetName,
//RASTER_SUBDATASET_NAME_KEY -> raster.getSubsetName, <- goes away after translate
RASTER_BAND_INDEX_KEY -> raster.getBandIdxOpt.getOrElse(-1).toString,
RASTER_LAST_CMD_KEY -> effectiveCommand,
RASTER_LAST_ERR_KEY -> errorMsg,
GDALWarp.scala
@@ -58,6 +58,7 @@ object GDALWarp {
} else outputPath

val size = Try(Files.size(Paths.get(resultPath))).getOrElse(-1L)
// TODO - RE-VERIFY SUBDATASET HANDLING FOR WARP
val createInfo = Map(
RASTER_PATH_KEY -> resultPath,
RASTER_PARENT_PATH_KEY -> rasters.head.identifyPseudoPathOpt().getOrElse(NO_PATH_STRING),
RasterTile.scala
@@ -165,6 +165,7 @@ case class RasterTile(
// - safety net for parent path
val parentPath = this.raster.identifyPseudoPathOpt().getOrElse(NO_PATH_STRING)
val newCreateInfo = raster.getCreateInfo(includeExtras = true) + (RASTER_PATH_KEY -> path, RASTER_PARENT_PATH_KEY -> parentPath)
raster.updateCreateInfo(newCreateInfo) // <- in case tile is used after this

// (4) actual serialization
val mapData = buildMapString(newCreateInfo)
GDALFileFormat.scala
@@ -1,7 +1,9 @@
package com.databricks.labs.mosaic.datasource.gdal

import com.databricks.labs.mosaic.MOSAIC_RASTER_READ_IN_MEMORY
import com.databricks.labs.mosaic.core.index.IndexSystemFactory
import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.functions.ExprConfig
import com.databricks.labs.mosaic.gdal.MosaicGDAL
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -24,6 +26,8 @@ class GDALFileFormat extends BinaryFileFormat {

import GDALFileFormat._

var firstRun = true

/**
* Infer schema for the tile file.
* @param sparkSession
@@ -117,8 +121,6 @@
options: Map[String, String],
hadoopConf: org.apache.hadoop.conf.Configuration
): PartitionedFile => Iterator[org.apache.spark.sql.catalyst.InternalRow] = {
// sets latest [[MosaicGDAL.exprConfigOpt]]
GDAL.enable(sparkSession)

val indexSystem = IndexSystemFactory.getIndexSystem(sparkSession)
val supportedExtensions = options.getOrElse("extensions", "*").split(";").map(_.trim.toLowerCase(Locale.ROOT))
@@ -129,15 +131,30 @@
// GDAL supports multiple reading strategies.
val reader = ReadStrategy.getReader(options)

// handle expression config
// - this is a special pattern
// for readers vs expressions
// - explicitly setting use checkpoint to true
val exprConfig = ExprConfig(sparkSession)
GDAL.enable(exprConfig) // <- appropriate for workers (MosaicGDAL on driver)
reader match {
case r if r.getReadStrategy == MOSAIC_RASTER_READ_IN_MEMORY =>
// update for 'in_memory'
exprConfig.setRasterUseCheckpoint("false")
case _ =>
// update for 'as_path' and 'subdivide_on_read'
exprConfig.setRasterUseCheckpoint("true")
}

file: PartitionedFile => {
val exprConfig = MosaicGDAL.exprConfigOpt

val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)

if (supportedExtensions.contains("*") || supportedExtensions.exists(status.getPath.getName.toLowerCase(Locale.ROOT).endsWith)) {
if (filterFuncs.forall(_.apply(status)) && isAllowedExtension(status, options)) {
reader.read(status, fs, requiredSchema, options, indexSystem, exprConfig)
reader.read(status, fs, requiredSchema, options, indexSystem, Some(exprConfig))
} else {
Iterator.empty
}
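The reader now builds its `ExprConfig` on the driver and derives checkpoint use from the read strategy, instead of pulling `MosaicGDAL.exprConfigOpt` inside the per-file closure on executors. A reduced, illustrative sketch of that strategy-to-checkpoint mapping (the object and method are hypothetical; the strategy strings mirror the comments above):

```scala
object CheckpointPolicySketch {
  // read strategies as referenced in the comments above
  val InMemory = "in_memory"
  val AsPath = "as_path"
  val SubdivideOnRead = "subdivide_on_read"

  // only the in-memory strategy skips the checkpoint directory
  def useCheckpoint(readStrategy: String): String =
    if (readStrategy == InMemory) "false" else "true"
}
```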
@@ -160,8 +177,6 @@ object GDALFileFormat {
val CONTENT = "content"
val X_SIZE = "x_size"
val Y_SIZE = "y_size"
// val X_OFFSET = "x_offset"
// val Y_OFFSET = "y_offset"
val BAND_COUNT = "bandCount"
val METADATA = "metadata"
val SUBDATASETS: String = "subdatasets"