diff --git a/docs/api/sql/Optimizer.md b/docs/api/sql/Optimizer.md index 3fa0242bb0..1fe54ef742 100644 --- a/docs/api/sql/Optimizer.md +++ b/docs/api/sql/Optimizer.md @@ -46,7 +46,7 @@ RangeJoin polygonshape#20: geometry, pointshape#43: geometry, false ## Distance join -Introduction: Find geometries from A and geometries from B such that the distance of each geometry pair is less or equal than a certain distance. It supports the planar Euclidean distance calculator `ST_Distance` and the meter-based geodesic distance calculators `ST_DistanceSpheroid` and `ST_DistanceSphere`. +Introduction: Find geometries from A and geometries from B such that the distance of each geometry pair is less than or equal to a certain distance. It supports the planar Euclidean distance calculators `ST_Distance` and `ST_HausdorffDistance` and the meter-based geodesic distance calculators `ST_DistanceSpheroid` and `ST_DistanceSphere`. Spark SQL Example for planar Euclidean distance: @@ -57,6 +57,12 @@ FROM pointdf1, pointdf2 WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2 ``` +```sql +SELECT * +FROM pointDf, polygonDf +WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 0.3) < 2 +``` + *Consider ==intersects within a certain distance==* ```sql SELECT * @@ -64,6 +70,12 @@ FROM pointdf1, pointdf2 WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) <= 2 ``` +```sql +SELECT * +FROM pointDf, polygonDf +WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2 +``` + Spark SQL Physical plan: ``` == Physical Plan == @@ -75,7 +87,7 @@ DistanceJoin pointshape1#12: geometry, pointshape2#33: geometry, 2.0, true ``` !!!warning - If you use `ST_Distance` as the predicate, Sedona doesn't control the distance's unit (degree or meter). It is same with the geometry. If your coordinates are in the longitude and latitude system, the unit of `distance` should be degree instead of meter or mile. 
To change the geometry's unit, please either transform the coordinate reference system to a meter-based system. See [ST_Transform](Function.md#st_transform). If you don't want to transform your data, please consider using `ST_DistanceSpheroid` or `ST_DistanceSphere`. + If you use planar euclidean distance functions like `ST_Distance` or `ST_HausdorffDistance` as the predicate, Sedona doesn't control the distance's unit (degree or meter). It is same with the geometry. If your coordinates are in the longitude and latitude system, the unit of `distance` should be degree instead of meter or mile. To change the geometry's unit, please either transform the coordinate reference system to a meter-based system. See [ST_Transform](Function.md#st_transform). If you don't want to transform your data, please consider using `ST_DistanceSpheroid` or `ST_DistanceSphere`. Spark SQL Example for meter-based geodesic distance `ST_DistanceSpheroid` (works for `ST_DistanceSphere` too): @@ -126,7 +138,7 @@ BroadcastIndexJoin pointshape#52: geometry, BuildRight, BuildRight, false ST_Con +- FileScan csv ``` -This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid` or `ST_DistanceSphere`: +This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`, `ST_DistanceSphere` or `ST_HausdorffDistance`: ```scala pointDf1.alias("pointDf1").join(broadcast(pointDf2).alias("pointDf2"), expr("ST_Distance(pointDf1.pointshape, pointDf2.pointshape) <= 2")) diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala index c661fc92a4..df2d615515 100644 --- a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala +++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala @@ -1036,7 +1036,7 @@ case class ST_BoundingDiagonal(inputExpressions: Seq[Expression]) } case class 
ST_HausdorffDistance(inputExpressions: Seq[Expression]) - extends InferredExpression(inferrableFunction3(Functions.hausdorffDistance)) { + extends InferredExpression(inferrableFunction3(Functions.hausdorffDistance), inferrableFunction2(Functions.hausdorffDistance)) { protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = { copy(inputExpressions = newChildren) } @@ -1062,4 +1062,3 @@ case class ST_Degrees(inputExpressions: Seq[Expression]) copy(inputExpressions = newChildren) } } - diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala index 8a8f411b0d..d6d4a2bccc 100644 --- a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala +++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala @@ -149,6 +149,32 @@ class JoinQueryDetector(sparkSession: SparkSession) extends Strategy { Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, true, condition, Some(distance))) case Some(And(_, LessThan(ST_DistanceSpheroid(Seq(leftShape, rightShape)), distance))) => Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, true, condition, Some(distance))) + //ST_HausdorffDistanceDefault + case Some(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance)) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(And(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance), _)) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(And(_, LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance))) => + Some(JoinQueryDetection(left, right, 
leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance)) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(And(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance), _)) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance))) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + //ST_HausdorffDistanceDensityFrac + case Some(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance)) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(And(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance), _)) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(And(_, LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance))) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance)) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(And(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance), _)) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) + case Some(And(_, 
LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance))) => + Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance))) case _ => None } diff --git a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala index 6f5691280e..d2232278fd 100644 --- a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala +++ b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala @@ -350,6 +350,43 @@ class BroadcastIndexJoinSuite extends TestBaseScala { assert(distanceJoinDf.count() == expected) }) } + + it("Passed ST_HausdorffDistance with densityFrac <= distance in a broadcast join") { + val sampleCount = 100 + val distance = 1.0 + val densityFrac = 0.5 + val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3) + val pointDf = buildPointDf.limit(sampleCount).repartition(5) + val expected = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.5, true) + + var distanceJoinDF = pointDf.alias("pointDf").join( + broadcast(polygonDf).alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, $densityFrac) <= $distance")) + assert(distanceJoinDF.queryExecution.sparkPlan.collect{case p: BroadcastIndexJoinExec => p}.size == 1) + assert(distanceJoinDF.count() == expected) + + distanceJoinDF = broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"), expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, $densityFrac) <= $distance")) + + assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => p }.size == 1) + assert(distanceJoinDF.count() == expected) + } + + it("Passed ST_HausdorffDistance <= distance in a broadcast join") { + val sampleCount = 200 + val distance = 2.0 + val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3) + val 
pointDf = buildPointDf.limit(sampleCount).repartition(5) + val expected = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0, true) + + var distanceJoinDF = pointDf.alias("pointDf").join( + broadcast(polygonDf).alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= $distance")) + assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => p }.size == 1) + assert(distanceJoinDF.count() == expected) + + distanceJoinDF = broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"), expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= $distance")) + + assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => p }.size == 1) + assert(distanceJoinDF.count() == expected) + } } describe("Sedona-SQL Broadcast Index Join Test for left semi joins") { diff --git a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala index 21f31a3b0f..dc6936ddda 100644 --- a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala +++ b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala @@ -21,6 +21,7 @@ package org.apache.sedona.sql import com.google.common.math.DoubleMath import org.apache.log4j.{Level, Logger} import org.apache.sedona.common.sphere.{Haversine, Spheroid} +import org.apache.sedona.common.Functions.hausdorffDistance import org.apache.sedona.spark.SedonaContext import org.apache.spark.sql.DataFrame import org.locationtech.jts.geom.{CoordinateSequence, CoordinateSequenceComparator} @@ -117,4 +118,25 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll { }).sum } + protected def bruteForceDistanceJoinHausdorff(sampleCount: Int, distance: Double, densityFrac: Double, intersects: Boolean): Int = { + val inputPolygon = buildPolygonDf.limit(sampleCount).collect() + val inputPoint = buildPointDf.limit(sampleCount).collect() + 
inputPoint.map(row => { + val point = row.getAs[org.locationtech.jts.geom.Point](0) + inputPolygon.map(row => { + val polygon = row.getAs[org.locationtech.jts.geom.Polygon](0) + if (densityFrac == 0) { + if (intersects) + if (hausdorffDistance(point, polygon) <= distance) 1 else 0 + else + if (hausdorffDistance(point, polygon) < distance) 1 else 0 + } else { + if (intersects) + if (hausdorffDistance(point, polygon, densityFrac) <= distance) 1 else 0 + else + if (hausdorffDistance(point, polygon, densityFrac) < distance) 1 else 0 + } + }).sum + }).sum + } } diff --git a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala index 66ee94caad..efa7280b65 100644 --- a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala +++ b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala @@ -403,7 +403,41 @@ class predicateJoinTestScala extends TestBaseScala { assert(distanceJoinDf.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1) assert(distanceJoinDf.count() == expected) }) + } + + it("Passed ST_HausdorffDistance in a spatial join") { + val sampleCount = 100 + val distanceCandidates = Seq(1, 2, 5, 10) + val densityFrac = 0.6 + val inputPoint = buildPointDf.limit(sampleCount).repartition(5) + val inputPolygon = buildPolygonDf.limit(sampleCount).repartition(3) + distanceCandidates.foreach(distance => { + + //DensityFrac specified, <= distance + val expectedDensityIntersects = bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, true) + val distanceDensityIntersectsDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, $densityFrac) <= $distance")) + assert(distanceDensityIntersectsDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1) + 
assert(distanceDensityIntersectsDF.count() == expectedDensityIntersects) + + //DensityFrac specified, < distance + val expectedDensityNoIntersect = bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, false) + val distanceDensityNoIntersectDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, $densityFrac) < $distance")) + assert(distanceDensityNoIntersectDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1) + assert(distanceDensityNoIntersectDF.count() == expectedDensityNoIntersect) + + //DensityFrac not specified, <= distance + val expectedDefaultIntersects = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, true) + val distanceDefaultIntersectsDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape) <= $distance")) + assert(distanceDefaultIntersectsDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1) + assert(distanceDefaultIntersectsDF.count() == expectedDefaultIntersects) + + //DensityFrac not specified, < distance + val expectedDefaultNoIntersects = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, false) + val distanceDefaultNoIntersectsDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape) < $distance")) + assert(distanceDefaultNoIntersectsDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1) + assert(distanceDefaultNoIntersectsDF.count() == expectedDefaultNoIntersects) + }) } } }