diff --git a/spark/spark-3.3/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeopackagePartition.scala b/spark/spark-3.3/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeopackagePartition.scala deleted file mode 100644 index 46b39caa56..0000000000 --- a/spark/spark-3.3/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeopackagePartition.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sedona.sql.datasources.geopackage - -import org.apache.spark.Partition -import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.execution.datasources.PartitionedFile - -case class GeopackagePartition(index: Int, files: Array[PartitionedFile]) - extends Partition - with InputPartition diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageDataSource.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageDataSource.scala index 5e3f14694d..7721c38909 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageDataSource.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageDataSource.scala @@ -18,15 +18,18 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager -import org.apache.sedona.sql.datasources.geopackage.model.TableType.TableType -import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, TableType} +import org.apache.hadoop.fs.Path +import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageOptions import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.util.CaseInsensitiveStringMap +import java.util.Locale +import scala.jdk.CollectionConverters.mapAsScalaMapConverter +import scala.util.Try + class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister { override def fallbackFileFormat: Class[_ <: FileFormat] = { @@ -34,19 +37,17 @@ class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister { } override protected def getTable(options: CaseInsensitiveStringMap): Table = { - val loadOptions = getLoadOptions(options) - GeoPackageTable( "", sparkSession, options, - Seq(loadOptions.path), + getTransformedPath(options), None, fallbackFileFormat, - loadOptions) + getLoadOptions(options)) } - private def getLoadOptions(options: CaseInsensitiveStringMap): GeoPackageLoadOptions = { + private def 
getLoadOptions(options: CaseInsensitiveStringMap): GeoPackageOptions = { val path = options.get("path") if (path.isEmpty) { throw new IllegalArgumentException("GeoPackage path is not specified") @@ -65,22 +66,33 @@ class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister { maybeTableName } - val fields = GeoPackageConnectionManager - .getSchema(path, tableName) + GeoPackageOptions(tableName = tableName, showMetadata = showMetadata) + } - GeoPackageLoadOptions( - path = path, - showMetadata = showMetadata, - tableName = tableName, - tableType = getTableType(showMetadata = showMetadata, path = path, tableName = tableName), - fields = fields) + private def getTransformedPath(options: CaseInsensitiveStringMap): Seq[String] = { + val paths = getPaths(options) + transformPaths(paths, options) } - private def getTableType(showMetadata: Boolean, path: String, tableName: String): TableType = { - if (showMetadata) { - TableType.METADATA - } else { - GeoPackageConnectionManager.findFeatureMetadata(path, tableName) + private def transformPaths( + paths: Seq[String], + options: CaseInsensitiveStringMap): Seq[String] = { + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + paths.map { pathString => + if (pathString.toLowerCase(Locale.ROOT).endsWith(".geopackage")) { + val path = new Path(pathString) + val fs = path.getFileSystem(hadoopConf) + + val isDirectory = Try(fs.getFileStatus(path).isDirectory).getOrElse(false) + if (isDirectory) { + pathString + } else { + pathString.substring(0, pathString.length - 3) + "???" + } + } else { + pathString + } } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReader.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReader.scala index 630162e5e1..fa468204ed 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReader.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReader.scala @@ -18,50 +18,75 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager +import org.apache.hadoop.fs.Path +import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} import org.apache.sedona.sql.datasources.geopackage.model.TableType.{FEATURES, METADATA, TILES, UNKNOWN} -import org.apache.sedona.sql.datasources.geopackage.model.{PartitionOptions, TileRowMetadata} +import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageReadOptions, PartitionOptions, TileRowMetadata} import org.apache.sedona.sql.datasources.geopackage.transform.ValuesMapper +import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.util.SerializableConfiguration +import java.io.File import java.sql.ResultSet case class GeoPackagePartitionReader( - var values: Seq[Any], - rs: ResultSet, - options: PartitionOptions) - extends PartitionReader[InternalRow] { - - def this(partitionOptions: PartitionOptions) = { - this( - Seq.empty, - GeoPackageConnectionManager.getTableCursor(partitionOptions), - partitionOptions) - } + var rs: ResultSet, + options: GeoPackageReadOptions, + broadcastedConf: 
Broadcast[SerializableConfiguration], + var currentTempFile: File) + extends PartitionReader[InternalRow] { + + private var values: Seq[Any] = Seq.empty + private var currentFile = options.currentFile + private val partitionedFiles = options.partitionedFiles override def next(): Boolean = { - val hasNext = rs.next() - if (!hasNext) { + if (rs.next()) { + values = ValuesMapper.mapValues(adjustPartitionOptions, rs) + return true + } + + partitionedFiles.remove(currentFile) + + if (partitionedFiles.isEmpty) { + return false + } + + rs.close() + + currentFile = partitionedFiles.head + val tempFile = FileSystemUtils.copyToLocal( + options = broadcastedConf.value.value, + file = new Path(currentFile.filePath)) + + currentTempFile.deleteOnExit() + + currentTempFile = tempFile + + rs = GeoPackageConnectionManager.getTableCursor(currentTempFile.getPath, options.tableName) + + if (!rs.next()) { return false } values = ValuesMapper.mapValues(adjustPartitionOptions, rs) - hasNext + true } private def adjustPartitionOptions: PartitionOptions = { - options.loadOptions.tableType match { - case FEATURES | METADATA => options + options.partitionOptions.tableType match { + case FEATURES | METADATA => options.partitionOptions case TILES => val tileRowMetadata = TileRowMetadata( zoomLevel = rs.getInt("zoom_level"), tileColumn = rs.getInt("tile_column"), tileRow = rs.getInt("tile_row")) - options.withTileRowMetadata(tileRowMetadata) - case UNKNOWN => options + options.partitionOptions.withTileRowMetadata(tileRowMetadata) + case UNKNOWN => options.partitionOptions } } @@ -72,5 +97,6 @@ case class GeoPackagePartitionReader( override def close(): Unit = { rs.close() + options.tempFile.delete() } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala index af61740eb9..718c813c0b 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala @@ -18,29 +18,70 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager -import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, PartitionOptions, TableType} +import org.apache.hadoop.fs.Path +import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} +import org.apache.sedona.sql.datasources.geopackage.model.TableType.TILES +import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, GeoPackageReadOptions, PartitionOptions, TableType} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration -class GeoPackagePartitionReaderFactory(loadOptions: GeoPackageLoadOptions) - extends PartitionReaderFactory { +case class GeoPackagePartitionReaderFactory( + sparkSession: SparkSession, + broadcastedConf: Broadcast[SerializableConfiguration], + loadOptions: GeoPackageOptions, + dataSchema: 
StructType) + extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - if (loadOptions.showMetadata) { - return new GeoPackagePartitionReader(PartitionOptions.fromLoadOptions(loadOptions)) + val partitionFiles = partition match { + case filePartition: FilePartition => filePartition.files + case _ => + throw new IllegalArgumentException( + s"Unexpected partition type: ${partition.getClass.getCanonicalName}") } - loadOptions.tableType match { - case TableType.FEATURES => - new GeoPackagePartitionReader(PartitionOptions.fromLoadOptions(loadOptions)) + val tempFile = FileSystemUtils.copyToLocal( + options = broadcastedConf.value.value, + file = new Path(partitionFiles.head.filePath)) - case TableType.TILES => - val tileMetadata = - GeoPackageConnectionManager.findTilesMetadata(loadOptions.path, loadOptions.tableName) + val tableType = if (loadOptions.showMetadata) { + TableType.METADATA + } else { + GeoPackageConnectionManager.findFeatureMetadata(tempFile.getPath, loadOptions.tableName) + } + + val rs = + GeoPackageConnectionManager.getTableCursor(tempFile.getAbsolutePath, loadOptions.tableName) + + val schema = GeoPackageConnectionManager.getSchema(tempFile.getPath, loadOptions.tableName) - new GeoPackagePartitionReader( - PartitionOptions.fromLoadOptions(loadOptions).withTile(tileMetadata)) + if (StructType(schema.map(_.toStructField(tableType))) != dataSchema) { + throw new IllegalArgumentException( + s"Schema mismatch: expected $dataSchema, got ${StructType(schema.map(_.toStructField(tableType)))}") } + + val tileMetadata = tableType match { + case TILES => + Some( + GeoPackageConnectionManager.findTilesMetadata(tempFile.getPath, loadOptions.tableName)) + case _ => None + } + + GeoPackagePartitionReader( + rs = rs, + options = GeoPackageReadOptions( + tableName = loadOptions.tableName, + tempFile = tempFile, + partitionOptions = + PartitionOptions(tableType = tableType, columns = schema, tile = tileMetadata), + partitionedFiles = scala.collection.mutable.HashSet(partitionFiles: _*), + currentFile = partitionFiles.head), + broadcastedConf = broadcastedConf, + currentTempFile = tempFile) } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala index 6a6da90ecf..854467ba9f 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala @@ -18,22 +18,27 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageLoadOptions +import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageOptions import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.SerializableConfiguration + +import scala.jdk.CollectionConverters.mapAsScalaMapConverter case class GeoPackageScan( - dataSchema: StructType, - sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex, - readDataSchema: 
StructType, - readPartitionSchema: StructType, - loadOptions: GeoPackageLoadOptions) - extends FileScan { + dataSchema: StructType, + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + readDataSchema: StructType, + readPartitionSchema: StructType, + options: CaseInsensitiveStringMap, + loadOptions: GeoPackageOptions) + extends FileScan { override def partitionFilters: Seq[Expression] = { Seq.empty @@ -44,6 +49,11 @@ case class GeoPackageScan( } override def createReaderFactory(): PartitionReaderFactory = { - new GeoPackagePartitionReaderFactory(loadOptions) + val caseSensitiveMap = options.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + val broadcastedConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema) } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala index 88f50535b6..d1c011611c 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala @@ -18,19 +18,21 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageLoadOptions +import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, GeoPackageOptions} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap class GeoPackageScanBuilder( - sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex, - dataSchema: StructType, - loadOptions: GeoPackageLoadOptions) - extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + options: CaseInsensitiveStringMap, + loadOptions: GeoPackageOptions) + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { override def build(): Scan = { GeoPackageScan( @@ -39,6 +41,7 @@ class GeoPackageScanBuilder( fileIndex, dataSchema, readPartitionSchema(), + options, loadOptions) } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index b7e101e4a8..a7e2d55d43 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -19,7 +19,8 @@ package org.apache.sedona.sql.datasources.geopackage import org.apache.hadoop.fs.FileStatus -import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageLoadOptions +import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} +import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, TableType} import org.apache.spark.sql.SparkSession import 
org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} @@ -27,19 +28,39 @@ import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.SerializableConfiguration + +import scala.jdk.CollectionConverters.mapAsScalaMapConverter case class GeoPackageTable( - name: String, - sparkSession: SparkSession, - options: CaseInsensitiveStringMap, - paths: Seq[String], - userSpecifiedSchema: Option[StructType], - fallbackFileFormat: Class[_ <: FileFormat], - loadOptions: GeoPackageLoadOptions) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat], + loadOptions: GeoPackageOptions) + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { - Some(getSchema) + val serializableConf = new SerializableConfiguration( + sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)) + + val tempFile = FileSystemUtils.copyToLocal(serializableConf.value, files.head.getPath) + + tempFile.deleteOnExit() + + val tableType = if (loadOptions.showMetadata) { + TableType.METADATA + } else { + GeoPackageConnectionManager.findFeatureMetadata(tempFile.getPath, loadOptions.tableName) + } + + Some( + StructType( + GeoPackageConnectionManager + .getSchema(tempFile.getPath, loadOptions.tableName) + .map(field => field.toStructField(tableType)))) } override def formatName: String = { @@ -47,19 +68,10 @@ case class GeoPackageTable( } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - val schema = userSpecifiedSchema.getOrElse(getSchema) - - new GeoPackageScanBuilder(sparkSession, fileIndex, schema, loadOptions) + new GeoPackageScanBuilder(sparkSession, fileIndex, schema, options, loadOptions) } override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { null } - - private def getSchema: StructType = { - val fields = loadOptions.fields - .map(field => field.toStructField(loadOptions.tableType)) - - StructType(fields) - } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageDataSource.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageDataSource.scala index 5e3f14694d..7721c38909 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageDataSource.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageDataSource.scala @@ -18,15 +18,18 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager -import org.apache.sedona.sql.datasources.geopackage.model.TableType.TableType -import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, TableType} +import org.apache.hadoop.fs.Path +import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageOptions import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import 
org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.util.CaseInsensitiveStringMap +import java.util.Locale +import scala.jdk.CollectionConverters.mapAsScalaMapConverter +import scala.util.Try + class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister { override def fallbackFileFormat: Class[_ <: FileFormat] = { @@ -34,19 +37,17 @@ class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister { } override protected def getTable(options: CaseInsensitiveStringMap): Table = { - val loadOptions = getLoadOptions(options) - GeoPackageTable( "", sparkSession, options, - Seq(loadOptions.path), + getTransformedPath(options), None, fallbackFileFormat, - loadOptions) + getLoadOptions(options)) } - private def getLoadOptions(options: CaseInsensitiveStringMap): GeoPackageLoadOptions = { + private def getLoadOptions(options: CaseInsensitiveStringMap): GeoPackageOptions = { val path = options.get("path") if (path.isEmpty) { throw new IllegalArgumentException("GeoPackage path is not specified") @@ -65,22 +66,33 @@ class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister { maybeTableName } - val fields = GeoPackageConnectionManager - .getSchema(path, tableName) + GeoPackageOptions(tableName = tableName, showMetadata = showMetadata) + } - GeoPackageLoadOptions( - path = path, - showMetadata = showMetadata, - tableName = tableName, - tableType = getTableType(showMetadata = showMetadata, path = path, tableName = tableName), - fields = fields) + private def getTransformedPath(options: CaseInsensitiveStringMap): Seq[String] = { + val paths = getPaths(options) + transformPaths(paths, options) } - private def getTableType(showMetadata: Boolean, path: String, tableName: String): TableType = { - if (showMetadata) { - TableType.METADATA - } else { - GeoPackageConnectionManager.findFeatureMetadata(path, tableName) + private def transformPaths( + paths: Seq[String], + options: CaseInsensitiveStringMap): Seq[String] = { + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + paths.map { pathString => + if (pathString.toLowerCase(Locale.ROOT).endsWith(".geopackage")) { + val path = new Path(pathString) + val fs = path.getFileSystem(hadoopConf) + + val isDirectory = Try(fs.getFileStatus(path).isDirectory).getOrElse(false) + if (isDirectory) { + pathString + } else { + pathString.substring(0, pathString.length - 3) + "???" 
+ } + } else { + pathString + } } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReader.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReader.scala index 630162e5e1..fa468204ed 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReader.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReader.scala @@ -18,50 +18,75 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager +import org.apache.hadoop.fs.Path +import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} import org.apache.sedona.sql.datasources.geopackage.model.TableType.{FEATURES, METADATA, TILES, UNKNOWN} -import org.apache.sedona.sql.datasources.geopackage.model.{PartitionOptions, TileRowMetadata} +import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageReadOptions, PartitionOptions, TileRowMetadata} import org.apache.sedona.sql.datasources.geopackage.transform.ValuesMapper +import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.util.SerializableConfiguration +import java.io.File import java.sql.ResultSet case class GeoPackagePartitionReader( - var values: Seq[Any], - rs: ResultSet, - options: PartitionOptions) - extends PartitionReader[InternalRow] { - - def this(partitionOptions: PartitionOptions) = { - this( - Seq.empty, - GeoPackageConnectionManager.getTableCursor(partitionOptions), - partitionOptions) - } + var rs: ResultSet, + options: GeoPackageReadOptions, + broadcastedConf: Broadcast[SerializableConfiguration], + var currentTempFile: File) + extends PartitionReader[InternalRow] { + + private var values: Seq[Any] = Seq.empty + private var currentFile = options.currentFile + private val partitionedFiles = options.partitionedFiles override def next(): Boolean = { - val hasNext = rs.next() - if (!hasNext) { + if (rs.next()) { + values = ValuesMapper.mapValues(adjustPartitionOptions, rs) + return true + } + + partitionedFiles.remove(currentFile) + + if (partitionedFiles.isEmpty) { + return false + } + + rs.close() + + currentFile = partitionedFiles.head + val tempFile = FileSystemUtils.copyToLocal( + options = broadcastedConf.value.value, + file = new Path(currentFile.filePath)) + + currentTempFile.deleteOnExit() + + currentTempFile = tempFile + + rs = GeoPackageConnectionManager.getTableCursor(currentTempFile.getPath, options.tableName) + + if (!rs.next()) { return false } values = ValuesMapper.mapValues(adjustPartitionOptions, rs) - hasNext + true } private def adjustPartitionOptions: PartitionOptions = { - options.loadOptions.tableType match { - case FEATURES | METADATA => options + options.partitionOptions.tableType match { + case FEATURES | METADATA => options.partitionOptions case TILES => val tileRowMetadata = TileRowMetadata( zoomLevel = rs.getInt("zoom_level"), tileColumn = rs.getInt("tile_column"), tileRow = rs.getInt("tile_row")) - options.withTileRowMetadata(tileRowMetadata) - case UNKNOWN => options + options.partitionOptions.withTileRowMetadata(tileRowMetadata) + case UNKNOWN => options.partitionOptions } } @@ -72,5 +97,6 @@ case class GeoPackagePartitionReader( override def close(): Unit = { rs.close() + 
options.tempFile.delete() } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala index af61740eb9..718c813c0b 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala @@ -18,29 +18,70 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.connection.GeoPackageConnectionManager -import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, PartitionOptions, TableType} +import org.apache.hadoop.fs.Path +import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} +import org.apache.sedona.sql.datasources.geopackage.model.TableType.TILES +import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, GeoPackageReadOptions, PartitionOptions, TableType} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration -class GeoPackagePartitionReaderFactory(loadOptions: GeoPackageLoadOptions) - extends PartitionReaderFactory { +case class GeoPackagePartitionReaderFactory( + sparkSession: SparkSession, + broadcastedConf: Broadcast[SerializableConfiguration], + loadOptions: GeoPackageOptions, + dataSchema: StructType) + extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - if (loadOptions.showMetadata) { - return new GeoPackagePartitionReader(PartitionOptions.fromLoadOptions(loadOptions)) + val partitionFiles = partition match { + case filePartition: FilePartition => filePartition.files + case _ => + throw new IllegalArgumentException( + s"Unexpected partition type: ${partition.getClass.getCanonicalName}") } - loadOptions.tableType match { - case TableType.FEATURES => - new GeoPackagePartitionReader(PartitionOptions.fromLoadOptions(loadOptions)) + val tempFile = FileSystemUtils.copyToLocal( + options = broadcastedConf.value.value, + file = new Path(partitionFiles.head.filePath)) - case TableType.TILES => - val tileMetadata = - GeoPackageConnectionManager.findTilesMetadata(loadOptions.path, loadOptions.tableName) + val tableType = if (loadOptions.showMetadata) { + TableType.METADATA + } else { + GeoPackageConnectionManager.findFeatureMetadata(tempFile.getPath, loadOptions.tableName) + } + + val rs = + GeoPackageConnectionManager.getTableCursor(tempFile.getAbsolutePath, loadOptions.tableName) + + val schema = GeoPackageConnectionManager.getSchema(tempFile.getPath, loadOptions.tableName) - new GeoPackagePartitionReader( - PartitionOptions.fromLoadOptions(loadOptions).withTile(tileMetadata)) + if (StructType(schema.map(_.toStructField(tableType))) != dataSchema) { + throw new IllegalArgumentException( + s"Schema mismatch: expected $dataSchema, got ${StructType(schema.map(_.toStructField(tableType)))}") } + + val tileMetadata = tableType match { + case TILES => + Some( + 
GeoPackageConnectionManager.findTilesMetadata(tempFile.getPath, loadOptions.tableName)) + case _ => None + } + + GeoPackagePartitionReader( + rs = rs, + options = GeoPackageReadOptions( + tableName = loadOptions.tableName, + tempFile = tempFile, + partitionOptions = + PartitionOptions(tableType = tableType, columns = schema, tile = tileMetadata), + partitionedFiles = scala.collection.mutable.HashSet(partitionFiles: _*), + currentFile = partitionFiles.head), + broadcastedConf = broadcastedConf, + currentTempFile = tempFile) } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala index 6a6da90ecf..854467ba9f 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala @@ -18,22 +18,27 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageLoadOptions +import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageOptions import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.SerializableConfiguration + +import scala.jdk.CollectionConverters.mapAsScalaMapConverter case class GeoPackageScan( - dataSchema: StructType, - sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex, - readDataSchema: StructType, - readPartitionSchema: StructType, - loadOptions: GeoPackageLoadOptions) - extends FileScan { + dataSchema: StructType, + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + readDataSchema: StructType, + readPartitionSchema: StructType, + options: CaseInsensitiveStringMap, + loadOptions: GeoPackageOptions) + extends FileScan { override def partitionFilters: Seq[Expression] = { Seq.empty @@ -44,6 +49,11 @@ case class GeoPackageScan( } override def createReaderFactory(): PartitionReaderFactory = { - new GeoPackagePartitionReaderFactory(loadOptions) + val caseSensitiveMap = options.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + val broadcastedConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema) } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala index 88f50535b6..c9452a6535 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala @@ -18,19 +18,21 @@ */ package org.apache.sedona.sql.datasources.geopackage -import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageLoadOptions +import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageOptions import 
org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap class GeoPackageScanBuilder( - sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex, - dataSchema: StructType, - loadOptions: GeoPackageLoadOptions) - extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + options: CaseInsensitiveStringMap, + loadOptions: GeoPackageOptions) + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { override def build(): Scan = { GeoPackageScan( @@ -39,6 +41,7 @@ class GeoPackageScanBuilder( fileIndex, dataSchema, readPartitionSchema(), + options, loadOptions) } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index b7e101e4a8..a7e2d55d43 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -19,7 +19,8 @@ package org.apache.sedona.sql.datasources.geopackage import org.apache.hadoop.fs.FileStatus -import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageLoadOptions +import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager} +import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, TableType} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} @@ -27,19 +28,39 @@ import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.SerializableConfiguration + +import scala.jdk.CollectionConverters.mapAsScalaMapConverter case class GeoPackageTable( - name: String, - sparkSession: SparkSession, - options: CaseInsensitiveStringMap, - paths: Seq[String], - userSpecifiedSchema: Option[StructType], - fallbackFileFormat: Class[_ <: FileFormat], - loadOptions: GeoPackageLoadOptions) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat], + loadOptions: GeoPackageOptions) + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { - Some(getSchema) + val serializableConf = new SerializableConfiguration( + sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)) + + val tempFile = FileSystemUtils.copyToLocal(serializableConf.value, files.head.getPath) + + tempFile.deleteOnExit() + + val tableType = if (loadOptions.showMetadata) { + TableType.METADATA + } else { + GeoPackageConnectionManager.findFeatureMetadata(tempFile.getPath, loadOptions.tableName) + } + + Some( + StructType( + 
GeoPackageConnectionManager + .getSchema(tempFile.getPath, loadOptions.tableName) + .map(field => field.toStructField(tableType)))) } override def formatName: String = { @@ -47,19 +68,10 @@ case class GeoPackageTable( } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - val schema = userSpecifiedSchema.getOrElse(getSchema) - - new GeoPackageScanBuilder(sparkSession, fileIndex, schema, loadOptions) + new GeoPackageScanBuilder(sparkSession, fileIndex, schema, options, loadOptions) } override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { null } - - private def getSchema: StructType = { - val fields = loadOptions.fields - .map(field => field.toStructField(loadOptions.tableType)) - - StructType(fields) - } }
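The hunks above route GeoPackage reads through Spark's FilePartition/PartitionedFile machinery: each task copies its .geopackage file to a local temp file with FileSystemUtils.copyToLocal using the broadcast SerializableConfiguration, opens a cursor on that local copy via GeoPackageConnectionManager, and moves on to the next PartitionedFile once the cursor is exhausted, while schema inference in GeoPackageTable.inferSchema works from a local copy of the first file. A minimal read sketch against the reworked source follows; it assumes the DataSourceRegister short name is "geopackage" and that GeoPackageOptions is populated from "tableName" and "showMetadata" option keys, neither of which is shown in these hunks, and the table name and path are placeholders.

    // Hypothetical usage sketch; the "geopackage" short name and the
    // "tableName"/"showMetadata" option keys are assumptions -- only
    // options.get("path"), tableName and showMetadata appear in the patch.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("geopackage-read-example")
      .master("local[*]") // for local testing; drop when submitting to a cluster
      .getOrCreate()

    // Read one table from a (possibly remote) GeoPackage file; with the new
    // reader each task copies its file to a local temp file before opening
    // a cursor on it.
    val features = spark.read
      .format("geopackage")
      .option("tableName", "my_features") // placeholder table name
      .load("hdfs:///data/example.geopackage")

    features.printSchema()

    // Request table-level metadata instead of feature/tile rows.
    val metadata = spark.read
      .format("geopackage")
      .option("showMetadata", "true")
      .option("tableName", "my_features")
      .load("hdfs:///data/example.geopackage")

Because getTransformedPath now resolves paths through getPaths and the partition reader iterates over every PartitionedFile in its partition, a directory or glob of GeoPackage files can be loaded the same way as a single file.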