Skip to content

Commit

Permalink
[FLINK-36704] Update TypeInference with StaticArgument and StateTypeS…
Browse files Browse the repository at this point in the history
…trategy
  • Loading branch information
twalthr committed Nov 18, 2024
1 parent 77e7ac0 commit 8118e4c
Show file tree
Hide file tree
Showing 10 changed files with 557 additions and 122 deletions.
26 changes: 13 additions & 13 deletions flink-table/flink-sql-client/src/test/resources/sql/function.q
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ describe function extended temp_upperudf;
| requirements | $VAR_UDF_JAR_PATH_SPACE [] |
| is deterministic | $VAR_UDF_JAR_PATH_SPACE true |
| supports constant folding | $VAR_UDF_JAR_PATH_SPACE true |
| signature | $VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(STRING) |
| signature | $VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(arg0 => STRING) |
+---------------------------+---------------------------------------------$VAR_UDF_JAR_PATH_DASH+
10 rows in set
!ok
Expand Down Expand Up @@ -437,7 +437,7 @@ desc function extended temp_upperudf;
| requirements | $VAR_UDF_JAR_PATH_SPACE [] |
| is deterministic | $VAR_UDF_JAR_PATH_SPACE true |
| supports constant folding | $VAR_UDF_JAR_PATH_SPACE true |
| signature | $VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(STRING) |
| signature | $VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(arg0 => STRING) |
+---------------------------+---------------------------------------------$VAR_UDF_JAR_PATH_DASH+
10 rows in set
!ok
Expand Down Expand Up @@ -489,16 +489,16 @@ describe function `c1`.`db`.temp_upperudf;
!ok

describe function extended temp_upperudf;
+---------------------------+-----------------------+
| info name | info value |
+---------------------------+-----------------------+
| is system function | true |
| is temporary | true |
| kind | SCALAR |
| requirements | [] |
| is deterministic | true |
| supports constant folding | true |
| signature | temp_upperudf(STRING) |
+---------------------------+-----------------------+
+---------------------------+-------------------------------+
| info name | info value |
+---------------------------+-------------------------------+
| is system function | true |
| is temporary | true |
| kind | SCALAR |
| requirements | [] |
| is deterministic | true |
| supports constant folding | true |
| signature | temp_upperudf(arg0 => STRING) |
+---------------------------+-------------------------------+
7 rows in set
!ok
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategy;

Expand Down Expand Up @@ -292,11 +293,18 @@ public Builder kind(FunctionKind kind) {
return this;
}

public Builder staticArguments(StaticArgument... staticArguments) {
this.typeInferenceBuilder.staticArguments(staticArguments);
return this;
}

/** @deprecated Use {@link #staticArguments(StaticArgument...)} instead. */
public Builder namedArguments(String... argumentNames) {
this.typeInferenceBuilder.namedArguments(Arrays.asList(argumentNames));
return this;
}

/** @deprecated Use {@link #staticArguments(StaticArgument...)} instead. */
public Builder typedArguments(DataType... argumentTypes) {
this.typeInferenceBuilder.typedArguments(Arrays.asList(argumentTypes));
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference;

/** Strategy for inferring a function call's intermediate result data type (i.e. state entry). */
public interface StateTypeStrategy extends TypeStrategy {
// marker interface which will be filled with additional contracts in the future
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;

import java.util.Objects;
import java.util.Optional;

/** A helper class that wraps a {@link TypeStrategy} into a {@link StateTypeStrategy}. */
@PublicEvolving
public class StateTypeStrategyWrapper implements StateTypeStrategy {

private final TypeStrategy typeStrategy;

private StateTypeStrategyWrapper(TypeStrategy typeStrategy) {
this.typeStrategy =
Preconditions.checkNotNull(typeStrategy, "Type strategy must not be null.");
}

public static StateTypeStrategyWrapper of(TypeStrategy typeStrategy) {
return new StateTypeStrategyWrapper(typeStrategy);
}

@Override
public Optional<DataType> inferType(CallContext callContext) {
return typeStrategy.inferType(callContext);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof StateTypeStrategyWrapper) {
return Objects.equals(typeStrategy, ((StateTypeStrategyWrapper) o).typeStrategy);
}
if (o instanceof TypeStrategy) {
return Objects.equals(typeStrategy, o);
}
return false;
}

@Override
public int hashCode() {
return typeStrategy.hashCode();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.EnumSet;
import java.util.Optional;

/**
* Describes an argument in a static signature that is not overloaded and does not support varargs.
*
* <p>Static arguments are a special case of an input type strategy. While built-in functions often
* require advanced type inference strategies (taking data type families, common type constraints
* between arguments, customized validation), many functions are fine with a static signature.
* Static arguments power these basic use cases.
*
* <p>Static arguments can take tables, models, or scalar values. Each argument takes a set of
* {@link StaticArgumentTrait} that enable basic validation by the framework.
*/
@PublicEvolving
public class StaticArgument {

private final String name;
private final @Nullable DataType dataType;
private final @Nullable Class<?> conversionClass;
private final boolean isOptional;
private final EnumSet<StaticArgumentTrait> traits;

private StaticArgument(
String name,
@Nullable DataType dataType,
@Nullable Class<?> conversionClass,
boolean isOptional,
EnumSet<StaticArgumentTrait> traits) {
StaticArgumentTrait.checkIntegrity(
Preconditions.checkNotNull(traits, "Traits must not be null."));
this.name = Preconditions.checkNotNull(name, "Name must not be null.");
this.dataType = dataType;
this.conversionClass = conversionClass;
this.isOptional = isOptional;
this.traits = traits;
}

/**
* Declares a scalar argument such as {@code f(12)} or {@code f(otherColumn)}.
*
* @param name name for the assignment operator e.g. {@code f(myArg => 12)}
* @param dataType explicit type to which the argument is cast if necessary
* @param isOptional whether the argument is optional, if optional the corresponding data type
* must be nullable
*/
public static StaticArgument scalar(String name, DataType dataType, boolean isOptional) {
Preconditions.checkNotNull(dataType, "Data type must not be null.");
return new StaticArgument(
name, dataType, null, isOptional, EnumSet.of(StaticArgumentTrait.SCALAR));
}

/**
* Declares a table argument such as {@code f(t => myTable)} or {@code f(t => TABLE myTable))}.
*
* <p>The argument can have {@link StaticArgumentTrait#TABLE_AS_ROW} (default) or {@link
* StaticArgumentTrait#TABLE_AS_SET} semantics.
*
* <p>By only providing a conversion class, the argument supports a "polymorphic" behavior. In
* other words: it accepts tables with an arbitrary number of columns with arbitrary data types.
* For this case, a class satisfying {@link RowType#supportsOutputConversion(Class)} must be
* used.
*
* @param name name for the assignment operator e.g. {@code f(myArg => 12)}
* @param conversionClass a class satisfying {@link RowType#supportsOutputConversion(Class)}
* @param isOptional whether the argument is optional
* @param traits set of {@link StaticArgumentTrait} requiring {@link StaticArgumentTrait#TABLE}
*/
public static StaticArgument table(
String name,
Class<?> conversionClass,
boolean isOptional,
EnumSet<StaticArgumentTrait> traits) {
Preconditions.checkNotNull(conversionClass, "Conversion class must not be null.");
final EnumSet<StaticArgumentTrait> enrichedTraits = EnumSet.copyOf(traits);
enrichedTraits.add(StaticArgumentTrait.TABLE);
if (!enrichedTraits.contains(StaticArgumentTrait.TABLE_AS_SET)) {
enrichedTraits.add(StaticArgumentTrait.TABLE_AS_ROW);
}
return new StaticArgument(name, null, conversionClass, isOptional, enrichedTraits);
}

/**
* Declares a table argument such as {@code f(t => myTable)} or {@code f(t => TABLE myTable))}.
*
* <p>The argument can have {@link StaticArgumentTrait#TABLE_AS_ROW} (default) or {@link
* StaticArgumentTrait#TABLE_AS_SET} semantics.
*
* <p>By providing a concrete data type, the argument only accepts tables with corresponding
* number of columns and data types. The data type must be a {@link RowType} or {@link
* StructuredType}.
*
* @param name name for the assignment operator e.g. {@code f(myArg => 12)}
* @param dataType explicit type to which the argument is cast if necessary
* @param isOptional whether the argument is optional, if optional the corresponding data type
* must be nullable
* @param traits set of {@link StaticArgumentTrait} requiring {@link StaticArgumentTrait#TABLE}
*/
public static StaticArgument table(
String name,
DataType dataType,
boolean isOptional,
EnumSet<StaticArgumentTrait> traits) {
Preconditions.checkNotNull(dataType, "Data type must not be null.");
return new StaticArgument(name, dataType, null, isOptional, enrichTableTraits(traits));
}

private static EnumSet<StaticArgumentTrait> enrichTableTraits(
EnumSet<StaticArgumentTrait> traits) {
final EnumSet<StaticArgumentTrait> enrichedTraits = EnumSet.copyOf(traits);
enrichedTraits.add(StaticArgumentTrait.TABLE);
if (!enrichedTraits.contains(StaticArgumentTrait.TABLE_AS_SET)) {
enrichedTraits.add(StaticArgumentTrait.TABLE_AS_ROW);
}
return enrichedTraits;
}

public String getName() {
return name;
}

public Optional<DataType> getDataType() {
return Optional.ofNullable(dataType);
}

public Optional<Class<?>> getConversionClass() {
return Optional.ofNullable(conversionClass);
}

public boolean isOptional() {
return isOptional;
}

public EnumSet<StaticArgumentTrait> getTraits() {
return traits;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.ValidationException;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Declares traits for {@link StaticArgument}. They enable basic validation by the framework.
*
* <p>Some traits have dependencies to other traits, which is why this enum reflects a hierarchy in
* which {@link #SCALAR}, {@link #TABLE}, and {@link #MODEL} are the top-level roots.
*/
@PublicEvolving
public enum StaticArgumentTrait {
SCALAR(),
TABLE(),
MODEL(),
TABLE_AS_ROW(TABLE),
TABLE_AS_SET(TABLE),
OPTIONAL_PARTITION_BY(TABLE_AS_SET);

private final Set<StaticArgumentTrait> requirements;

StaticArgumentTrait(StaticArgumentTrait... requirements) {
this.requirements = Arrays.stream(requirements).collect(Collectors.toSet());
}

public static void checkIntegrity(EnumSet<StaticArgumentTrait> traits) {
if (traits.stream().filter(t -> t.requirements.isEmpty()).count() != 1) {
throw new ValidationException(
"Invalid argument traits. An argument must be declared as either scalar, table, or model.");
}
traits.forEach(
trait ->
trait.requirements.forEach(
requirement -> {
if (!traits.contains(requirement)) {
throw new ValidationException(
String.format(
"Invalid argument traits. Trait %s requires %s.",
trait, requirement));
}
}));
}
}
Loading

0 comments on commit 8118e4c

Please sign in to comment.