Skip to content

Commit 45a9b35

Browse files
committed
[FLINK-19318] Remove mentions of timeWindow*() from documentation
It's deprecated and we want to encourage users to use the explicit window assigners.
1 parent fe573dd commit 45a9b35

16 files changed

+44
-44
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ val text = env.socketTextStream(host, port, '\n')
4141
val windowCounts = text.flatMap { w => w.split("\\s") }
4242
.map { w => WordWithCount(w, 1) }
4343
.keyBy("word")
44-
.timeWindow(Time.seconds(5))
44+
.window(TumblingProcessingTimeWindow.of(Time.seconds(5)))
4545
.sum("count")
4646

4747
windowCounts.print()

docs/dev/connectors/cassandra.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ DataStream<Tuple2<String, Long>> result = text
161161
}
162162
})
163163
.keyBy(value -> value.f0)
164-
.timeWindow(Time.seconds(5))
164+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
165165
.sum(1);
166166

167167
CassandraSink.addSink(result)
@@ -186,7 +186,7 @@ val result: DataStream[(String, Long)] = text
186186
.map((_, 1L))
187187
// group by the tuple field "0" and sum up tuple field "1"
188188
.keyBy(_._1)
189-
.timeWindow(Time.seconds(5))
189+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
190190
.sum(1)
191191

192192
CassandraSink.addSink(result)
@@ -232,7 +232,7 @@ DataStream<WordCount> result = text
232232
}
233233
})
234234
.keyBy(WordCount::getWord)
235-
.timeWindow(Time.seconds(5))
235+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
236236

237237
.reduce(new ReduceFunction<WordCount>() {
238238
@Override

docs/dev/connectors/cassandra.zh.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ DataStream<Tuple2<String, Long>> result = text
161161
}
162162
})
163163
.keyBy(value -> value.f0)
164-
.timeWindow(Time.seconds(5))
164+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
165165
.sum(1);
166166

167167
CassandraSink.addSink(result)
@@ -186,7 +186,7 @@ val result: DataStream[(String, Long)] = text
186186
.map((_, 1L))
187187
// group by the tuple field "0" and sum up tuple field "1"
188188
.keyBy(_._1)
189-
.timeWindow(Time.seconds(5))
189+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
190190
.sum(1)
191191

192192
CassandraSink.addSink(result)
@@ -232,7 +232,7 @@ DataStream<WordCount> result = text
232232
}
233233
})
234234
.keyBy(WordCount::getWord)
235-
.timeWindow(Time.seconds(5))
235+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
236236

237237
.reduce(new ReduceFunction<WordCount>() {
238238
@Override

docs/dev/datastream_api.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ public class WindowWordCount {
275275
.socketTextStream("localhost", 9999)
276276
.flatMap(new Splitter())
277277
.keyBy(value -> value.f0)
278-
.timeWindow(Time.seconds(5))
278+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
279279
.sum(1);
280280

281281
dataStream.print();
@@ -312,7 +312,7 @@ object WindowWordCount {
312312
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
313313
.map { (_, 1) }
314314
.keyBy(_._1)
315-
.timeWindow(Time.seconds(5))
315+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
316316
.sum(1)
317317

318318
counts.print()

docs/dev/datastream_api.zh.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ public class WindowWordCount {
275275
.socketTextStream("localhost", 9999)
276276
.flatMap(new Splitter())
277277
.keyBy(value -> value.f0)
278-
.timeWindow(Time.seconds(5))
278+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
279279
.sum(1);
280280

281281
dataStream.print();
@@ -312,7 +312,7 @@ object WindowWordCount {
312312
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
313313
.map { (_, 1) }
314314
.keyBy(_._1)
315-
.timeWindow(Time.seconds(5))
315+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
316316
.sum(1)
317317

318318
counts.print()

docs/dev/event_time.zh.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ under the License.
2828

2929
有关如何在 Flink 程序中使用时间特性,请参阅[窗口]({% link dev/stream/operators/windows.zh.md %})和 [ProcessFunction]({% link dev/stream/operators/process_function.zh.md %}) 小节。
3030

31-
使用*事件时间*处理数据之前需要在程序中设置正确的*时间语义*。此项设置会定义源数据的处理方式(例如:程序是否会对数据分配时间戳),以及程序应使用什么时间语义执行 `KeyedStream.timeWindow(Time.seconds(30))` 之类的窗口操作。
31+
使用*事件时间*处理数据之前需要在程序中设置正确的*时间语义*。此项设置会定义源数据的处理方式(例如:程序是否会对数据分配时间戳),以及程序应使用什么时间语义执行 `KeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)))` 之类的窗口操作。
3232

3333
可以通过 `StreamExecutionEnvironment.setStreamTimeCharacteristic()` 设置程序的时间语义,示例如下:
3434

@@ -43,7 +43,7 @@ DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer<MyEvent>(topic
4343

4444
stream
4545
.keyBy( (event) -> event.getUser() )
46-
.timeWindow(Time.hours(1))
46+
.window(TumblingEventTimeWindows.of(Time.hours(1)))
4747
.reduce( (a, b) -> a.add(b) )
4848
.addSink(...);
4949
{% endhighlight %}
@@ -58,7 +58,7 @@ val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer[MyEvent](
5858

5959
stream
6060
.keyBy( _.getUser )
61-
.timeWindow(Time.hours(1))
61+
.window(TumblingEventTimeWindows.of(Time.hours(1)))
6262
.reduce( (a, b) => a.add(b) )
6363
.addSink(...)
6464
{% endhighlight %}

docs/dev/event_timestamps_watermarks.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ DataStream<MyEvent> withTimestampsAndWatermarks = stream
138138

139139
withTimestampsAndWatermarks
140140
.keyBy( (event) -> event.getGroup() )
141-
.timeWindow(Time.seconds(10))
141+
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
142142
.reduce( (a, b) -> a.add(b) )
143143
.addSink(...);
144144
{% endhighlight %}
@@ -157,7 +157,7 @@ val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
157157

158158
withTimestampsAndWatermarks
159159
.keyBy( _.getGroup )
160-
.timeWindow(Time.seconds(10))
160+
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
161161
.reduce( (a, b) => a.add(b) )
162162
.addSink(...)
163163
{% endhighlight %}

docs/dev/event_timestamps_watermarks.zh.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ DataStream<MyEvent> withTimestampsAndWatermarks = stream
110110

111111
withTimestampsAndWatermarks
112112
.keyBy( (event) -> event.getGroup() )
113-
.timeWindow(Time.seconds(10))
113+
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
114114
.reduce( (a, b) -> a.add(b) )
115115
.addSink(...);
116116
{% endhighlight %}
@@ -129,7 +129,7 @@ val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
129129

130130
withTimestampsAndWatermarks
131131
.keyBy( _.getGroup )
132-
.timeWindow(Time.seconds(10))
132+
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
133133
.reduce( (a, b) => a.add(b) )
134134
.addSink(...)
135135
{% endhighlight %}

docs/dev/parallel.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ DataStream<String> text = [...]
5555
DataStream<Tuple2<String, Integer>> wordCounts = text
5656
.flatMap(new LineSplitter())
5757
.keyBy(value -> value.f0)
58-
.timeWindow(Time.seconds(5))
58+
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
5959
.sum(1).setParallelism(5);
6060

6161
wordCounts.print();
@@ -71,7 +71,7 @@ val text = [...]
7171
val wordCounts = text
7272
.flatMap{ _.split(" ") map { (_, 1) } }
7373
.keyBy(_._1)
74-
.timeWindow(Time.seconds(5))
74+
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
7575
.sum(1).setParallelism(5)
7676
wordCounts.print()
7777

@@ -114,7 +114,7 @@ val text = [...]
114114
val wordCounts = text
115115
.flatMap{ _.split(" ") map { (_, 1) } }
116116
.keyBy(_._1)
117-
.timeWindow(Time.seconds(5))
117+
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
118118
.sum(1)
119119
wordCounts.print()
120120

docs/dev/parallel.zh.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ DataStream<String> text = [...]
4646
DataStream<Tuple2<String, Integer>> wordCounts = text
4747
.flatMap(new LineSplitter())
4848
.keyBy(value -> value.f0)
49-
.timeWindow(Time.seconds(5))
49+
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
5050
.sum(1).setParallelism(5);
5151

5252
wordCounts.print();
@@ -62,7 +62,7 @@ val text = [...]
6262
val wordCounts = text
6363
.flatMap{ _.split(" ") map { (_, 1) } }
6464
.keyBy(_._1)
65-
.timeWindow(Time.seconds(5))
65+
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
6666
.sum(1).setParallelism(5)
6767
wordCounts.print()
6868

@@ -99,7 +99,7 @@ val text = [...]
9999
val wordCounts = text
100100
.flatMap{ _.split(" ") map { (_, 1) } }
101101
.keyBy(_._1)
102-
.timeWindow(Time.seconds(5))
102+
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
103103
.sum(1)
104104
wordCounts.print()
105105

docs/dev/stream/experimental.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ Code example:
6060
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6161
DataStreamSource<Integer> source = ...
6262
DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
63-
.timeWindow(Time.seconds(1))
63+
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
6464
.reduce((a, b) -> a + b)
6565
.addSink(new DiscardingSink<>());
6666
env.execute();
@@ -72,7 +72,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
7272
env.setParallelism(1)
7373
val source = ...
7474
new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
75-
.timeWindow(Time.seconds(1))
75+
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
7676
.reduce((a, b) => a + b)
7777
.addSink(new DiscardingSink[Int])
7878
env.execute()

docs/dev/stream/experimental.zh.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ Code example:
6060
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6161
DataStreamSource<Integer> source = ...
6262
DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
63-
.timeWindow(Time.seconds(1))
63+
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
6464
.reduce((a, b) -> a + b)
6565
.addSink(new DiscardingSink<>());
6666
env.execute();
@@ -72,7 +72,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
7272
env.setParallelism(1)
7373
val source = ...
7474
new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
75-
.timeWindow(Time.seconds(1))
75+
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
7676
.reduce((a, b) => a + b)
7777
.addSink(new DiscardingSink[Int])
7878
env.execute()

docs/dev/stream/operators/windows.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ DataStream<Tuple2<String, Long>> input = ...;
684684

685685
input
686686
.keyBy(t -> t.f0)
687-
.timeWindow(Time.minutes(5))
687+
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
688688
.process(new MyProcessWindowFunction());
689689

690690
/* ... */
@@ -711,7 +711,7 @@ val input: DataStream[(String, Long)] = ...
711711

712712
input
713713
.keyBy(_._1)
714-
.timeWindow(Time.minutes(5))
714+
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
715715
.process(new MyProcessWindowFunction())
716716

717717
/* ... */
@@ -758,7 +758,7 @@ DataStream<SensorReading> input = ...;
758758

759759
input
760760
.keyBy(<key selector>)
761-
.timeWindow(<duration>)
761+
.window(<window assigner>)
762762
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
763763

764764
// Function definitions
@@ -791,7 +791,7 @@ val input: DataStream[SensorReading] = ...
791791

792792
input
793793
.keyBy(<key selector>)
794-
.timeWindow(<duration>)
794+
.window(<window assigner>)
795795
.reduce(
796796
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
797797
( key: String,
@@ -821,7 +821,7 @@ DataStream<Tuple2<String, Long>> input = ...;
821821

822822
input
823823
.keyBy(<key selector>)
824-
.timeWindow(<duration>)
824+
.window(<window assigner>)
825825
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
826826

827827
// Function definitions
@@ -874,7 +874,7 @@ val input: DataStream[(String, Long)] = ...
874874

875875
input
876876
.keyBy(<key selector>)
877-
.timeWindow(<duration>)
877+
.window(<window assigner>)
878878
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
879879

880880
// Function definitions

docs/dev/stream/operators/windows.zh.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ DataStream<Tuple2<String, Long>> input = ...;
684684

685685
input
686686
.keyBy(t -> t.f0)
687-
.timeWindow(Time.minutes(5))
687+
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
688688
.process(new MyProcessWindowFunction());
689689

690690
/* ... */
@@ -711,7 +711,7 @@ val input: DataStream[(String, Long)] = ...
711711

712712
input
713713
.keyBy(_._1)
714-
.timeWindow(Time.minutes(5))
714+
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
715715
.process(new MyProcessWindowFunction())
716716

717717
/* ... */
@@ -758,7 +758,7 @@ DataStream<SensorReading> input = ...;
758758

759759
input
760760
.keyBy(<key selector>)
761-
.timeWindow(<duration>)
761+
.window(<window assigner>)
762762
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
763763

764764
// Function definitions
@@ -791,7 +791,7 @@ val input: DataStream[SensorReading] = ...
791791

792792
input
793793
.keyBy(<key selector>)
794-
.timeWindow(<duration>)
794+
.window(<window assigner>)
795795
.reduce(
796796
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
797797
( key: String,
@@ -821,7 +821,7 @@ DataStream<Tuple2<String, Long>> input = ...;
821821

822822
input
823823
.keyBy(<key selector>)
824-
.timeWindow(<duration>)
824+
.window(<window assigner>)
825825
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
826826

827827
// Function definitions
@@ -874,7 +874,7 @@ val input: DataStream[(String, Long)] = ...
874874

875875
input
876876
.keyBy(<key selector>)
877-
.timeWindow(<duration>)
877+
.window(<window assigner>)
878878
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
879879

880880
// Function definitions

docs/learn-flink/streaming_analytics.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -411,9 +411,9 @@ For example, it works to do this:
411411
{% highlight java %}
412412
stream
413413
.keyBy(t -> t.key)
414-
.timeWindow(<time specification>)
414+
.window(<window assigner>)
415415
.reduce(<reduce function>)
416-
.timeWindowAll(<same time specification>)
416+
.windowAll(<same window assigner>)
417417
.reduce(<same reduce function>)
418418
{% endhighlight %}
419419

docs/learn-flink/streaming_analytics.zh.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,9 @@ Flink 的窗口 API 某些方面有一些奇怪的行为,可能和我们预期
352352
{% highlight java %}
353353
stream
354354
.keyBy(t -> t.key)
355-
.timeWindow(<time specification>)
355+
.window(<window assigner>)
356356
.reduce(<reduce function>)
357-
.timeWindowAll(<same time specification>)
357+
.windowAll(<same window assigner>)
358358
.reduce(<same reduce function>)
359359
{% endhighlight %}
360360

0 commit comments

Comments
 (0)