Skip to content

Commit 6bbf70a

Browse files
authored
Spark: Bypass Spark's ViewCatalog API when replacing a view (#9596)
Spark's `ViewCatalog` API doesn't have a `replace()` in 3.5 as it was only introduced later. Therefore we're bypassing Spark's `ViewCatalog` so that we can keep the view's history after executing a `CREATE OR REPLACE`
1 parent 3547a99 commit 6bbf70a

File tree

5 files changed

+172
-21
lines changed

5 files changed

+172
-21
lines changed

spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.spark.sql.execution.datasources.v2
2121

22+
import org.apache.iceberg.spark.SupportsReplaceView
2223
import org.apache.spark.sql.catalyst.InternalRow
2324
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
2425
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -57,20 +58,34 @@ case class CreateV2ViewExec(
5758

5859
if (replace) {
5960
// CREATE OR REPLACE VIEW
60-
if (catalog.viewExists(ident)) {
61-
catalog.dropView(ident)
61+
catalog match {
62+
case c: SupportsReplaceView =>
63+
c.replaceView(
64+
ident,
65+
queryText,
66+
currentCatalog,
67+
currentNamespace,
68+
viewSchema,
69+
queryColumnNames.toArray,
70+
columnAliases.toArray,
71+
columnComments.map(c => c.orNull).toArray,
72+
newProperties.asJava)
73+
case _ =>
74+
if (catalog.viewExists(ident)) {
75+
catalog.dropView(ident)
76+
}
77+
78+
catalog.createView(
79+
ident,
80+
queryText,
81+
currentCatalog,
82+
currentNamespace,
83+
viewSchema,
84+
queryColumnNames.toArray,
85+
columnAliases.toArray,
86+
columnComments.map(c => c.orNull).toArray,
87+
newProperties.asJava)
6288
}
63-
// FIXME: replaceView API doesn't exist in Spark 3.5
64-
catalog.createView(
65-
ident,
66-
queryText,
67-
currentCatalog,
68-
currentNamespace,
69-
viewSchema,
70-
queryColumnNames.toArray,
71-
columnAliases.toArray,
72-
columnComments.map(c => c.orNull).toArray,
73-
newProperties.asJava)
7489
} else {
7590
try {
7691
// CREATE VIEW [IF NOT EXISTS]

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1549,6 +1549,45 @@ public void alterViewIsNotSupported() throws NoSuchTableException {
15491549
"ALTER VIEW <viewName> AS is not supported. Use CREATE OR REPLACE VIEW instead");
15501550
}
15511551

1552+
@Test
1553+
public void createOrReplaceViewKeepsViewHistory() {
1554+
String viewName = viewName("viewWithHistoryAfterReplace");
1555+
String sql = String.format("SELECT id, data FROM %s WHERE id <= 3", tableName);
1556+
String updatedSql = String.format("SELECT id FROM %s WHERE id > 3", tableName);
1557+
1558+
sql(
1559+
"CREATE VIEW %s (new_id COMMENT 'some ID', new_data COMMENT 'some data') AS %s",
1560+
viewName, sql);
1561+
1562+
View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
1563+
assertThat(view.history()).hasSize(1);
1564+
assertThat(view.sqlFor("spark").sql()).isEqualTo(sql);
1565+
assertThat(view.currentVersion().versionId()).isEqualTo(1);
1566+
assertThat(view.currentVersion().schemaId()).isEqualTo(0);
1567+
assertThat(view.schemas()).hasSize(1);
1568+
assertThat(view.schema().asStruct())
1569+
.isEqualTo(
1570+
new Schema(
1571+
Types.NestedField.optional(0, "new_id", Types.IntegerType.get(), "some ID"),
1572+
Types.NestedField.optional(1, "new_data", Types.StringType.get(), "some data"))
1573+
.asStruct());
1574+
1575+
sql("CREATE OR REPLACE VIEW %s (updated_id COMMENT 'updated ID') AS %s", viewName, updatedSql);
1576+
1577+
view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
1578+
assertThat(view.history()).hasSize(2);
1579+
assertThat(view.sqlFor("spark").sql()).isEqualTo(updatedSql);
1580+
assertThat(view.currentVersion().versionId()).isEqualTo(2);
1581+
assertThat(view.currentVersion().schemaId()).isEqualTo(1);
1582+
assertThat(view.schemas()).hasSize(2);
1583+
assertThat(view.schema().asStruct())
1584+
.isEqualTo(
1585+
new Schema(
1586+
Types.NestedField.optional(
1587+
0, "updated_id", Types.IntegerType.get(), "updated ID"))
1588+
.asStruct());
1589+
}
1590+
15521591
private void insertRows(int numRows) throws NoSuchTableException {
15531592
List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
15541593
for (int i = 1; i <= numRows; i++) {

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Map;
2727
import java.util.Objects;
2828
import java.util.Set;
29-
import java.util.StringJoiner;
3029
import java.util.TreeMap;
3130
import java.util.concurrent.TimeUnit;
3231
import java.util.regex.Matcher;
@@ -52,6 +51,7 @@
5251
import org.apache.iceberg.exceptions.ValidationException;
5352
import org.apache.iceberg.hadoop.HadoopCatalog;
5453
import org.apache.iceberg.hadoop.HadoopTables;
54+
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
5555
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
5656
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
5757
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -121,9 +121,10 @@
121121
* <p>
122122
*/
123123
public class SparkCatalog extends BaseCatalog
124-
implements org.apache.spark.sql.connector.catalog.ViewCatalog {
124+
implements org.apache.spark.sql.connector.catalog.ViewCatalog, SupportsReplaceView {
125125
private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
126126
private static final Splitter COMMA = Splitter.on(",");
127+
private static final Joiner COMMA_JOINER = Joiner.on(",");
127128
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
128129
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
129130
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
@@ -578,15 +579,13 @@ public View createView(
578579
if (null != asViewCatalog) {
579580
Schema icebergSchema = SparkSchemaUtil.convert(schema);
580581

581-
StringJoiner joiner = new StringJoiner(",");
582-
Arrays.stream(queryColumnNames).forEach(joiner::add);
583-
584582
try {
585583
Map<String, String> props =
586584
ImmutableMap.<String, String>builder()
587585
.putAll(Spark3Util.rebuildCreateProperties(properties))
588-
.put("spark.query-column-names", joiner.toString())
589-
.build();
586+
.put(SparkView.QUERY_COLUMN_NAMES, COMMA_JOINER.join(queryColumnNames))
587+
.buildKeepingLast();
588+
590589
org.apache.iceberg.view.View view =
591590
asViewCatalog
592591
.buildView(buildIdentifier(ident))
@@ -609,6 +608,48 @@ public View createView(
609608
"Creating a view is not supported by catalog: " + catalogName);
610609
}
611610

611+
@Override
612+
public View replaceView(
613+
Identifier ident,
614+
String sql,
615+
String currentCatalog,
616+
String[] currentNamespace,
617+
StructType schema,
618+
String[] queryColumnNames,
619+
String[] columnAliases,
620+
String[] columnComments,
621+
Map<String, String> properties)
622+
throws NoSuchNamespaceException {
623+
if (null != asViewCatalog) {
624+
Schema icebergSchema = SparkSchemaUtil.convert(schema);
625+
626+
try {
627+
Map<String, String> props =
628+
ImmutableMap.<String, String>builder()
629+
.putAll(Spark3Util.rebuildCreateProperties(properties))
630+
.put(SparkView.QUERY_COLUMN_NAMES, COMMA_JOINER.join(queryColumnNames))
631+
.buildKeepingLast();
632+
633+
org.apache.iceberg.view.View view =
634+
asViewCatalog
635+
.buildView(buildIdentifier(ident))
636+
.withDefaultCatalog(currentCatalog)
637+
.withDefaultNamespace(Namespace.of(currentNamespace))
638+
.withQuery("spark", sql)
639+
.withSchema(icebergSchema)
640+
.withLocation(properties.get("location"))
641+
.withProperties(props)
642+
.replace();
643+
return new SparkView(catalogName, view);
644+
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
645+
throw new NoSuchNamespaceException(currentNamespace);
646+
}
647+
}
648+
649+
throw new UnsupportedOperationException(
650+
"Replacing a view is not supported by catalog: " + catalogName);
651+
}
652+
612653
@Override
613654
public View alterView(Identifier ident, ViewChange... changes)
614655
throws NoSuchViewException, IllegalArgumentException {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark;
20+
21+
import java.util.Map;
22+
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
23+
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
24+
import org.apache.spark.sql.connector.catalog.Identifier;
25+
import org.apache.spark.sql.connector.catalog.View;
26+
import org.apache.spark.sql.connector.catalog.ViewCatalog;
27+
import org.apache.spark.sql.types.StructType;
28+
29+
public interface SupportsReplaceView extends ViewCatalog {
30+
/**
31+
* Replace a view in the catalog
32+
*
33+
* @param ident a view identifier
34+
* @param sql the SQL text that defines the view
35+
* @param currentCatalog the current catalog
36+
* @param currentNamespace the current namespace
37+
* @param schema the view query output schema
38+
* @param queryColumnNames the query column names
39+
* @param columnAliases the column aliases
40+
* @param columnComments the column comments
41+
* @param properties the view properties
42+
* @throws NoSuchViewException If the view doesn't exist or is a table
43+
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
44+
*/
45+
View replaceView(
46+
Identifier ident,
47+
String sql,
48+
String currentCatalog,
49+
String[] currentNamespace,
50+
StructType schema,
51+
String[] queryColumnNames,
52+
String[] columnAliases,
53+
String[] columnComments,
54+
Map<String, String> properties)
55+
throws NoSuchViewException, NoSuchNamespaceException;
56+
}

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
public class SparkView implements org.apache.spark.sql.connector.catalog.View {
3737

38-
private static final String QUERY_COLUMN_NAMES = "spark.query-column-names";
38+
public static final String QUERY_COLUMN_NAMES = "spark.query-column-names";
3939
public static final Set<String> RESERVED_PROPERTIES =
4040
ImmutableSet.of("provider", "location", FORMAT_VERSION, QUERY_COLUMN_NAMES);
4141

0 commit comments

Comments
 (0)