Skip to content

Commit

Permalink
Add typehints to reify_timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
hjtran committed Jan 8, 2024
1 parent 9bee651 commit 04c3b2d
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from apache_beam.typehints.decorators import get_signature
from apache_beam.typehints.sharded_key_type import ShardedKeyType
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.annotations import deprecated
from apache_beam.utils.sharded_key import ShardedKey

Expand Down Expand Up @@ -775,7 +776,9 @@ def expand(self, pcoll):
globally_windowed = window.GlobalWindows.windowed_value(None)
MIN_TIMESTAMP = window.MIN_TIMESTAMP

def reify_timestamps(element, timestamp=DoFn.TimestampParam):
def reify_timestamps(
element: Tuple[K, V],
timestamp=DoFn.TimestampParam) -> Tuple[K, Tuple[V, Timestamp]]:
key, value = element
if timestamp == MIN_TIMESTAMP:
timestamp = None
Expand All @@ -801,7 +804,7 @@ def restore_timestamps(element):
key, windowed_values = element
return [wv.with_value((key, wv.value)) for wv in windowed_values]

ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any)
ungrouped = pcoll | Map(reify_timestamps)

# TODO(https://github.com/apache/beam/issues/19785) Using global window as
# one of the standard window. This is to mitigate the Dataflow Java Runner
Expand All @@ -811,10 +814,7 @@ def restore_timestamps(element):
triggerfn=Always(),
accumulation_mode=AccumulationMode.DISCARDING,
timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
result = (
ungrouped
| GroupByKey()
| FlatMap(restore_timestamps).with_output_types(Any))
result = (ungrouped | GroupByKey() | FlatMap(restore_timestamps))
result._windowing = windowing_saved
return result

Expand Down

0 comments on commit 04c3b2d

Please sign in to comment.