diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py new file mode 100644 index 0000000000000..59af385844125 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py @@ -0,0 +1,51 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint: disable=line-too-long + + +def enrichment_with_bigtable(): + # [START enrichment_with_bigtable] + import apache_beam as beam + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler + + project_id = 'apache-beam-testing' + instance_id = 'beam-test' + table_id = 'bigtable-enrichment-test' + row_key = 'product_id' + + data = [ + beam.Row(sale_id=1, customer_id=1, product_id=1, quantity=1), + beam.Row(sale_id=3, customer_id=3, product_id=2, quantity=3), + beam.Row(sale_id=5, customer_id=5, product_id=4, quantity=2) + ] + + bigtable_handler = BigTableEnrichmentHandler( + project_id=project_id, + instance_id=instance_id, + table_id=table_id, + row_key=row_key) + with beam.Pipeline() as p: + _ = ( + p + | "Create" >> beam.Create(data) + | "Enrich W/ BigTable" >> Enrichment(bigtable_handler) + | "Print" >> beam.Map(print)) + # [END enrichment_with_bigtable] diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py new file mode 100644 index 0000000000000..257ce53f8e2a5 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py @@ -0,0 +1,53 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file +# pylint: disable=line-too-long + +import unittest +from io import StringIO + +import mock + +# pylint: disable=unused-import +try: + from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_bigtable + from apache_beam.io.requestresponse import RequestResponseIO +except ImportError: + raise unittest.SkipTest('RequestResponseIO dependencies are not installed') + + +def validate_enrichment_with_bigtable(): + expected = '''[START enrichment_with_bigtable] +Row(sale_id=1, customer_id=1, product_id=1, quantity=1, product={'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'}) +Row(sale_id=3, customer_id=3, product_id=2, quantity=3, product={'product_id': '2', 'product_name': 'pixel 6', 'product_stock': '4'}) +Row(sale_id=5, customer_id=5, product_id=4, quantity=2, product={'product_id': '4', 'product_name': 'pixel 8', 'product_stock': '10'}) + [END enrichment_with_bigtable]'''.splitlines()[1:-1] + return expected + + +@mock.patch('sys.stdout', new_callable=StringIO) +class EnrichmentTest(unittest.TestCase): + def test_enrichment_with_bigtable(self, mock_stdout): + enrichment_with_bigtable() + output = mock_stdout.getvalue().splitlines() + expected = validate_enrichment_with_bigtable() + self.assertEqual(output, expected) + + +if __name__ == '__main__': + unittest.main() diff --git a/website/www/site/content/en/documentation/ml/overview.md b/website/www/site/content/en/documentation/ml/overview.md index 3a49d40548e8c..772049d718cd0 100644 --- a/website/www/site/content/en/documentation/ml/overview.md +++ b/website/www/site/content/en/documentation/ml/overview.md @@ -90,7 +90,8 @@ You can use Apache Beam for data validation and preprocessing by setting up data | Task | Example | | ------- | ---------------| | I want to transform my data for preprocessing| [Preprocess data with MLTransform](/documentation/ml/preprocess-data) | -| I want to explore my data | [Data exploration workflow and example](/documentation/ml/data-processing) |: +| I want to explore my data | [Data exploration workflow and example](/documentation/ml/data-processing) | +| I want to enrich my data | [Data enrichment wth Enrichment transform](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/bigtable_enrichment_transform.ipynb) |: {{< /table >}} diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md new file mode 100644 index 0000000000000..5dfa5df04fae0 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md @@ -0,0 +1,66 @@ +--- +title: "Enrichment" +--- + + +# Enrichment transform + +{{< localstorage language language-py >}} + + + + + +
+ + {{< button-pydoc path="apache_beam.transforms" class="Enrichment" >}} + +
+ + +The enrichment transform lets you dynamically enrich data in a pipeline by doing a key-value lookup to a remote service. The transform uses [`RequestResponeIO`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.requestresponseio.html#apache_beam.io.requestresponseio.RequestResponseIO) internally. This feature uses client-side throttling to ensure that the remote service isn't overloaded with requests. If service-side errors occur, like `TooManyRequests` and `Timeout` exceptions, it retries the requests by using exponential backoff. + +In Apache Beam 2.54.0 and later versions, the transform includes a built-in enrichment handler for [Bigtable](https://cloud.google.com/bigtable/docs/overview). + +## Use Bigtable to enrich data + +The following example demonstrates how to create a pipeline that use the enrichment transform with [`BigTableEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.BigTableEnrichmentHandler). + +The data stored in the Bigtable cluster uses the following format: + +| Row key | product:product_id | product:product_name | product:product_stock | +|:---------:|:--------------------:|:----------------------:|:-----------------------:| +| 1 | 1 | pixel 5 | 2 | +| 2 | 2 | pixel 6 | 4 | +| 3 | 3 | pixel 7 | 20 | +| 4 | 4 | pixel 8 | 10 | + + +{{< highlight language="py" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_bigtable >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_bigtable >}} +{{< /highlight >}} + +## Related transforms + +Not applicable. + +{{< button-pydoc path="apache_beam.transforms" class="Enrichment" >}} \ No newline at end of file diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/runinference-pytorch.md b/website/www/site/content/en/documentation/transforms/python/elementwise/runinference-pytorch.md index f0bfae7f4cc36..258c4c82d316b 100644 --- a/website/www/site/content/en/documentation/transforms/python/elementwise/runinference-pytorch.md +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/runinference-pytorch.md @@ -27,7 +27,7 @@ limitations under the License. -The following examples demonstrate how to to create pipelines that use the Beam RunInference API and PyTorch. +The following examples demonstrate how to create pipelines that use the Beam RunInference API and PyTorch. ## Example 1: PyTorch unkeyed model diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md index 666b7c95f0803..a8982fa9798ed 100644 --- a/website/www/site/content/en/documentation/transforms/python/overview.md +++ b/website/www/site/content/en/documentation/transforms/python/overview.md @@ -21,6 +21,7 @@ limitations under the License. + diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index dcf0b857bf976..ad6af867c1b25 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -288,6 +288,7 @@ Element-wise
TransformDescription
EnrichmentPerforms data enrichment with a remote service.
FilterGiven a predicate, filter out all elements that don't satisfy the predicate.
FlatMapApplies a function that returns a collection to every element in the input and outputs all resulting elements.