Skip to content

Commit

Permalink
[yaml] yaml_transform.py unit tests python - PR 3/3 (apache#27356)
Browse files Browse the repository at this point in the history
* SafeLineLoader unit tests
* LightweightScope unit tests
* Scope Unit Tests
* yaml_transform.py - chain_as_composite() unit tests
* yaml_transform.py - normalize_source_sink() unit tests
* yaml_transform.py - preprocess_source_sink() unit tests
* yaml_transform.py - normalize_inputs_outputs() unit tests
* yaml_transform.py - identify_object() and extract_name() unit tests
* yaml_transform.py - push_windowing_to_roots() unit tests
* yaml_transform.py - preprocess_windowing() unit tests
* yaml_transform.py - preprocess_flattened_inputs() unit tests
* yaml_transform.py - ensure_transforms_have_types() unit tests
* yaml_transform.py - expand_pipeline() unit tests
* yaml_transform.py - YamlTransform() unit tests
  • Loading branch information
bzablocki authored Aug 28, 2023
1 parent 505f942 commit 45ea758
Show file tree
Hide file tree
Showing 2 changed files with 845 additions and 23 deletions.
14 changes: 13 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,12 @@ def expand_chain_transform(spec, scope):


def chain_as_composite(spec):
def is_not_output_of_last_transform(new_transforms, value):
return (
('name' in new_transforms[-1] and
value != new_transforms[-1]['name']) or
('type' in new_transforms[-1] and value != new_transforms[-1]['type']))

# A chain is simply a composite transform where all inputs and outputs
# are implicit.
spec = normalize_source_sink(spec)
Expand All @@ -420,6 +426,12 @@ def chain_as_composite(spec):

last_transform = new_transforms[-1]['__uuid__']
if has_explicit_outputs:
for (key, value) in composite_spec['output'].items():
if is_not_output_of_last_transform(new_transforms, value):
raise ValueError(
f"Explicit output {identify_object(value)} of the chain transform"
f" is not an output of the last transform.")

composite_spec['output'] = {
key: f'{last_transform}.{value}'
for (key, value) in composite_spec['output'].items()
Expand Down Expand Up @@ -547,7 +559,7 @@ def preprocess_windowing(spec):

windowing = spec.pop('windowing')
if spec['input']:
# Apply the windowing to all inputs by wrapping it in a trasnform that
# Apply the windowing to all inputs by wrapping it in a transform that
# first applies windowing and then applies the original transform.
original_inputs = spec['input']
windowing_transforms = [{
Expand Down
Loading

0 comments on commit 45ea758

Please sign in to comment.