Unable to write to BigQuery table with require_partition_filter = true when spark.sql.sources.partitionOverwriteMode is set to DYNAMIC #1285

Open
ktchana opened this issue Sep 3, 2024 · 1 comment

ktchana commented Sep 3, 2024

A BigQuery table is created as follows:

CREATE TABLE `spark_test.mytable`
(
  id INT64 NOT NULL,
  name STRING,
  txndate DATE NOT NULL
)
PARTITION BY txndate;

The following Spark Scala code is used to populate the table with dummy data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import java.sql.Date

val data = Seq(
  Row(1, "Alice", Date.valueOf("2023-03-01")),
  Row(2, "Bob", Date.valueOf("2023-03-02")),
  Row(3, "Charlie", Date.valueOf("2023-03-03"))
)

val schema = StructType(List(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType),
  StructField("txndate", DateType, nullable = false)
))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.write
  .format("bigquery")
  .option("table", "spark_test.mytable")
  .option("temporaryGcsBucket", "my-temp-gcs-123")
  .option("writeMethod", "indirect")
  .option("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
  .mode("overwrite")
  .save()

This works as expected (i.e., the table partitions present in the DataFrame are overwritten).
However, when we alter the table to enable the require_partition_filter option:

ALTER TABLE spark_test.mytable
  SET OPTIONS (
    require_partition_filter = true);
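
With this option set, BigQuery rejects any query over the table that has no predicate on txndate usable for partition elimination. A minimal illustration (sketch only, not taken from the logs above):

-- Rejected: no filter on the partition column
SELECT COUNT(*) FROM `spark_test.mytable`;

-- Accepted: a direct predicate on the partition column
SELECT COUNT(*) FROM `spark_test.mytable`
WHERE txndate = DATE '2023-03-02';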

The same df.write call then fails, complaining about the missing partition filter in the query:

scala>   .save()
com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
  at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:157)
  at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
  at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:54)
  at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
  ... 53 elided
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:116)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getQueryResults(HttpBigQueryRpc.java:745)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1500)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1495)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:102)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1494)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1478)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job$1.call(Job.java:390)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job$1.call(Job.java:387)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:102)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitForQueryResults(Job.java:386)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitForInternal(Job.java:281)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:202)
  at com.google.cloud.bigquery.connector.common.BigQueryClient.waitForJob(BigQueryClient.java:131)
  at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:151)
  ... 77 more
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
GET https://bigquery.googleapis.com/bigquery/v2/projects/my-spark-bigquery-demo/queries/c80fb709-7aa0-4e1c-8fae-4e806bcfc83a?location=europe-west2&maxResults=0&prettyPrint=false
{
  "code": 400,
  "errors": [
    {
      "domain": "global",
      "location": "q",
      "locationType": "parameter",
      "message": "Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]",
      "reason": "invalidQuery"
    }
  ],
  "message": "Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]",
  "status": "INVALID_ARGUMENT"
}
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$3.interceptResponse(AbstractGoogleClientRequest.java:479)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:565)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:506)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:616)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getQueryResults(HttpBigQueryRpc.java:743)
  ... 94 more

scala> 

Tested with the following:

Spark 3.1.3 on Dataproc
Scala 2.12
BigQuery Connector: spark-bigquery-with-dependencies_2.12-0.40.0.jar

ktchana commented Sep 3, 2024

Looking at the BigQuery query logs, it appears that the connector executes the following statements:

DECLARE
  partitions_to_delete DEFAULT (
  SELECT
    ARRAY_AGG(DISTINCT(TIMESTAMP_TRUNC(`txndate`, DAY)) IGNORE NULLS)
  FROM
    `spark_test.mytable2316280012714`);

MERGE
  `spark_test.mytable` AS TARGET
USING
  `spark_test.mytable2316280012714` AS SOURCE
ON
  FALSE
  WHEN NOT MATCHED BY SOURCE AND TIMESTAMP_TRUNC(`target`.`txndate`, DAY) IN UNNEST(partitions_to_delete) THEN DELETE
  WHEN NOT MATCHED BY TARGET
  THEN
INSERT
  (`id`,
    `name`,
    `txndate`)
VALUES
  (`id`,`name`,`txndate`);

in which the partition column txndate is referenced through a call to the TIMESTAMP_TRUNC function. I believe this is what prevents BigQuery from using the predicate for partition elimination, so the query is rejected.
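
To illustrate the difference (my assumption based on the error above, not verified against every BigQuery version): a predicate that wraps the partition column in a function call is not accepted for partition elimination, while a direct comparison is:

-- Rejected when require_partition_filter = true: the DATE column is wrapped in
-- TIMESTAMP_TRUNC, so the predicate cannot be used for partition elimination
SELECT * FROM `spark_test.mytable`
WHERE TIMESTAMP_TRUNC(`txndate`, DAY) IN (TIMESTAMP '2023-03-01');

-- Accepted: the partition column is compared directly
SELECT * FROM `spark_test.mytable`
WHERE `txndate` IN (DATE '2023-03-01');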

A potential workaround for date-partitioned tables would be to change the above block into something like this:

DECLARE
  partitions_to_delete DEFAULT (
  SELECT
    ARRAY_AGG(DISTINCT(DATE(TIMESTAMP_TRUNC(`txndate`, DAY))) IGNORE NULLS)
  FROM
    `spark_test.mytable2316280012714`);

MERGE
  `spark_test.mytable` AS TARGET
USING
  `spark_test.mytable2316280012714` AS SOURCE
ON
  FALSE
  WHEN NOT MATCHED BY SOURCE AND `target`.`txndate` IN UNNEST(partitions_to_delete) THEN DELETE
  WHEN NOT MATCHED BY TARGET
  THEN
INSERT
  (`id`,
    `name`,
    `txndate`)
VALUES
  (`id`,`name`,`txndate`);

Any chance this could be fixed in the next connector version?
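
For completeness, a cruder stopgap on the user side (untested sketch) is to lift the requirement around the overwrite job and restore it afterwards:

-- before the Spark dynamic-partition overwrite
ALTER TABLE spark_test.mytable
  SET OPTIONS (
    require_partition_filter = false);

-- ... run the df.write job ...

-- after the job completes
ALTER TABLE spark_test.mytable
  SET OPTIONS (
    require_partition_filter = true);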

@isha97 added and then removed the enhancement (New feature or request) label on Oct 8, 2024