Skip to content

Commit

Permalink
[FLINK-37136][runtime] Implement Window extension for DataStream V2
Browse files Browse the repository at this point in the history
  • Loading branch information
codenohup authored and reswqa committed Jan 18, 2025
1 parent 6de1fa2 commit f2460c8
Show file tree
Hide file tree
Showing 32 changed files with 5,051 additions and 317 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import org.apache.flink.annotation.Internal;

import java.util.Objects;

/**
* Utility class for implementing CoGroupedStream in DataStream V1, as well as two-input window
* operations in DataStream V2.
*/
@Internal
public class TaggedUnion<T1, T2> {
private final T1 one;
private final T2 two;

private TaggedUnion(T1 one, T2 two) {
this.one = one;
this.two = two;
}

public boolean isOne() {
return one != null;
}

public boolean isTwo() {
return two != null;
}

public T1 getOne() {
return one;
}

public T2 getTwo() {
return two;
}

public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
return new TaggedUnion<>(one, null);
}

public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
return new TaggedUnion<>(null, two);
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}

if (!(obj instanceof TaggedUnion)) {
return false;
}

TaggedUnion<?, ?> other = (TaggedUnion<?, ?>) obj;
return Objects.equals(one, other.one) && Objects.equals(two, other.two);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.impl.builtin;

import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
import org.apache.flink.datastream.api.extension.window.function.TwoInputNonBroadcastWindowStreamProcessFunction;
import org.apache.flink.datastream.api.extension.window.function.TwoOutputWindowStreamProcessFunction;
import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
import org.apache.flink.datastream.impl.extension.window.function.InternalOneInputWindowStreamProcessFunction;
import org.apache.flink.datastream.impl.extension.window.function.InternalTwoInputWindowStreamProcessFunction;
import org.apache.flink.datastream.impl.extension.window.function.InternalTwoOutputWindowStreamProcessFunction;
import org.apache.flink.datastream.impl.extension.window.utils.WindowUtils;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.TaggedUnion;

/** Window-related implementations in {@link BuiltinFuncs}. */
public class BuiltinWindowFuncs {

/**
* Wrap the WindowStrategy and OneInputWindowStreamProcessFunction within a ProcessFunction to
* perform the window operation.
*/
public static <IN, OUT, W extends Window> OneInputStreamProcessFunction<IN, OUT> window(
WindowStrategy windowStrategy,
OneInputWindowStreamProcessFunction<IN, OUT> windowProcessFunction) {
WindowAssigner<IN, W> windowAssigner =
(WindowAssigner<IN, W>) WindowUtils.createWindowAssigner(windowStrategy);
return new InternalOneInputWindowStreamProcessFunction<>(
windowProcessFunction,
windowAssigner,
windowAssigner.getDefaultTrigger(),
WindowUtils.getAllowedLateness(windowStrategy),
windowStrategy);
}

/**
* Wrap the WindowStrategy and TwoInputWindowStreamProcessFunction within a ProcessFunction to
* perform the window operation.
*/
public static <IN1, IN2, OUT, W extends Window>
TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> window(
WindowStrategy windowStrategy,
TwoInputNonBroadcastWindowStreamProcessFunction<IN1, IN2, OUT>
windowProcessFunction) {
WindowAssigner<TaggedUnion<IN1, IN2>, W> windowAssigner =
(WindowAssigner<TaggedUnion<IN1, IN2>, W>)
WindowUtils.createWindowAssigner(windowStrategy);
return new InternalTwoInputWindowStreamProcessFunction<>(
windowProcessFunction,
windowAssigner,
windowAssigner.getDefaultTrigger(),
WindowUtils.getAllowedLateness(windowStrategy),
windowStrategy);
}

/**
* Wrap the WindowStrategy and TwoOutputWindowStreamProcessFunction within a ProcessFunction to
* perform the window operation.
*/
public static <IN, OUT1, OUT2, W extends Window>
TwoOutputStreamProcessFunction<IN, OUT1, OUT2> window(
WindowStrategy windowStrategy,
TwoOutputWindowStreamProcessFunction<IN, OUT1, OUT2> windowProcessFunction) {
WindowAssigner<IN, W> windowAssigner =
(WindowAssigner<IN, W>) WindowUtils.createWindowAssigner(windowStrategy);
return new InternalTwoOutputWindowStreamProcessFunction<>(
windowProcessFunction,
windowAssigner,
windowAssigner.getDefaultTrigger(),
WindowUtils.getAllowedLateness(windowStrategy),
windowStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.flink.datastream.impl.extension.join.operators;

import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.datastream.api.extension.join.JoinType;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.ListStateDescriptor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.impl.extension.window.context;

import org.apache.flink.api.common.state.AggregatingStateDeclaration;
import org.apache.flink.api.common.state.ListStateDeclaration;
import org.apache.flink.api.common.state.MapStateDeclaration;
import org.apache.flink.api.common.state.ReducingStateDeclaration;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.state.v2.AggregatingState;
import org.apache.flink.api.common.state.v2.AppendingState;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;

import javax.annotation.Nullable;

import java.util.Optional;

/**
* Default implementation of the {@link OneInputWindowContext}.
*
* @param <K> Type of the window key.
* @param <IN> Type of the input elements.
* @param <W> Type of the window.
*/
public class DefaultOneInputWindowContext<K, IN, W extends Window>
implements OneInputWindowContext<IN> {

/**
* The current processing window. An instance should be set every time before accessing
* window-related attributes, data, and state.
*/
@Nullable private W window;

/** Use to retrieve state associated with windows. */
private final WindowStateStore<K, W> windowStateStore;

/** The state utilized for storing window data. */
private final AppendingState<IN, StateIterator<IN>, Iterable<IN>> windowState;

public DefaultOneInputWindowContext(
@Nullable W window,
AppendingState<IN, StateIterator<IN>, Iterable<IN>> windowState,
WindowProcessFunction windowProcessFunction,
AbstractAsyncStateStreamOperator<?> operator,
TypeSerializer<W> windowSerializer,
boolean isMergingWindow) {
this.window = window;
this.windowState = windowState;
this.windowStateStore =
new WindowStateStore<>(
windowProcessFunction, operator, windowSerializer, isMergingWindow);
}

public void setWindow(W window) {
this.window = window;
}

@Override
public long getStartTime() {
if (window instanceof TimeWindow) {
return ((TimeWindow) window).getStart();
}
return -1;
}

@Override
public long getEndTime() {
if (window instanceof TimeWindow) {
return ((TimeWindow) window).getEnd();
}
return -1;
}

@Override
public <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> stateDeclaration)
throws Exception {
return windowStateStore.getWindowState(stateDeclaration, window);
}

@Override
public <KEY, V> Optional<MapState<KEY, V>> getWindowState(
MapStateDeclaration<KEY, V> stateDeclaration) throws Exception {
return windowStateStore.getWindowState(stateDeclaration, window);
}

@Override
public <T> Optional<ValueState<T>> getWindowState(ValueStateDeclaration<T> stateDeclaration)
throws Exception {
return windowStateStore.getWindowState(stateDeclaration, window);
}

@Override
public <T> Optional<ReducingState<T>> getWindowState(
ReducingStateDeclaration<T> stateDeclaration) throws Exception {
return windowStateStore.getWindowState(stateDeclaration, window);
}

@Override
public <T, ACC, OUT> Optional<AggregatingState<T, OUT>> getWindowState(
AggregatingStateDeclaration<T, ACC, OUT> stateDeclaration) throws Exception {
return windowStateStore.getWindowState(stateDeclaration, window);
}

@Override
public void putRecord(IN record) {
windowState.add(record);
}

@Override
public Iterable<IN> getAllRecords() {
return windowState.get();
}
}
Loading

0 comments on commit f2460c8

Please sign in to comment.