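A BigQuery table is created like this (the original DDL was not preserved in this page; the sketch below is reconstructed from the column names and partitioning expression visible in the generated MERGE statement further down):

CREATE TABLE `spark_test.mytable` (
  `id` INT64,
  `name` STRING,
  `txndate` TIMESTAMP
)
PARTITION BY TIMESTAMP_TRUNC(`txndate`, DAY);

The following Scala Spark code is used to populate the table with dummy data (again a minimal sketch, since the exact snippet was not preserved; the sample rows, the bucket name, and the dynamic partition-overwrite setting are assumptions):

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Assumed: dynamic partition overwrite, the mode in which the connector
// replaces only the partitions present in the DataFrame.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")

val df = Seq(
  (1L, "alice", Timestamp.valueOf("2024-01-01 10:00:00")),
  (2L, "bob",   Timestamp.valueOf("2024-01-02 11:00:00"))
).toDF("id", "name", "txndate")

df.write
  .format("bigquery")
  .option("temporaryGcsBucket", "my-temp-bucket") // hypothetical bucket name
  .mode("overwrite")
  .save("spark_test.mytable")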
This works as expected (i.e., the table partitions included in the DataFrame are overwritten).
However, when we alter the table to enable the require_partition_filter option:
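-- Standard BigQuery DDL for this option (reconstructed here, since the
-- original snippet was not preserved in this page):
ALTER TABLE `spark_test.mytable`
SET OPTIONS (require_partition_filter = TRUE);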
The same df.write call then fails, complaining about the missing partition filter in the query:
scala> .save()
com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:157)
at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:54)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
... 53 elided
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:116)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getQueryResults(HttpBigQueryRpc.java:745)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1500)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1495)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:102)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1494)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1478)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job$1.call(Job.java:390)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job$1.call(Job.java:387)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:102)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitForQueryResults(Job.java:386)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitForInternal(Job.java:281)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:202)
at com.google.cloud.bigquery.connector.common.BigQueryClient.waitForJob(BigQueryClient.java:131)
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:151)
... 77 more
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
GET https://bigquery.googleapis.com/bigquery/v2/projects/my-spark-bigquery-demo/queries/c80fb709-7aa0-4e1c-8fae-4e806bcfc83a?location=europe-west2&maxResults=0&prettyPrint=false
{
  "code": 400,
  "errors": [
    {
      "domain": "global",
      "location": "q",
      "locationType": "parameter",
      "message": "Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]",
      "reason": "invalidQuery"
    }
  ],
  "message": "Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]",
  "status": "INVALID_ARGUMENT"
}
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$3.interceptResponse(AbstractGoogleClientRequest.java:479)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:565)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:506)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:616)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getQueryResults(HttpBigQueryRpc.java:743)
... 94 more
scala>
Tested with the following:
Spark 3.1.3 on Dataproc
Scala 2.12
BigQuery Connector: spark-bigquery-with-dependencies_2.12-0.40.0.jar
Looking at the BigQuery query logs, it looks like the connector is trying to execute the following statement:
DECLARE partitions_to_delete DEFAULT (
  SELECT ARRAY_AGG(DISTINCT(TIMESTAMP_TRUNC(`txndate`, DAY)) IGNORE NULLS)
  FROM `spark_test.mytable2316280012714`);

MERGE `spark_test.mytable` AS TARGET
USING `spark_test.mytable2316280012714` AS SOURCE
ON FALSE
WHEN NOT MATCHED BY SOURCE
  AND TIMESTAMP_TRUNC(`target`.`txndate`, DAY) IN UNNEST(partitions_to_delete)
  THEN DELETE
WHEN NOT MATCHED BY TARGET THEN
  INSERT (`id`, `name`, `txndate`)
  VALUES (`id`, `name`, `txndate`);
Here the partitioning column txndate is referenced only through a call to the TIMESTAMP_TRUNC function. I believe this is what makes the BigQuery optimizer reject the query: the wrapped column cannot be used for partition elimination, so the require_partition_filter check fails.
A potential workaround for date-partitioned tables would be to change the above block into something like this:
DECLARE partitions_to_delete DEFAULT (
  SELECT ARRAY_AGG(DISTINCT(DATE(TIMESTAMP_TRUNC(`txndate`, DAY))) IGNORE NULLS)
  FROM `spark_test.mytable2316280012714`);

MERGE `spark_test.mytable` AS TARGET
USING `spark_test.mytable2316280012714` AS SOURCE
ON FALSE
WHEN NOT MATCHED BY SOURCE
  AND `target`.`txndate` IN UNNEST(partitions_to_delete)
  THEN DELETE
WHEN NOT MATCHED BY TARGET THEN
  INSERT (`id`, `name`, `txndate`)
  VALUES (`id`, `name`, `txndate`);
Any chance this could be fixed in the next connector version?