Skip to content

Commit

Permalink
add mock for python sdk dependencies and update artifact service method
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse committed Feb 5, 2024
1 parent df68c1d commit 4dfe7e0
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 23 deletions.
13 changes: 4 additions & 9 deletions sdks/python/apache_beam/runners/portability/expansion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
# pytype: skip-file

import copy
import io
import traceback

from apache_beam import pipeline as beam_pipeline
Expand All @@ -31,6 +30,7 @@
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.runners import pipeline_context
from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability.artifact_service import BeamFilesystemHandler
from apache_beam.transforms import environments
from apache_beam.transforms import external
from apache_beam.transforms import ptransform
Expand Down Expand Up @@ -132,11 +132,6 @@ def with_pipeline(component, pcoll_id=None):
error=traceback.format_exc())

def artifact_service(self):
return artifact_service.ArtifactRetrievalService(file_reader)


def file_reader(filepath: str) -> io.BytesIO:
"""Reads a file at given path and returns io.BytesIO object"""
with open(filepath, 'rb') as f:
data = f.read()
return io.BytesIO(data)
"""Returns a service to retrieve artifacts for use in a job."""
return artifact_service.ArtifactRetrievalService(
BeamFilesystemHandler(None).file_reader)
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.testing.util import equal_to
from apache_beam.transforms import environments
from apache_beam.transforms import userstate
from apache_beam.transforms.environments_test import mock_python_sdk_dependencies

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -308,6 +309,13 @@ def create_options(self):


class PortableRunnerInternalTest(unittest.TestCase):
def setUp(self) -> None:
  """Swap the mock dependency resolver into ``environments`` for each test.

  The real ``environments.python_sdk_dependencies`` is kept on the test
  instance so that ``tearDown`` can put it back.
  """
  real_resolver = environments.python_sdk_dependencies
  self.actual_python_sdk_dependencies = real_resolver
  environments.python_sdk_dependencies = mock_python_sdk_dependencies

def tearDown(self) -> None:
  """Restore the ``python_sdk_dependencies`` function saved by ``setUp``."""
  saved_resolver = self.actual_python_sdk_dependencies
  environments.python_sdk_dependencies = saved_resolver

def test__create_default_environment(self):
docker_image = environments.DockerEnvironment.default_docker_image()
self.assertEqual(
Expand Down Expand Up @@ -354,7 +362,9 @@ def test__create_process_environment(self):
'environment_config': '{"command": "run.sh"}',
'sdk_location': 'container',
})),
environments.ProcessEnvironment('run.sh'))
environments.ProcessEnvironment(
'run.sh',
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_external_environment(self):
self.assertEqual(
Expand All @@ -377,7 +387,10 @@ def test__create_external_environment(self):
'sdk_location': 'container',
})),
environments.ExternalEnvironment(
'localhost:50000', params={"k1": "v1"}))
'localhost:50000',
params={"k1": "v1"},
artifacts=environments.python_sdk_dependencies(
PipelineOptions())))
with self.assertRaises(ValueError):
PortableRunner._create_environment(
PipelineOptions.from_dictionary({
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ def _create_stage_submission_env_dependencies(temp_dir):
"""Create and stage a file with a list of dependencies installed in the
submission environment.
This staged file is used at runtime to compare the dependencies in the
This list can be used at runtime to compare against the dependencies in the
runtime environment. This allows runners to warn users about any potential
dependency mismatches and help debug issues related to
environment mismatches.
Expand Down
12 changes: 2 additions & 10 deletions sdks/python/apache_beam/transforms/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,9 @@ def __init__(self,
dict(resource_hints) if resource_hints else {})

def __eq__(self, other):
equal_artifacts = True
for first, second in zip(self._artifacts, other._artifacts):
# do not compare type payload since it contains a unique hash.
if (first.type_urn != second.type_urn or
first.role_urn != second.role_urn or
first.role_payload != second.role_payload):
equal_artifacts = False
break

return (
self.__class__ == other.__class__ and equal_artifacts
self.__class__ == other.__class__ and
self._artifacts == other._artifacts
# Assuming that we don't have instances of the same Environment subclass
# with different set of capabilities.
and self._resource_hints == other._resource_hints)
Expand Down
28 changes: 27 additions & 1 deletion sdks/python/apache_beam/transforms/environments_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,39 @@
# pytype: skip-file

import logging
import tempfile
import unittest

from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.portability import common_urns
from apache_beam.runners import pipeline_context
from apache_beam.runners.portability import stager
from apache_beam.transforms import environments
from apache_beam.transforms.environments import DockerEnvironment
from apache_beam.transforms.environments import DockerEnvironment, PyPIArtifactRegistry
from apache_beam.transforms.environments import EmbeddedPythonEnvironment
from apache_beam.transforms.environments import EmbeddedPythonGrpcEnvironment
from apache_beam.transforms.environments import Environment
from apache_beam.transforms.environments import ExternalEnvironment
from apache_beam.transforms.environments import ProcessEnvironment
from apache_beam.transforms.environments import SubprocessSDKEnvironment

# Create a single temp directory at import time so that every call to
# mock_python_sdk_dependencies puts its artifacts in the same directory by
# default.
# NOTE(review): nothing visible here removes this directory afterwards;
# confirm whether cleanup is needed.
tmp_dir = tempfile.mkdtemp()


def mock_python_sdk_dependencies(options, tmp_dir=tmp_dir):
  """Test replacement for ``environments.python_sdk_dependencies``.

  Builds the job resources through ``stager.Stager.create_job_resources``,
  always staging into ``tmp_dir`` rather than a fresh location per call.

  Args:
    options: pipeline options; viewed as ``SetupOptions`` to read
      ``prebuild_sdk_container_engine``.
    tmp_dir: directory handed to the stager. Defaults to the module-level
      temp directory captured when this function is defined.

  Returns:
    Whatever ``stager.Stager.create_job_resources`` returns for the given
    options and the PyPI requirements registered in ``PyPIArtifactRegistry``.
  """
  setup_options = options.view_as(SetupOptions)
  # Prestaged dependencies are skipped whenever a prebuild engine is set.
  skip_prestaged = setup_options.prebuild_sdk_container_engine is not None

  # Each registered artifact contributes one requirement string, formed by
  # concatenating its first two fields.
  requirements = []
  for entry in PyPIArtifactRegistry.get_artifacts():
    requirements.append(entry[0] + entry[1])

  return stager.Stager.create_job_resources(
      options,
      tmp_dir,
      pypi_requirements=requirements,
      skip_prestaged_dependencies=skip_prestaged)


class RunnerApiTest(unittest.TestCase):
def test_environment_encoding(self):
Expand Down Expand Up @@ -82,6 +101,13 @@ def test_default_capabilities(self):


class EnvironmentOptionsTest(unittest.TestCase):
def setUp(self) -> None:
  """Replace ``environments.python_sdk_dependencies`` with the mock.

  The original function is stored on the test instance so that ``tearDown``
  can reinstate it after the test.
  """
  real_resolver = environments.python_sdk_dependencies
  self.actual_python_sdk_dependencies = real_resolver
  environments.python_sdk_dependencies = mock_python_sdk_dependencies

def tearDown(self) -> None:
  """Reinstate the real ``python_sdk_dependencies`` captured in ``setUp``."""
  saved_resolver = self.actual_python_sdk_dependencies
  environments.python_sdk_dependencies = saved_resolver

def test_process_variables_empty(self):
options = PortableOptions([
'--environment_type=PROCESS',
Expand Down

0 comments on commit 4dfe7e0

Please sign in to comment.