From 8118e4cfe0decee0555def03cb3b98c6d55f8cf3 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Mon, 18 Nov 2024 12:19:53 +0100 Subject: [PATCH] [FLINK-36704] Update TypeInference with StaticArgument and StateTypeStrategy --- .../src/test/resources/sql/function.q | 26 +- .../functions/BuiltInFunctionDefinition.java | 8 + .../types/inference/StateTypeStrategy.java | 24 ++ .../inference/StateTypeStrategyWrapper.java | 66 ++++ .../table/types/inference/StaticArgument.java | 165 ++++++++++ .../types/inference/StaticArgumentTrait.java | 67 ++++ .../table/types/inference/TypeInference.java | 302 ++++++++++++------ .../table/types/inference/TypeStrategy.java | 3 +- .../TypeInferenceExtractorTest.java | 16 +- .../inference/InputTypeStrategiesTest.java | 2 +- 10 files changed, 557 insertions(+), 122 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q index 270ad9ff1f137..4cc0c043dc28c 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/function.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q @@ -406,7 +406,7 @@ describe function extended temp_upperudf; | requirements | $VAR_UDF_JAR_PATH_SPACE [] | | is deterministic | $VAR_UDF_JAR_PATH_SPACE true | | supports constant folding | $VAR_UDF_JAR_PATH_SPACE true | -| signature | $VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(STRING) | +| signature | $VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(arg0 
=> STRING) | +---------------------------+---------------------------------------------$VAR_UDF_JAR_PATH_DASH+ 10 rows in set !ok @@ -437,7 +437,7 @@ desc function extended temp_upperudf; | requirements | $VAR_UDF_JAR_PATH_SPACE [] | | is deterministic | $VAR_UDF_JAR_PATH_SPACE true | | supports constant folding | $VAR_UDF_JAR_PATH_SPACE true | -| signature | $VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(STRING) | +| signature | $VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(arg0 => STRING) | +---------------------------+---------------------------------------------$VAR_UDF_JAR_PATH_DASH+ 10 rows in set !ok @@ -489,16 +489,16 @@ describe function `c1`.`db`.temp_upperudf; !ok describe function extended temp_upperudf; -+---------------------------+-----------------------+ -| info name | info value | -+---------------------------+-----------------------+ -| is system function | true | -| is temporary | true | -| kind | SCALAR | -| requirements | [] | -| is deterministic | true | -| supports constant folding | true | -| signature | temp_upperudf(STRING) | -+---------------------------+-----------------------+ ++---------------------------+-------------------------------+ +| info name | info value | ++---------------------------+-------------------------------+ +| is system function | true | +| is temporary | true | +| kind | SCALAR | +| requirements | [] | +| is deterministic | true | +| supports constant folding | true | +| signature | temp_upperudf(arg0 => STRING) | ++---------------------------+-------------------------------+ 7 rows in set !ok diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java index fce8c4664fa74..aaa657435c5b7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java +++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java @@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.StaticArgument; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.types.inference.TypeStrategy; @@ -292,11 +293,18 @@ public Builder kind(FunctionKind kind) { return this; } + public Builder staticArguments(StaticArgument... staticArguments) { + this.typeInferenceBuilder.staticArguments(staticArguments); + return this; + } + + /** @deprecated Use {@link #staticArguments(StaticArgument...)} instead. */ public Builder namedArguments(String... argumentNames) { this.typeInferenceBuilder.namedArguments(Arrays.asList(argumentNames)); return this; } + /** @deprecated Use {@link #staticArguments(StaticArgument...)} instead. */ public Builder typedArguments(DataType... argumentTypes) { this.typeInferenceBuilder.typedArguments(Arrays.asList(argumentTypes)); return this; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java new file mode 100644 index 0000000000000..cf92b2dad93d1 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference; + +/** Strategy for inferring a function call's intermediate result data type (i.e. state entry). */ +public interface StateTypeStrategy extends TypeStrategy { + // marker interface which will be filled with additional contracts in the future +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java new file mode 100644 index 0000000000000..10d6b3b9f7c46 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import java.util.Objects; +import java.util.Optional; + +/** A helper class that wraps a {@link TypeStrategy} into a {@link StateTypeStrategy}. */ +@PublicEvolving +public class StateTypeStrategyWrapper implements StateTypeStrategy { + + private final TypeStrategy typeStrategy; + + private StateTypeStrategyWrapper(TypeStrategy typeStrategy) { + this.typeStrategy = + Preconditions.checkNotNull(typeStrategy, "Type strategy must not be null."); + } + + public static StateTypeStrategyWrapper of(TypeStrategy typeStrategy) { + return new StateTypeStrategyWrapper(typeStrategy); + } + + @Override + public Optional inferType(CallContext callContext) { + return typeStrategy.inferType(callContext); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof StateTypeStrategyWrapper) { + return Objects.equals(typeStrategy, ((StateTypeStrategyWrapper) o).typeStrategy); + } + if (o instanceof TypeStrategy) { + return Objects.equals(typeStrategy, o); + } + return false; + } + + @Override + public int hashCode() { + return typeStrategy.hashCode(); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java new file mode 100644 index 0000000000000..ad791585c7806 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.EnumSet; +import java.util.Optional; + +/** + * Describes an argument in a static signature that is not overloaded and does not support varargs. + * + *

Static arguments are a special case of an input type strategy. While built-in functions often + * require advanced type inference strategies (taking data type families, common type constraints + * between arguments, customized validation), many functions are fine with a static signature. + * Static arguments power these basic use cases. + * + *

 Static arguments can take tables, models, or scalar values. Each argument takes a set of + * {@link StaticArgumentTrait} that enable basic validation by the framework. + */ +@PublicEvolving +public class StaticArgument { + + private final String name; + private final @Nullable DataType dataType; + private final @Nullable Class conversionClass; + private final boolean isOptional; + private final EnumSet traits; + + private StaticArgument( + String name, + @Nullable DataType dataType, + @Nullable Class conversionClass, + boolean isOptional, + EnumSet traits) { + StaticArgumentTrait.checkIntegrity( + Preconditions.checkNotNull(traits, "Traits must not be null.")); + this.name = Preconditions.checkNotNull(name, "Name must not be null."); + this.dataType = dataType; + this.conversionClass = conversionClass; + this.isOptional = isOptional; + this.traits = traits; + } + + /** + * Declares a scalar argument such as {@code f(12)} or {@code f(otherColumn)}. + * + * @param name name for the assignment operator e.g. {@code f(myArg => 12)} + * @param dataType explicit type to which the argument is cast if necessary + * @param isOptional whether the argument is optional, if optional the corresponding data type + * must be nullable + */ + public static StaticArgument scalar(String name, DataType dataType, boolean isOptional) { + Preconditions.checkNotNull(dataType, "Data type must not be null."); + return new StaticArgument( + name, dataType, null, isOptional, EnumSet.of(StaticArgumentTrait.SCALAR)); + } + + /** + * Declares a table argument such as {@code f(t => myTable)} or {@code f(t => TABLE myTable)}. + * + *

The argument can have {@link StaticArgumentTrait#TABLE_AS_ROW} (default) or {@link + * StaticArgumentTrait#TABLE_AS_SET} semantics. + * + *

 By only providing a conversion class, the argument supports a "polymorphic" behavior. In + * other words: it accepts tables with an arbitrary number of columns with arbitrary data types. + * For this case, a class satisfying {@link RowType#supportsOutputConversion(Class)} must be + * used. + * + * @param name name for the assignment operator e.g. {@code f(myArg => 12)} + * @param conversionClass a class satisfying {@link RowType#supportsOutputConversion(Class)} + * @param isOptional whether the argument is optional + * @param traits set of {@link StaticArgumentTrait} requiring {@link StaticArgumentTrait#TABLE} + */ + public static StaticArgument table( + String name, + Class conversionClass, + boolean isOptional, + EnumSet traits) { + Preconditions.checkNotNull(conversionClass, "Conversion class must not be null."); + final EnumSet enrichedTraits = EnumSet.copyOf(traits); + enrichedTraits.add(StaticArgumentTrait.TABLE); + if (!enrichedTraits.contains(StaticArgumentTrait.TABLE_AS_SET)) { + enrichedTraits.add(StaticArgumentTrait.TABLE_AS_ROW); + } + return new StaticArgument(name, null, conversionClass, isOptional, enrichedTraits); + } + + /** + * Declares a table argument such as {@code f(t => myTable)} or {@code f(t => TABLE myTable)}. + * + *

The argument can have {@link StaticArgumentTrait#TABLE_AS_ROW} (default) or {@link + * StaticArgumentTrait#TABLE_AS_SET} semantics. + * + *

By providing a concrete data type, the argument only accepts tables with corresponding + * number of columns and data types. The data type must be a {@link RowType} or {@link + * StructuredType}. + * + * @param name name for the assignment operator e.g. {@code f(myArg => 12)} + * @param dataType explicit type to which the argument is cast if necessary + * @param isOptional whether the argument is optional, if optional the corresponding data type + * must be nullable + * @param traits set of {@link StaticArgumentTrait} requiring {@link StaticArgumentTrait#TABLE} + */ + public static StaticArgument table( + String name, + DataType dataType, + boolean isOptional, + EnumSet traits) { + Preconditions.checkNotNull(dataType, "Data type must not be null."); + return new StaticArgument(name, dataType, null, isOptional, enrichTableTraits(traits)); + } + + private static EnumSet enrichTableTraits( + EnumSet traits) { + final EnumSet enrichedTraits = EnumSet.copyOf(traits); + enrichedTraits.add(StaticArgumentTrait.TABLE); + if (!enrichedTraits.contains(StaticArgumentTrait.TABLE_AS_SET)) { + enrichedTraits.add(StaticArgumentTrait.TABLE_AS_ROW); + } + return enrichedTraits; + } + + public String getName() { + return name; + } + + public Optional getDataType() { + return Optional.ofNullable(dataType); + } + + public Optional> getConversionClass() { + return Optional.ofNullable(conversionClass); + } + + public boolean isOptional() { + return isOptional; + } + + public EnumSet getTraits() { + return traits; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java new file mode 100644 index 0000000000000..76a4e6e26902a --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Declares traits for {@link StaticArgument}. They enable basic validation by the framework. + * + *

Some traits have dependencies to other traits, which is why this enum reflects a hierarchy in + * which {@link #SCALAR}, {@link #TABLE}, and {@link #MODEL} are the top-level roots. + */ +@PublicEvolving +public enum StaticArgumentTrait { + SCALAR(), + TABLE(), + MODEL(), + TABLE_AS_ROW(TABLE), + TABLE_AS_SET(TABLE), + OPTIONAL_PARTITION_BY(TABLE_AS_SET); + + private final Set requirements; + + StaticArgumentTrait(StaticArgumentTrait... requirements) { + this.requirements = Arrays.stream(requirements).collect(Collectors.toSet()); + } + + public static void checkIntegrity(EnumSet traits) { + if (traits.stream().filter(t -> t.requirements.isEmpty()).count() != 1) { + throw new ValidationException( + "Invalid argument traits. An argument must be declared as either scalar, table, or model."); + } + traits.forEach( + trait -> + trait.requirements.forEach( + requirement -> { + if (!traits.contains(requirement)) { + throw new ValidationException( + String.format( + "Invalid argument traits. Trait %s requires %s.", + trait, requirement)); + } + })); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java index e7bdaf77a9349..f302036fb573f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java @@ -25,17 +25,20 @@ import javax.annotation.Nullable; import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Provides logic for the type inference of function calls. It includes: * *

* @@ -44,39 +47,20 @@ @PublicEvolving public final class TypeInference { - private final @Nullable List namedArguments; - - private final @Nullable List optionalArguments; - - private final @Nullable List typedArguments; - + private final @Nullable List staticArguments; private final InputTypeStrategy inputTypeStrategy; - - private final @Nullable TypeStrategy accumulatorTypeStrategy; - + private final LinkedHashMap stateTypeStrategies; private final TypeStrategy outputTypeStrategy; private TypeInference( - @Nullable List namedArguments, - @Nullable List optionalArguments, - @Nullable List typedArguments, + @Nullable List staticArguments, InputTypeStrategy inputTypeStrategy, - @Nullable TypeStrategy accumulatorTypeStrategy, + LinkedHashMap stateTypeStrategies, TypeStrategy outputTypeStrategy) { - this.namedArguments = namedArguments; - this.optionalArguments = optionalArguments; - this.typedArguments = typedArguments; + this.staticArguments = staticArguments; this.inputTypeStrategy = inputTypeStrategy; - this.accumulatorTypeStrategy = accumulatorTypeStrategy; + this.stateTypeStrategies = stateTypeStrategies; this.outputTypeStrategy = outputTypeStrategy; - if (namedArguments != null - && typedArguments != null - && namedArguments.size() != typedArguments.size()) { - throw new IllegalArgumentException( - String.format( - "Mismatch between typed arguments %d and named argument %d.", - namedArguments.size(), typedArguments.size())); - } } /** Builder for configuring and creating instances of {@link TypeInference}. 
*/ @@ -84,50 +68,177 @@ public static TypeInference.Builder newBuilder() { return new TypeInference.Builder(); } - public Optional> getNamedArguments() { - return Optional.ofNullable(namedArguments); - } - - public Optional> getTypedArguments() { - return Optional.ofNullable(typedArguments); - } - - public Optional> getOptionalArguments() { - return Optional.ofNullable(optionalArguments); + public Optional> getStaticArguments() { + return Optional.ofNullable(staticArguments); } public InputTypeStrategy getInputTypeStrategy() { return inputTypeStrategy; } - public Optional getAccumulatorTypeStrategy() { - return Optional.ofNullable(accumulatorTypeStrategy); + public LinkedHashMap getStateTypeStrategies() { + return stateTypeStrategies; } public TypeStrategy getOutputTypeStrategy() { return outputTypeStrategy; } + /** @deprecated Use {@link #getStaticArguments()} instead. */ + @Deprecated + public Optional> getNamedArguments() { + return Optional.ofNullable(staticArguments) + .map( + args -> + args.stream() + .map(StaticArgument::getName) + .collect(Collectors.toList())); + } + + /** @deprecated Use {@link #getStaticArguments()} instead. */ + @Deprecated + public Optional> getTypedArguments() { + return Optional.ofNullable(staticArguments) + .map( + args -> + args.stream() + .map( + arg -> + arg.getDataType() + .orElseThrow( + () -> + new IllegalArgumentException( + "Scalar argument with a data type expected."))) + .collect(Collectors.toList())); + } + + /** @deprecated Use {@link #getStaticArguments()} instead. */ + @Deprecated + public Optional> getOptionalArguments() { + return Optional.ofNullable(staticArguments) + .map( + args -> + args.stream() + .map(StaticArgument::isOptional) + .collect(Collectors.toList())); + } + + /** @deprecated Use {@link #getStateTypeStrategies()} instead. 
*/ + @Deprecated + public Optional getAccumulatorTypeStrategy() { + if (stateTypeStrategies.isEmpty()) { + return Optional.empty(); + } + if (stateTypeStrategies.size() != 1) { + throw new IllegalArgumentException( + "An accumulator should contain exactly one state type strategy."); + } + return Optional.of(stateTypeStrategies.values().iterator().next()); + } + + // -------------------------------------------------------------------------------------------- + // Builder // -------------------------------------------------------------------------------------------- /** Builder for configuring and creating instances of {@link TypeInference}. */ @PublicEvolving public static class Builder { - private @Nullable List namedArguments; + private @Nullable List staticArguments; + private InputTypeStrategy inputTypeStrategy = InputTypeStrategies.WILDCARD; + private LinkedHashMap stateTypeStrategies = + new LinkedHashMap<>(); + private @Nullable TypeStrategy outputTypeStrategy; + // Legacy + private @Nullable List namedArguments; private @Nullable List optionalArguments; - private @Nullable List typedArguments; - private InputTypeStrategy inputTypeStrategy = InputTypeStrategies.WILDCARD; + public Builder() { + // default constructor to allow a fluent definition + } - private @Nullable TypeStrategy accumulatorTypeStrategy; + /** + * Sets a list of arguments in a static signature that is not overloaded and does not + * support varargs. + * + *

 Static arguments are a special case of an input type strategy and take precedence. A + * signature can take tables, models, or scalar values. It allows optional and/or named + * arguments like {@code f(myArg => 12)}. + */ + public Builder staticArguments(StaticArgument... staticArguments) { + this.staticArguments = Arrays.asList(staticArguments); + return this; + } - private @Nullable TypeStrategy outputTypeStrategy; + /** + * Sets a list of arguments in a static signature that is not overloaded and does not + * support varargs. + * + *

 Static arguments are a special case of an input type strategy and take precedence. A + * signature can take tables, models, or scalar values. It allows optional and/or named + * arguments like {@code f(myArg => 12)}. + */ + public Builder staticArguments(List staticArgument) { + this.staticArguments = staticArgument; + return this; + } - public Builder() { - // default constructor to allow a fluent definition + /** + * Sets the strategy for inferring and validating input arguments in a function call. + * + *

A {@link InputTypeStrategies#WILDCARD} strategy function is assumed by default. + */ + public Builder inputTypeStrategy(InputTypeStrategy inputTypeStrategy) { + this.inputTypeStrategy = + Preconditions.checkNotNull( + inputTypeStrategy, "Input type strategy must not be null."); + return this; + } + + /** + * Sets the strategy for inferring the intermediate accumulator data type of an aggregate + * function call. + */ + public Builder accumulatorTypeStrategy(TypeStrategy accumulatorTypeStrategy) { + Preconditions.checkNotNull( + accumulatorTypeStrategy, "Accumulator type strategy must not be null."); + this.stateTypeStrategies.put( + "acc", StateTypeStrategyWrapper.of(accumulatorTypeStrategy)); + return this; + } + + /** + * Sets a map of state names to {@link StateTypeStrategy}s for inferring a function call's + * intermediate result data types (i.e. state entries). For aggregate functions, only one + * entry is allowed which defines the accumulator's data type. + */ + public Builder stateTypeStrategies( + LinkedHashMap stateTypeStrategies) { + this.stateTypeStrategies = stateTypeStrategies; + return this; + } + + /** + * Sets the strategy for inferring the final output data type of a function call. + * + *

Required. + */ + public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) { + this.outputTypeStrategy = + Preconditions.checkNotNull( + outputTypeStrategy, "Output type strategy must not be null."); + return this; + } + + public TypeInference build() { + return new TypeInference( + createStaticArguments(), + inputTypeStrategy, + stateTypeStrategies, + Preconditions.checkNotNull( + outputTypeStrategy, "Output type strategy must not be null.")); } /** @@ -137,7 +248,10 @@ public Builder() { *

This information is useful for SQL's concept of named arguments using the assignment * operator (e.g. {@code FUNC(max => 42)}). The names are used for reordering the call's * arguments to the formal argument order of the function. + * + * @deprecated Use {@link #staticArguments(List)} instead. */ + @Deprecated public Builder namedArguments(List argumentNames) { this.namedArguments = Preconditions.checkNotNull( @@ -145,7 +259,11 @@ public Builder namedArguments(List argumentNames) { return this; } - /** @see #namedArguments(List) */ + /** + * @see #namedArguments(List) + * @deprecated Use {@link #staticArguments(StaticArgument...)} instead. + */ + @Deprecated public Builder namedArguments(String... argumentNames) { return namedArguments(Arrays.asList(argumentNames)); } @@ -157,7 +275,10 @@ public Builder namedArguments(String... argumentNames) { *

This information is useful for SQL's concept of named arguments using the assignment * operator. The optionals are used to determine whether an argument is optional or required * in the function call. + * + * @deprecated Use {@link #staticArguments(List)} instead. */ + @Deprecated public Builder optionalArguments(List optionalArguments) { this.optionalArguments = Preconditions.checkNotNull( @@ -171,8 +292,10 @@ public Builder optionalArguments(List optionalArguments) { * *

This information is useful for optional arguments with default value. In particular, * the number of arguments that need to be filled with a default value and their types is - * important. + * + * @deprecated Use {@link #staticArguments(List)} instead. */ + @Deprecated public Builder typedArguments(List argumentTypes) { this.typedArguments = Preconditions.checkNotNull( @@ -180,55 +303,48 @@ public Builder typedArguments(List argumentTypes) { return this; } - /** @see #typedArguments(List) */ - public Builder typedArguments(DataType... argumentTypes) { - return typedArguments(Arrays.asList(argumentTypes)); - } - - /** - * Sets the strategy for inferring and validating input arguments in a function call. - * - *

A {@link InputTypeStrategies#WILDCARD} strategy function is assumed by default. - */ - public Builder inputTypeStrategy(InputTypeStrategy inputTypeStrategy) { - this.inputTypeStrategy = - Preconditions.checkNotNull( - inputTypeStrategy, "Input type strategy must not be null."); - return this; - } - - /** - * Sets the strategy for inferring the intermediate accumulator data type of a function - * call. - */ - public Builder accumulatorTypeStrategy(TypeStrategy accumulatorTypeStrategy) { - this.accumulatorTypeStrategy = - Preconditions.checkNotNull( - accumulatorTypeStrategy, "Accumulator type strategy must not be null."); - return this; - } - /** - * Sets the strategy for inferring the final output data type of a function call. - * - *

Required. + * @see #typedArguments(List) + * @deprecated Use {@link #staticArguments(StaticArgument...)} instead. */ - public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) { - this.outputTypeStrategy = - Preconditions.checkNotNull( - outputTypeStrategy, "Output type strategy must not be null."); - return this; + @Deprecated + public Builder typedArguments(DataType... argumentTypes) { + return typedArguments(Arrays.asList(argumentTypes)); } - public TypeInference build() { - return new TypeInference( - namedArguments, - optionalArguments, - typedArguments, - inputTypeStrategy, - accumulatorTypeStrategy, - Preconditions.checkNotNull( - outputTypeStrategy, "Output type strategy must not be null.")); + private @Nullable List createStaticArguments() { + if (staticArguments != null) { + return staticArguments; + } + // Legacy path + if (typedArguments != null) { + if (namedArguments != null && namedArguments.size() != typedArguments.size()) { + throw new IllegalArgumentException( + String.format( + "Mismatch between typed arguments %d and named arguments %d.", + typedArguments.size(), namedArguments.size())); + } + if (optionalArguments != null + && optionalArguments.size() != typedArguments.size()) { + throw new IllegalArgumentException( + String.format( + "Mismatch between typed arguments %d and optional arguments %d.", + typedArguments.size(), optionalArguments.size())); + } + return IntStream.range(0, typedArguments.size()) + .mapToObj( + pos -> + StaticArgument.scalar( + Optional.ofNullable(namedArguments) + .map(args -> args.get(pos)) + .orElse("arg" + pos), + typedArguments.get(pos), + Optional.ofNullable(optionalArguments) + .map(args -> args.get(pos)) + .orElse(false))) + .collect(Collectors.toList()); + } + return null; } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java index 
bdb6299f1f54a..5b7924e165346 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java @@ -24,8 +24,7 @@ import java.util.Optional; /** - * Strategy for inferring the data type of a function call. The inferred type might describe the - * final result or an intermediate result (accumulation type) of a function. + * Strategy for inferring a function call's result data type. * *

Note: Implementations should implement {@link Object#hashCode()} and {@link * Object#equals(Object)}. diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java index dc3b36c932071..a438ef86cc0c9 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java @@ -433,7 +433,6 @@ private static Stream functionSpecs() { // scalar function that takes any input TestSpec.forScalarFunction(InputGroupScalarFunction.class) - .expectNamedArguments("o") .expectOutputMapping( InputTypeStrategies.sequence( new String[] {"o"}, @@ -557,6 +556,7 @@ private static Stream functionSpecs() { TestSpec.forScalarFunction( "Scalar function with arguments hints all missing name", ArgumentHintMissingNameScalarFunction.class) + .expectNamedArguments("arg0", "arg1") .expectTypedArguments(DataTypes.STRING(), DataTypes.INT()), TestSpec.forScalarFunction( "Scalar function with arguments hints all missing partial name", @@ -695,7 +695,6 @@ private static Stream procedureSpecs() { InputTypeStrategies.sequence( InputTypeStrategies.explicit(DataTypes.BIGINT())), TypeStrategies.explicit(DataTypes.INT())), - // no arguments TestSpec.forProcedure(ZeroArgProcedure.class) .expectNamedArguments() @@ -704,7 +703,6 @@ private static Stream procedureSpecs() { InputTypeStrategies.sequence( new String[0], new ArgumentTypeStrategy[0]), TypeStrategies.explicit(DataTypes.INT())), - // test primitive arguments extraction TestSpec.forProcedure(MixedArgProcedure.class) .expectNamedArguments("i", "d") @@ -719,7 +717,6 @@ private static Stream procedureSpecs() { InputTypeStrategies.explicit(DataTypes.DOUBLE()) }), 
TypeStrategies.explicit(DataTypes.INT())), - // test overloaded arguments extraction TestSpec.forProcedure(OverloadedProcedure.class) .expectOutputMapping( @@ -739,7 +736,6 @@ private static Stream procedureSpecs() { }), TypeStrategies.explicit( DataTypes.BIGINT().notNull().bridgedTo(long.class))), - // test varying arguments extraction TestSpec.forProcedure(VarArgProcedure.class) .expectOutputMapping( @@ -752,7 +748,6 @@ private static Stream procedureSpecs() { DataTypes.INT().notNull().bridgedTo(int.class)) }), TypeStrategies.explicit(DataTypes.STRING())), - // test varying arguments extraction with byte TestSpec.forProcedure(VarArgWithByteProcedure.class) .expectOutputMapping( @@ -765,7 +760,6 @@ private static Stream procedureSpecs() { .bridgedTo(byte.class)) }), TypeStrategies.explicit(DataTypes.STRING())), - // output hint with input extraction TestSpec.forProcedure(ExtractWithOutputHintProcedure.class) .expectNamedArguments("i") @@ -777,7 +771,6 @@ private static Stream procedureSpecs() { InputTypeStrategies.explicit(DataTypes.INT()) }), TypeStrategies.explicit(DataTypes.INT())), - // output extraction with input hints TestSpec.forProcedure(ExtractWithInputHintProcedure.class) .expectNamedArguments("i", "b") @@ -794,17 +787,14 @@ private static Stream procedureSpecs() { // named arguments with overloaded function // expected no named argument for overloaded function TestSpec.forProcedure(NamedArgumentsProcedure.class), - - // scalar function that takes any input + // procedure function that takes any input TestSpec.forProcedure(InputGroupProcedure.class) - .expectNamedArguments("o") .expectOutputMapping( InputTypeStrategies.sequence( new String[] {"o"}, new ArgumentTypeStrategy[] {InputTypeStrategies.ANY}), TypeStrategies.explicit(DataTypes.STRING())), - - // scalar function that takes any input as vararg + // procedure function that takes any input as vararg TestSpec.forProcedure(VarArgInputGroupProcedure.class) .expectOutputMapping( 
InputTypeStrategies.varyingSequence( diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java index 9a63dbb0cb468..d49e376107fc5 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java @@ -423,7 +423,7 @@ ANY, explicit(DataTypes.INT()) TestSpec.forStrategy(WILDCARD) .typedArguments(DataTypes.INT(), DataTypes.STRING()) .calledWithArgumentTypes(DataTypes.TINYINT(), DataTypes.STRING()) - .expectSignature("f(INT, STRING)") + .expectSignature("f(arg0 => INT, arg1 => STRING)") .expectArgumentTypes(DataTypes.INT(), DataTypes.STRING()), // invalid typed arguments