Add other missing data types.
Imbruced committed Sep 29, 2024
1 parent 5a4a2f6 commit 6ee9fbb
Showing 13 changed files with 388 additions and 207 deletions.

This file was deleted.
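For orientation, the classes changed below implement Sedona's GeoPackage reader on the Spark DataSource V2 API. A minimal usage sketch follows; the format short name "geopackage" and the option key "tableName" are assumptions inferred from the surrounding code, not verified against this revision.

import org.apache.spark.sql.SparkSession

object GeoPackageReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("geopackage-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // Assumed format name and option key; the actual strings live in parts of
    // the source not shown in this diff.
    val features = spark.read
      .format("geopackage")
      .option("tableName", "cities") // hypothetical table inside the file
      .load("/data/example.geopackage")

    features.printSchema()
    features.show(5)

    spark.stop()
  }
}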

@@ -18,35 +18,36 @@
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager
import org.apache.sedona.sql.datasources.geopackage.model.TableType.TableType
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, TableType}
import org.apache.hadoop.fs.Path
import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageOptions
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util.Locale
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
import scala.util.Try

class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister {

override def fallbackFileFormat: Class[_ <: FileFormat] = {
null
}

override protected def getTable(options: CaseInsensitiveStringMap): Table = {
val loadOptions = getLoadOptions(options)

GeoPackageTable(
"",
sparkSession,
options,
Seq(loadOptions.path),
getTransformedPath(options),
None,
fallbackFileFormat,
loadOptions)
getLoadOptions(options))
}

private def getLoadOptions(options: CaseInsensitiveStringMap): GeoPackageLoadOptions = {
private def getLoadOptions(options: CaseInsensitiveStringMap): GeoPackageOptions = {
val path = options.get("path")
if (path.isEmpty) {
throw new IllegalArgumentException("GeoPackage path is not specified")
@@ -65,22 +66,33 @@ class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister {
maybeTableName
}

val fields = GeoPackageConnectionManager
.getSchema(path, tableName)
GeoPackageOptions(tableName = tableName, showMetadata = showMetadata)
}

GeoPackageLoadOptions(
path = path,
showMetadata = showMetadata,
tableName = tableName,
tableType = getTableType(showMetadata = showMetadata, path = path, tableName = tableName),
fields = fields)
private def getTransformedPath(options: CaseInsensitiveStringMap): Seq[String] = {
val paths = getPaths(options)
transformPaths(paths, options)
}

private def getTableType(showMetadata: Boolean, path: String, tableName: String): TableType = {
if (showMetadata) {
TableType.METADATA
} else {
GeoPackageConnectionManager.findFeatureMetadata(path, tableName)
private def transformPaths(
paths: Seq[String],
options: CaseInsensitiveStringMap): Seq[String] = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
paths.map { pathString =>
if (pathString.toLowerCase(Locale.ROOT).endsWith(".geopackage")) {
val path = new Path(pathString)
val fs = path.getFileSystem(hadoopConf)

val isDirectory = Try(fs.getFileStatus(path).isDirectory).getOrElse(false)
if (isDirectory) {
pathString
} else {
pathString.substring(0, pathString.length - 3) + "???"
}
} else {
pathString
}
}
}

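The getTransformedPath/transformPaths addition in GeoPackageDataSource above rewrites any non-directory path ending in ".geopackage": the last three characters are replaced with the glob "???", so "cities.geopackage" becomes "cities.geopack???", while directories and other extensions pass through unchanged. A standalone sketch of just that rewrite rule, with the Hadoop FileSystem directory check stubbed out as a parameter, looks like this:

import java.util.Locale

object PathRewriteSketch {
  // Standalone sketch of the rewrite performed in transformPaths; the real code
  // consults Hadoop's FileSystem to detect directories, stubbed here as isDirectory.
  def rewriteGeoPackagePath(pathString: String, isDirectory: Boolean): String = {
    if (pathString.toLowerCase(Locale.ROOT).endsWith(".geopackage") && !isDirectory) {
      // Swap the last three characters for a glob:
      // "/data/cities.geopackage" -> "/data/cities.geopack???"
      pathString.substring(0, pathString.length - 3) + "???"
    } else {
      pathString
    }
  }

  def main(args: Array[String]): Unit = {
    println(rewriteGeoPackagePath("/data/cities.geopackage", isDirectory = false))
    println(rewriteGeoPackagePath("/data/tiles_dir", isDirectory = true))
  }
}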
@@ -18,50 +18,75 @@
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager
import org.apache.hadoop.fs.Path
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
import org.apache.sedona.sql.datasources.geopackage.model.TableType.{FEATURES, METADATA, TILES, UNKNOWN}
import org.apache.sedona.sql.datasources.geopackage.model.{PartitionOptions, TileRowMetadata}
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageReadOptions, PartitionOptions, TileRowMetadata}
import org.apache.sedona.sql.datasources.geopackage.transform.ValuesMapper
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.util.SerializableConfiguration

import java.io.File
import java.sql.ResultSet

case class GeoPackagePartitionReader(
var values: Seq[Any],
rs: ResultSet,
options: PartitionOptions)
extends PartitionReader[InternalRow] {

def this(partitionOptions: PartitionOptions) = {
this(
Seq.empty,
GeoPackageConnectionManager.getTableCursor(partitionOptions),
partitionOptions)
}
var rs: ResultSet,
options: GeoPackageReadOptions,
broadcastedConf: Broadcast[SerializableConfiguration],
var currentTempFile: File)
extends PartitionReader[InternalRow] {

private var values: Seq[Any] = Seq.empty
private var currentFile = options.currentFile
private val partitionedFiles = options.partitionedFiles

override def next(): Boolean = {
val hasNext = rs.next()
if (!hasNext) {
if (rs.next()) {
values = ValuesMapper.mapValues(adjustPartitionOptions, rs)
return true
}

partitionedFiles.remove(currentFile)

if (partitionedFiles.isEmpty) {
return false
}

rs.close()

currentFile = partitionedFiles.head
val tempFile = FileSystemUtils.copyToLocal(
options = broadcastedConf.value.value,
file = new Path(currentFile.filePath))

currentTempFile.deleteOnExit()

currentTempFile = tempFile

rs = GeoPackageConnectionManager.getTableCursor(currentTempFile.getPath, options.tableName)

if (!rs.next()) {
return false
}

values = ValuesMapper.mapValues(adjustPartitionOptions, rs)

hasNext
true
}

private def adjustPartitionOptions: PartitionOptions = {
options.loadOptions.tableType match {
case FEATURES | METADATA => options
options.partitionOptions.tableType match {
case FEATURES | METADATA => options.partitionOptions
case TILES =>
val tileRowMetadata = TileRowMetadata(
zoomLevel = rs.getInt("zoom_level"),
tileColumn = rs.getInt("tile_column"),
tileRow = rs.getInt("tile_row"))

options.withTileRowMetadata(tileRowMetadata)
case UNKNOWN => options
options.partitionOptions.withTileRowMetadata(tileRowMetadata)
case UNKNOWN => options.partitionOptions
}

}
@@ -72,5 +97,6 @@ case class GeoPackagePartitionReader(

override def close(): Unit = {
rs.close()
options.tempFile.delete()
}
}
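The reworked next() in GeoPackagePartitionReader above no longer drives a single cursor: when the current ResultSet is exhausted, the reader drops the finished file from its set of partitioned files, copies the next one to a local temp file, reopens a cursor on it, and maps the first row. A simplified skeleton of that advance logic follows; openCursor is a hypothetical stand-in for GeoPackageConnectionManager.getTableCursor and the local-copy step is omitted, so treat it as an illustration of the control flow rather than Sedona's implementation.

import java.sql.ResultSet
import scala.collection.mutable

// Skeleton of the "advance to the next file when the current cursor runs dry"
// pattern used by GeoPackagePartitionReader.next().
class MultiFileCursor(
    var rs: ResultSet,
    var currentFile: String,
    remainingFiles: mutable.Set[String],
    openCursor: String => ResultSet) {

  def next(): Boolean = {
    if (rs.next()) {
      return true // more rows in the current file
    }

    remainingFiles.remove(currentFile) // this file is exhausted
    if (remainingFiles.isEmpty) {
      return false // nothing left to read
    }

    rs.close()
    currentFile = remainingFiles.head // pick the next partitioned file
    rs = openCursor(currentFile)      // new cursor over the next file
    rs.next()                          // false if that file has no rows
  }
}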
@@ -18,29 +18,70 @@
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, PartitionOptions, TableType}
import org.apache.hadoop.fs.Path
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
import org.apache.sedona.sql.datasources.geopackage.model.TableType.TILES
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, GeoPackageReadOptions, PartitionOptions, TableType}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

class GeoPackagePartitionReaderFactory(loadOptions: GeoPackageLoadOptions)
extends PartitionReaderFactory {
case class GeoPackagePartitionReaderFactory(
sparkSession: SparkSession,
broadcastedConf: Broadcast[SerializableConfiguration],
loadOptions: GeoPackageOptions,
dataSchema: StructType)
extends PartitionReaderFactory {

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
if (loadOptions.showMetadata) {
return new GeoPackagePartitionReader(PartitionOptions.fromLoadOptions(loadOptions))
val partitionFiles = partition match {
case filePartition: FilePartition => filePartition.files
case _ =>
throw new IllegalArgumentException(
s"Unexpected partition type: ${partition.getClass.getCanonicalName}")
}

loadOptions.tableType match {
case TableType.FEATURES =>
new GeoPackagePartitionReader(PartitionOptions.fromLoadOptions(loadOptions))
val tempFile = FileSystemUtils.copyToLocal(
options = broadcastedConf.value.value,
file = new Path(partitionFiles.head.filePath))

case TableType.TILES =>
val tileMetadata =
GeoPackageConnectionManager.findTilesMetadata(loadOptions.path, loadOptions.tableName)
val tableType = if (loadOptions.showMetadata) {
TableType.METADATA
} else {
GeoPackageConnectionManager.findFeatureMetadata(tempFile.getPath, loadOptions.tableName)
}

val rs =
GeoPackageConnectionManager.getTableCursor(tempFile.getAbsolutePath, loadOptions.tableName)

val schema = GeoPackageConnectionManager.getSchema(tempFile.getPath, loadOptions.tableName)

new GeoPackagePartitionReader(
PartitionOptions.fromLoadOptions(loadOptions).withTile(tileMetadata))
if (StructType(schema.map(_.toStructField(tableType))) != dataSchema) {
throw new IllegalArgumentException(
s"Schema mismatch: expected $dataSchema, got ${StructType(schema.map(_.toStructField(tableType)))}")
}

val tileMetadata = tableType match {
case TILES =>
Some(
GeoPackageConnectionManager.findTilesMetadata(tempFile.getPath, loadOptions.tableName))
case _ => None
}

GeoPackagePartitionReader(
rs = rs,
options = GeoPackageReadOptions(
tableName = loadOptions.tableName,
tempFile = tempFile,
partitionOptions =
PartitionOptions(tableType = tableType, columns = schema, tile = tileMetadata),
partitionedFiles = scala.collection.mutable.HashSet(partitionFiles: _*),
currentFile = partitionFiles.head),
broadcastedConf = broadcastedConf,
currentTempFile = tempFile)
}
}
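Because a GeoPackage is a SQLite database that has to be readable from the local file system before a JDBC cursor can be opened on it, the factory above first copies each partitioned file to a local temp file via the broadcast Hadoop configuration, then derives the table type and schema and validates the schema against what Spark planned with. A rough sketch of such a copy-to-local step using only public Hadoop FileSystem APIs is shown below; it approximates what FileSystemUtils.copyToLocal is used for here, not its actual implementation.

import java.io.File
import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object CopyToLocalSketch {
  // Copy a possibly remote file (HDFS, S3A, ...) to a local temp file so that a
  // JDBC/SQLite cursor can be opened on it.
  def copyToLocalTemp(conf: Configuration, file: Path): File = {
    val local = Files.createTempFile("geopackage-", ".gpkg").toFile
    local.deleteOnExit()
    val fs = file.getFileSystem(conf)
    // copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem)
    fs.copyToLocalFile(false, file, new Path(local.getAbsolutePath), true)
    local
  }

  def main(args: Array[String]): Unit = {
    val local = copyToLocalTemp(new Configuration(), new Path("/data/example.geopackage"))
    println(s"copied to ${local.getAbsolutePath}")
  }
}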
@@ -18,22 +18,27 @@
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageLoadOptions
import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageOptions
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

import scala.jdk.CollectionConverters.mapAsScalaMapConverter

case class GeoPackageScan(
dataSchema: StructType,
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
readDataSchema: StructType,
readPartitionSchema: StructType,
loadOptions: GeoPackageLoadOptions)
extends FileScan {
dataSchema: StructType,
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
readDataSchema: StructType,
readPartitionSchema: StructType,
options: CaseInsensitiveStringMap,
loadOptions: GeoPackageOptions)
extends FileScan {

override def partitionFilters: Seq[Expression] = {
Seq.empty
@@ -44,6 +49,11 @@ case class GeoPackageScan(
}

override def createReaderFactory(): PartitionReaderFactory = {
new GeoPackagePartitionReaderFactory(loadOptions)
val caseSensitiveMap = options.asScala.toMap
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema)
}
}
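createReaderFactory above builds a Hadoop Configuration from the scan options and broadcasts it wrapped in SerializableConfiguration, so executor-side readers can reach the file system. SerializableConfiguration is an internal Spark utility, so the sketch below reproduces the same pattern with a small wrapper of its own; it illustrates the idea rather than quoting Spark's code.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

// Hadoop's Configuration is not Java-serializable, so it cannot be broadcast
// as-is. Spark's SerializableConfiguration (used in the diff) wraps it with
// custom serialization; this wrapper shows the same idea for user code.
class SerializableHadoopConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out) // Configuration implements Writable
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

object BroadcastConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("conf-broadcast").getOrCreate()

    val wrapped = new SerializableHadoopConf(spark.sparkContext.hadoopConfiguration)
    val broadcastedConf = spark.sparkContext.broadcast(wrapped)

    // Executor-side code reads it back as broadcastedConf.value.value, which is
    // how GeoPackagePartitionReaderFactory consumes the broadcast above.
    println(broadcastedConf.value.value.get("fs.defaultFS", "file:///"))

    spark.stop()
  }
}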
@@ -18,19 +18,21 @@
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageLoadOptions
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, GeoPackageOptions}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class GeoPackageScanBuilder(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
dataSchema: StructType,
loadOptions: GeoPackageLoadOptions)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
dataSchema: StructType,
options: CaseInsensitiveStringMap,
loadOptions: GeoPackageOptions)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {

override def build(): Scan = {
GeoPackageScan(
@@ -39,6 +41,7 @@ class GeoPackageScanBuilder(
fileIndex,
dataSchema,
readPartitionSchema(),
options,
loadOptions)
}
}
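GeoPackageScanBuilder now simply threads the raw CaseInsensitiveStringMap and the resolved GeoPackageOptions through to GeoPackageScan. The showMetadata flag carried in those options selects the metadata table type (TableType.METADATA in the diff) instead of a feature or tile table; a hedged usage sketch follows, where the "showMetadata" option key and the format name are again assumptions inferred from the code rather than documented names.

import org.apache.spark.sql.SparkSession

object GeoPackageMetadataSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("geopackage-metadata-sketch")
      .master("local[*]")
      .getOrCreate()

    // Assumed option key: ask the reader for the GeoPackage metadata view
    // instead of a concrete table.
    val metadata = spark.read
      .format("geopackage")            // assumed short name, as in the first sketch
      .option("showMetadata", "true")
      .load("/data/example.geopackage")

    metadata.show(truncate = false)
    spark.stop()
  }
}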
