Skip to content

Commit

Permalink
[FLINK-36789][table-common] Update FunctionHint and ArgumentHint with…
Browse files Browse the repository at this point in the history
… traits
  • Loading branch information
twalthr committed Nov 25, 2024
1 parent 0c338ae commit 05f28b4
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,23 @@
* <p>An {@code ArgumentHint} can be used to provide hints about the name, optionality, and data
* type of argument.
*
* <p>It combines the functionality of {@link FunctionHint#argumentNames()} and {@link DataTypeHint}
* annotations to conveniently group argument-related information together in function declarations.
* <p>{@code @ArgumentHint(name = "in1", type = @DataTypeHint("STRING"), isOptional = false)} is a
* scalar argument with the data type STRING, named "in1", and cannot be omitted when calling.
*
* <p>{@code @ArgumentHint(name = "in1", type = @DataTypeHint("STRING"), isOptional = false} is an
* argument with the type String, named in1, and cannot be omitted when calling.
* @see FunctionHint
*/
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
public @interface ArgumentHint {

/**
* The kind of the argument.
*
* <p>Only applies to {@code ProcessTableFunction}s (PTFs). Others can only take scalar values.
*/
ArgumentTrait[] value() default {ArgumentTrait.SCALAR};

/**
* The name of the argument.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.annotation;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.types.inference.StaticArgumentTrait;

import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Declares traits for {@link ArgumentHint}. They enable basic validation by the framework.
 *
 * <p>Some traits have dependencies on other traits, which is why this enum reflects a hierarchy in
 * which {@link #SCALAR}, {@link #TABLE_AS_ROW}, and {@link #TABLE_AS_SET} are the top-level roots.
 */
@PublicEvolving
public enum ArgumentTrait {

    /**
     * An argument that accepts a scalar value. For example: f(1), f(true), f('Some string').
     *
     * <p>It's the default if no {@link ArgumentHint} is provided.
     */
    SCALAR(StaticArgumentTrait.SCALAR),

    /**
     * An argument that accepts a table "as row" (i.e. with row semantics). This trait only applies
     * to {@code ProcessTableFunction} (PTF).
     *
     * <p>For scalability, input tables are distributed into virtual processors. Each virtual
     * processor executes a PTF instance and has access only to a share of the entire table. The
     * argument declaration decides about the size of the share and co-location of data.
     *
     * <p>A table with row semantics assumes that there is no correlation between rows and each row
     * can be processed independently. The framework is free in how to distribute rows among virtual
     * processors and each virtual processor has access only to the currently processed row.
     */
    TABLE_AS_ROW(StaticArgumentTrait.TABLE_AS_ROW),

    /**
     * An argument that accepts a table "as set" (i.e. with set semantics). This trait only applies
     * to {@code ProcessTableFunction} (PTF).
     *
     * <p>For scalability, input tables are distributed into virtual processors. Each virtual
     * processor executes a PTF instance and has access only to a share of the entire table. The
     * argument declaration decides about the size of the share and co-location of data.
     *
     * <p>A table with set semantics assumes that there is a correlation between rows. When calling
     * the function, the PARTITION BY clause defines the columns for correlation. The framework
     * ensures that all rows belonging to same set are co-located. A PTF instance is able to access
     * all rows belonging to the same set. In other words: The virtual processor is scoped under a
     * key context.
     */
    TABLE_AS_SET(StaticArgumentTrait.TABLE_AS_SET),

    /**
     * Defines that a PARTITION BY clause is optional for {@link #TABLE_AS_SET}. By default, it is
     * mandatory for improving the parallel execution by distributing the table by key.
     */
    OPTIONAL_PARTITION_BY(StaticArgumentTrait.OPTIONAL_PARTITION_BY, TABLE_AS_SET);

    private final StaticArgumentTrait staticTrait;
    private final Set<ArgumentTrait> requirements;

    /**
     * Creates a trait.
     *
     * @param staticTrait the {@link StaticArgumentTrait} counterpart of this trait
     * @param requirements the traits this trait depends on; empty for the top-level roots
     */
    ArgumentTrait(StaticArgumentTrait staticTrait, ArgumentTrait... requirements) {
        this.staticTrait = staticTrait;
        // Enum constants are shared singletons, so the set handed out by getRequirements() must
        // not be modifiable by callers. EnumSet is intentionally not used here because it cannot
        // be created reliably for an enum type whose initialization is still in progress.
        this.requirements =
                Collections.unmodifiableSet(
                        Arrays.stream(requirements).collect(Collectors.toSet()));
    }

    /**
     * Returns the traits that this trait depends on.
     *
     * @return an unmodifiable set of required traits, empty if this trait has no dependencies
     */
    public Set<ArgumentTrait> getRequirements() {
        return requirements;
    }

    /**
     * Returns the {@link StaticArgumentTrait} that this trait was declared with.
     *
     * @return the static counterpart of this trait
     */
    public StaticArgumentTrait toStaticTrait() {
        return staticTrait;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import java.lang.annotation.Target;

/**
* A hint that influences the reflection-based extraction of input types, accumulator types, and
* output types for constructing the {@link TypeInference} logic of a {@link UserDefinedFunction}.
* A hint that influences the reflection-based extraction of arguments, accumulator, and output for
* constructing the {@link TypeInference} logic of a {@link UserDefinedFunction}.
*
* <p>One or more annotations can be declared on top of a {@link UserDefinedFunction} class or
* individually for each {@code eval()/accumulate()} method for overloading function signatures. All
Expand All @@ -42,18 +42,24 @@
* part and let the default extraction do the rest:
*
* <pre>{@code
* // accepts (INT, STRING) and returns BOOLEAN
* // accepts (INT, STRING) and returns BOOLEAN,
* // the arguments have names and are optional
* @FunctionHint(
* argument = [(name = "f1", @DataTypeHint("INT"), isOptional = true),
* (name = "f2", @DataTypeHint("STRING"), isOptional = true)],
* arguments = {
* @ArgumentHint(type = @DataTypeHint("INT"), name = "in1", isOptional = true),
* @ArgumentHint(type = @DataTypeHint("STRING"), name = "in2", isOptional = true)
* },
* output = @DataTypeHint("BOOLEAN")
* )
* class X extends ScalarFunction { ... }
*
* // accepts (INT, STRING...) and returns BOOLEAN
* // accepts (INT, STRING...) and returns BOOLEAN,
* // the arguments have names
* @FunctionHint(
* argument = [(name = "f1", @DataTypeHint("INT"), isOptional = false),
* (name = "f2", @DataTypeHint("STRING"), isOptional = false)],
* arguments = {
* @ArgumentHint(type = @DataTypeHint("INT"), name = "in1"),
* @ArgumentHint(type = @DataTypeHint("STRING"), name = "in2")
* },
* isVarArgs = true,
* output = @DataTypeHint("BOOLEAN")
* )
Expand Down Expand Up @@ -122,6 +128,7 @@
* }</pre>
*
* @see DataTypeHint
* @see ArgumentHint
*/
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
Expand All @@ -131,8 +138,7 @@

// Note to implementers:
// Because "null" is not supported as an annotation value. Every annotation parameter *must*
// have
// some representation for unknown values in order to merge multi-level annotations.
// have some representation for unknown values in order to merge multi-level annotations.

/**
* Explicitly lists the argument types that a function takes as input.
Expand All @@ -141,8 +147,10 @@
* used.
*
* <p>Note: Specifying the input arguments manually disables the entire reflection-based
* extraction around arguments. This means that also {@link #isVarArgs()} and {@link
* #argumentNames()} need to be specified manually if required.
* extraction around arguments. This means that also {@link #isVarArgs()} needs to be specified
* manually if required.
*
* <p>Use {@link #arguments()} for more control over argument names and argument kinds.
*/
DataTypeHint[] input() default @DataTypeHint();

Expand All @@ -157,25 +165,14 @@
boolean isVarArgs() default false;

/**
* Explicitly lists the argument names that a function takes as input.
* Explicitly lists the arguments that a function takes as input, including their names, data
* types, kinds, and whether they are optional.
*
* <p>By default, if {@link #input()} is defined, explicit argument names are undefined and this
* parameter can be used to provide argument names. If {@link #input()} is not defined, the
* reflection-based extraction is used, thus, this parameter is ignored.
* <p>It is recommended to use this parameter instead of {@link #input()}. Using both {@link
* #input()} and this parameter is not allowed. Specifying the list of arguments manually
* disables the entire reflection-based extraction around arguments.
*/
String[] argumentNames() default {""};

/**
* Explicitly lists the argument that a function takes as input, including their names, types,
* and whether they are optional.
*
* <p>By default, it is recommended to use this parameter instead of {@link #input()}. If the
* type of argumentHint is not defined, it will be considered an invalid argument and an
* exception will be thrown. Additionally, both this parameter and {@link #input()} cannot be
* defined at the same time. If neither argument nor {@link #input()} are defined,
* reflection-based extraction will be used.
*/
ArgumentHint[] argument() default {};
ArgumentHint[] arguments() default {};

/**
* Explicitly defines the intermediate result type that a function uses as accumulator.
Expand All @@ -192,4 +189,32 @@
* used.
*/
DataTypeHint output() default @DataTypeHint();

// --------------------------------------------------------------------------------------------
// Legacy
// --------------------------------------------------------------------------------------------

/**
* Explicitly lists the argument names that a function takes as input.
*
* <p>By default, if {@link #input()} is defined, explicit argument names are undefined and this
* parameter can be used to provide argument names. If {@link #input()} is not defined, the
* reflection-based extraction is used, thus, this parameter is ignored.
*
* @deprecated Use {@link #arguments()} instead.
*/
@Deprecated
String[] argumentNames() default {""};

/**
* Explicitly lists the arguments that a function takes as input, including their names, data
* types, kinds, and whether they are optional.
*
* <p>It is recommended to use this parameter instead of {@link #input()}. Specifying the list
* of arguments manually disables the entire reflection-based extraction around arguments.
*
* @deprecated Use {@link #arguments()} instead.
*/
@Deprecated
ArgumentHint[] argument() default {};
}
Loading

0 comments on commit 05f28b4

Please sign in to comment.