[yaml] Add Beam YAML Examples (apache#30003)
Signed-off-by: Jeffrey Kinard <[email protected]>
Polber authored Mar 8, 2024
1 parent a391198 commit 53e8efa
Showing 20 changed files with 1,138 additions and 4 deletions.
7 changes: 5 additions & 2 deletions sdks/python/apache_beam/testing/test_pipeline.py
@@ -70,7 +70,8 @@ def __init__(
argv=None,
is_integration_test=False,
blocking=True,
additional_pipeline_args=None):
additional_pipeline_args=None,
display_data=None):
"""Initialize a pipeline object for test.
Args:
@@ -93,6 +94,8 @@ def __init__(
completed.
additional_pipeline_args (List[str]): additional pipeline arguments to be
included when constructing the pipeline options object.
display_data (Dict[str, Any]): a dictionary of static data associated
with this pipeline that can be displayed when it runs.
Raises:
ValueError: if either the runner or options argument is not
@@ -106,7 +109,7 @@ def __init__(
self.blocking = blocking
if options is None:
options = PipelineOptions(self.options_list)
super().__init__(runner, options)
super().__init__(runner, options, display_data=display_data)

def run(self, test_runner_api=True):
result = super().run(
67 changes: 67 additions & 0 deletions sdks/python/apache_beam/yaml/examples/README.md
@@ -0,0 +1,67 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Examples Catalog

<!-- TOC -->
* [Examples Catalog](#examples-catalog)
* [Wordcount](#wordcount)
* [Transforms](#transforms)
* [Element-wise](#element-wise)
* [Aggregation](#aggregation)
<!-- TOC -->

This module contains a series of Beam YAML code samples that can be run using
the command:
```
python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml
```

## Wordcount
A good starting place is the [Wordcount](wordcount_minimal.yaml) example under
the root example directory.
This example reads in a text file, splits the text into individual words, groups
by word, and counts the occurrences of each word. This is a classic example used
in the other SDKs and shows off many of the features of Beam YAML.
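
In outline, a wordcount-style pipeline can be sketched as follows. This is a
hypothetical fragment for illustration, not the shipped `wordcount_minimal.yaml`;
the `line` field name produced by `ReadFromText` and the long-form `combine`
spec are assumptions here.

```yaml
# Hypothetical sketch only -- the shipped example may differ.
pipeline:
  type: chain
  transforms:
    - type: ReadFromText
      config:
        path: /path/to/input.txt
    - type: MapToFields
      name: Split lines into words
      config:
        language: python
        fields:
          word:
            callable: |
              def split_words(row):
                return row.line.split()
    # Explode turns each list of words into one element per word
    - type: Explode
      config:
        fields: word
    # Count occurrences of each word (assumed long-form combine spec)
    - type: Combine
      config:
        group_by: word
        combine:
          count:
            value: word
            fn: count
    - type: LogForTesting

# Combine is experimental and requires this feature flag
options:
  yaml_experimental_features: Combine
```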

## Transforms

Examples in this directory show off the various built-in transforms of the Beam
YAML framework.

### Element-wise
These examples leverage the built-in mapping transforms, including `MapToFields`,
`Filter`, and `Explode`. More information about mapping transforms can be found
[here](https://beam.apache.org/documentation/sdks/yaml-udf/).
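
As a minimal illustration of these mapping transforms, the following is a
hypothetical fragment (not one of the shipped examples) using the expression
form of `fields` and `keep`:

```yaml
pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - {value: 1}
          - {value: 2}
          - {value: 3}
    # Compute a new field from each element using a Python expression
    - type: MapToFields
      config:
        language: python
        fields:
          doubled: value * 2
    # Keep only elements whose computed field passes the predicate
    - type: Filter
      config:
        language: python
        keep: doubled > 2
    - type: LogForTesting
```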

### Aggregation
These examples leverage the built-in `Combine` transform for performing simple
aggregations including sum, mean, count, etc.

These examples are experimental and require that
`yaml_experimental_features: Combine` be specified under the `options` tag, or
that `--yaml_experimental_features=Combine` be passed to the command that runs
the pipeline. For example:
```
python -m apache_beam.yaml.main \
--pipeline_spec_file=/path/to/example.yaml \
--yaml_experimental_features=Combine
```
More information about aggregation transforms can be found
[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
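
Putting the pieces together, a small aggregation pipeline following the same
`Combine` syntax used by the examples in this directory might look like:

```yaml
pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - {season: 'spring', produce: '🍓'}
          - {season: 'spring', produce: '🥕'}
          - {season: 'summer', produce: '🥕'}
    # Count the number of produce entries per season
    - type: Combine
      config:
        group_by: season
        combine:
          produce: count
    - type: LogForTesting

# Combine is experimental, so the feature flag must be enabled
options:
  yaml_experimental_features: Combine
```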
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/yaml/examples/__init__.py
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
154 changes: 154 additions & 0 deletions sdks/python/apache_beam/yaml/examples/examples_test.py
@@ -0,0 +1,154 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
import glob
import logging
import os
import random
import unittest
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from unittest import mock

import yaml

import apache_beam as beam
from apache_beam import PCollection
from apache_beam.examples.snippets.util import assert_matches_stdout
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.yaml import yaml_transform
from apache_beam.yaml.readme_test import TestEnvironment
from apache_beam.yaml.readme_test import replace_recursive


def check_output(expected: List[str]):
def _check_inner(actual: PCollection[str]):
formatted_actual = actual | beam.Map(
lambda row: str(beam.Row(**row._asdict())))
assert_matches_stdout(formatted_actual, expected)

return _check_inner


def create_test_method(
pipeline_spec_file: str,
custom_preprocessor: Optional[Callable[..., Union[Dict, List]]] = None):
@mock.patch('apache_beam.Pipeline', TestPipeline)
def test_yaml_example(self):
with open(pipeline_spec_file, encoding="utf-8") as f:
lines = f.readlines()
expected_key = '# Expected:\n'
if expected_key in lines:
expected = lines[lines.index('# Expected:\n') + 1:]
else:
raise ValueError(
f"Missing '# Expected:' tag in example file '{pipeline_spec_file}'")
for i, line in enumerate(expected):
expected[i] = line.replace('# ', '').replace('\n', '')
pipeline_spec = yaml.load(
''.join(lines), Loader=yaml_transform.SafeLineLoader)

with TestEnvironment() as env:
if custom_preprocessor:
pipeline_spec = custom_preprocessor(pipeline_spec, expected, env)
with beam.Pipeline(options=PipelineOptions(
pickle_library='cloudpickle',
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
'options', {})))) as p:
actual = yaml_transform.expand_pipeline(p, pipeline_spec)
check_output(expected)(actual)

return test_yaml_example


class YamlExamplesTestSuite:
_test_preprocessor: Dict[str, Callable[..., Union[Dict, List]]] = {}

def __init__(self, name: str, path: str):
self._test_suite = self.create_test_suite(name, path)

def run(self):
return self._test_suite

@classmethod
def parse_test_methods(cls, path: str):
files = glob.glob(path)
if not files and os.path.exists(path) and os.path.isfile(path):
files = [path]
for file in files:
test_name = f'test_{file.split(os.sep)[-1].replace(".", "_")}'
custom_preprocessor = cls._test_preprocessor.get(test_name, None)
yield test_name, create_test_method(file, custom_preprocessor)

@classmethod
def create_test_suite(cls, name: str, path: str):
return type(name, (unittest.TestCase, ), dict(cls.parse_test_methods(path)))

@classmethod
def register_test_preprocessor(cls, test_name: str):
def apply(preprocessor):
cls._test_preprocessor[test_name] = preprocessor
return preprocessor

return apply


@YamlExamplesTestSuite.register_test_preprocessor('test_wordcount_minimal_yaml')
def _wordcount_test_preprocessor(
test_spec: str, expected: List[str], env: TestEnvironment):
all_words = []
for element in expected:
word = element.split('=')[1].split(',')[0].replace("'", '')
count = int(element.split('=')[2].replace(')', ''))
all_words += [word] * count
random.shuffle(all_words)

lines = []
while all_words:
line_length = random.randint(1, min(10, len(all_words)))
line = " ".join(
all_words.pop(random.randrange(len(all_words)))
for _ in range(line_length))
lines.append(line)

return replace_recursive(
test_spec,
'ReadFromText',
'path',
env.input_file('kinglear.txt', '\n'.join(lines)))


YAML_DOCS_DIR = os.path.join(os.path.dirname(__file__))
ExamplesTest = YamlExamplesTestSuite(
'ExamplesTest', os.path.join(YAML_DOCS_DIR, '*.yaml')).run()

ElementWiseTest = YamlExamplesTestSuite(
'ElementwiseExamplesTest',
os.path.join(YAML_DOCS_DIR, 'transforms/elementwise/*.yaml')).run()

AggregationTest = YamlExamplesTestSuite(
'AggregationExamplesTest',
os.path.join(YAML_DOCS_DIR, 'transforms/aggregation/*.yaml')).run()

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
62 changes: 62 additions & 0 deletions sdks/python/apache_beam/yaml/examples/regex_matches.yaml
@@ -0,0 +1,62 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

# This pipeline creates a series of elements with a single 'plant' field,
# matches each element against a regex, filters out entries that don't match,
# then logs the output.
pipeline:
type: chain
transforms:
- type: Create
name: Garden plants
config:
elements:
- plant: '🍓, Strawberry, perennial'
- plant: '🥕, Carrot, biennial ignoring trailing words'
- plant: '🍆, Eggplant, perennial'
- plant: '🍅, Tomato, annual'
- plant: '🥔, Potato, perennial'
- plant: '# 🍌, invalid, format'
- plant: 'invalid, 🍉, format'
- type: MapToFields
name: Parse plants
config:
language: python
fields:
plant:
callable: |
import re
def regex_filter(row):
match = re.match("(?P<icon>[^\s,]+), *(\w+), *(\w+)", row.plant)
return match.group(0) if match else match
# Filter out the None values produced by elements that don't match the regex
- type: Filter
config:
language: python
keep: plant
- type: LogForTesting

# Expected:
# Row(plant='🍓, Strawberry, perennial')
# Row(plant='🥕, Carrot, biennial')
# Row(plant='🍆, Eggplant, perennial')
# Row(plant='🍅, Tomato, annual')
# Row(plant='🥔, Potato, perennial')
@@ -0,0 +1,62 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pipeline:
type: chain
transforms:
- type: Create
name: Create produce names
config:
elements:
- season: 'spring'
produce: '🍓'
- season: 'spring'
produce: '🥕'
- season: 'summer'
produce: '🥕'
- season: 'fall'
produce: '🥕'
- season: 'spring'
produce: '🍆'
- season: 'winter'
produce: '🍆'
- season: 'spring'
produce: '🍅'
- season: 'summer'
produce: '🍅'
- season: 'fall'
produce: '🍅'
- season: 'summer'
produce: '🌽'
- type: Combine
name: Count produce per season
config:
language: python
group_by: season
combine:
produce: count
- type: LogForTesting

options:
yaml_experimental_features: Combine

# Expected:
# Row(season='spring', produce=4)
# Row(season='summer', produce=3)
# Row(season='fall', produce=2)
# Row(season='winter', produce=1)
