Skip to content

Commit

Permalink
[Python] Log dependencies installed in submission environment (apache…
Browse files Browse the repository at this point in the history
…#28564)

* log runtime dependencies

* log submission env dependencies

* rm spare line

* update tests for staged files

* update equals test to ignore artifacts

* handle unit tests, refactor

* unit test sub env staging, convert to string

* change Log to Printf

* change log level to warning

* try env

* add artifact_service method

* correct urn

* fix artifact comparison, file reader

* add mock for python sdk dependencies and update artifact service method

* fix lint

* use magic mock instead of mocking entire function

* update dataflow runner test

* Update sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py

* use debug option to disable

* remove tmp directory mock
  • Loading branch information
riteshghorse authored and hjtran committed Apr 4, 2024
1 parent 8c41bc0 commit ff318f1
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 38 deletions.
23 changes: 16 additions & 7 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#

"""Unit tests for the DataflowRunner class."""

# pytype: skip-file

import unittest
Expand Down Expand Up @@ -206,40 +205,50 @@ def test_environment_override_translation_legacy_worker_harness_image(self):
self.default_properties.append('--experiments=beam_fn_api')
self.default_properties.append('--worker_harness_container_image=LEGACY')
remote_runner = DataflowRunner()
with Pipeline(remote_runner,
options=PipelineOptions(self.default_properties)) as p:
options = PipelineOptions(self.default_properties)
options.view_as(DebugOptions).add_experiment(
'disable_logging_submission_environment')
with Pipeline(remote_runner, options=options) as p:
( # pylint: disable=expression-not-assigned
p | ptransform.Create([1, 2, 3])
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())

self.assertEqual(
list(remote_runner.proto_pipeline.components.environments.values()),
[
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='LEGACY').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
capabilities=environments.python_sdk_docker_capabilities(),
dependencies=environments.python_sdk_dependencies(
options=options))
])

def test_environment_override_translation_sdk_container_image(self):
self.default_properties.append('--experiments=beam_fn_api')
self.default_properties.append('--sdk_container_image=FOO')
remote_runner = DataflowRunner()
with Pipeline(remote_runner,
options=PipelineOptions(self.default_properties)) as p:
options = PipelineOptions(self.default_properties)
options.view_as(DebugOptions).add_experiment(
'disable_logging_submission_environment')
with Pipeline(remote_runner, options=options) as p:
( # pylint: disable=expression-not-assigned
p | ptransform.Create([1, 2, 3])
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())

self.assertEqual(
list(remote_runner.proto_pipeline.components.environments.values()),
[
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='FOO').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
capabilities=environments.python_sdk_docker_capabilities(),
dependencies=environments.python_sdk_dependencies(
options=options))
])

def test_remote_runner_translation(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from apache_beam.portability.api import beam_expansion_api_pb2
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.runners import pipeline_context
from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability.artifact_service import BeamFilesystemHandler
from apache_beam.transforms import environments
from apache_beam.transforms import external
from apache_beam.transforms import ptransform
Expand Down Expand Up @@ -128,3 +130,8 @@ def with_pipeline(component, pcoll_id=None):
except Exception: # pylint: disable=broad-except
return beam_expansion_api_pb2.ExpansionResponse(
error=traceback.format_exc())

def artifact_service(self):
"""Returns a service to retrieve artifacts for use in a job."""
return artifact_service.ArtifactRetrievalService(
BeamFilesystemHandler(None).file_reader)
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import socket
import subprocess
import sys
import tempfile
import time
import unittest
from unittest.mock import MagicMock

import grpc

Expand Down Expand Up @@ -308,12 +310,24 @@ def create_options(self):


class PortableRunnerInternalTest(unittest.TestCase):
def setUp(self) -> None:
self.tmp_dir = tempfile.TemporaryDirectory()
self.actual_mkdtemp = tempfile.mkdtemp
tempfile.mkdtemp = MagicMock(return_value=self.tmp_dir.name)

def tearDown(self) -> None:
tempfile.mkdtemp = self.actual_mkdtemp
self.tmp_dir.cleanup()

def test__create_default_environment(self):
docker_image = environments.DockerEnvironment.default_docker_image()
self.assertEqual(
PortableRunner._create_environment(
PipelineOptions.from_dictionary({'sdk_location': 'container'})),
environments.DockerEnvironment(container_image=docker_image))
options=PipelineOptions.from_dictionary(
{'sdk_location': 'container'})),
environments.DockerEnvironment(
container_image=docker_image,
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_docker_environment(self):
docker_image = 'py-docker'
Expand All @@ -324,7 +338,9 @@ def test__create_docker_environment(self):
'environment_config': docker_image,
'sdk_location': 'container',
})),
environments.DockerEnvironment(container_image=docker_image))
environments.DockerEnvironment(
container_image=docker_image,
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_process_environment(self):
self.assertEqual(
Expand All @@ -337,15 +353,21 @@ def test__create_process_environment(self):
'sdk_location': 'container',
})),
environments.ProcessEnvironment(
'run.sh', os='linux', arch='amd64', env={'k1': 'v1'}))
'run.sh',
os='linux',
arch='amd64',
env={'k1': 'v1'},
artifacts=environments.python_sdk_dependencies(PipelineOptions())))
self.assertEqual(
PortableRunner._create_environment(
PipelineOptions.from_dictionary({
'environment_type': 'PROCESS',
'environment_config': '{"command": "run.sh"}',
'sdk_location': 'container',
})),
environments.ProcessEnvironment('run.sh'))
environments.ProcessEnvironment(
'run.sh',
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_external_environment(self):
self.assertEqual(
Expand All @@ -355,7 +377,9 @@ def test__create_external_environment(self):
'environment_config': 'localhost:50000',
'sdk_location': 'container',
})),
environments.ExternalEnvironment('localhost:50000'))
environments.ExternalEnvironment(
'localhost:50000',
artifacts=environments.python_sdk_dependencies(PipelineOptions())))
raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} '
for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()):
self.assertEqual(
Expand All @@ -366,7 +390,10 @@ def test__create_external_environment(self):
'sdk_location': 'container',
})),
environments.ExternalEnvironment(
'localhost:50000', params={"k1": "v1"}))
'localhost:50000',
params={"k1": "v1"},
artifacts=environments.python_sdk_dependencies(
PipelineOptions())))
with self.assertRaises(ValueError):
PortableRunner._create_environment(
PipelineOptions.from_dictionary({
Expand Down
54 changes: 52 additions & 2 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import logging
import os
import shutil
import subprocess
import sys
import tempfile
from importlib.metadata import distribution
Expand Down Expand Up @@ -84,6 +85,8 @@
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
# Filename that stores the submission environment dependencies.
SUBMISSION_ENV_DEPENDENCIES_FILE = 'submission_environment_dependencies.txt'
# One of the choices for user to use for requirements cache during staging
SKIP_REQUIREMENTS_CACHE = 'skip'

Expand Down Expand Up @@ -159,9 +162,10 @@ def extract_staging_tuple_iter(
def create_job_resources(options, # type: PipelineOptions
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
pypi_requirements=None, # type: Optional[List[str]]
pypi_requirements=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]]
skip_prestaged_dependencies=False, # type: Optional[bool]
skip_prestaged_dependencies=False, # type: Optional[bool]
log_submission_env_dependencies=True, # type: Optional[bool]
):
"""For internal use only; no backwards-compatibility guarantees.
Expand All @@ -183,6 +187,8 @@ def create_job_resources(options, # type: PipelineOptions
cache. Used only for testing.
skip_prestaged_dependencies: Skip staging dependencies that can be
added into SDK containers during prebuilding.
log_submission_env_dependencies: (Optional) param to stage and log
submission environment dependencies. Defaults to True.
Returns:
A list of ArtifactInformation to be used for staging resources.
Expand Down Expand Up @@ -365,6 +371,13 @@ def create_job_resources(options, # type: PipelineOptions
Stager._create_file_stage_to_artifact(
pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))

# stage the submission environment dependencies, if enabled.
if (log_submission_env_dependencies and
not options.view_as(DebugOptions).lookup_experiment(
'disable_logging_submission_environment')):
resources.extend(
Stager._create_stage_submission_env_dependencies(temp_dir))

return resources

def stage_job_resources(self,
Expand Down Expand Up @@ -850,3 +863,40 @@ def _create_beam_sdk(sdk_remote_location, temp_dir):
return [
Stager._create_file_stage_to_artifact(local_download_file, staged_name)
]

@staticmethod
def _create_stage_submission_env_dependencies(temp_dir):
"""Create and stage a file with list of dependencies installed in the
submission environment.
This list can be used at runtime to compare against the dependencies in the
runtime environment. This allows runners to warn users about any potential
dependency mismatches and help debug issues related to
environment mismatches.
Args:
temp_dir: path to temporary location where the file should be
downloaded.
Returns:
A list of ArtifactInformation of local file path that will be staged to
the staging location.
"""
try:
local_dependency_file_path = os.path.join(
temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE)
dependencies = subprocess.check_output(
[sys.executable, '-m', 'pip', 'freeze'])
local_python_path = f"Python Path: {sys.executable}\n"
with open(local_dependency_file_path, 'w') as f:
f.write(local_python_path + str(dependencies))
return [
Stager._create_file_stage_to_artifact(
local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE),
]
except Exception as e:
_LOGGER.warning(
"Couldn't stage a list of installed dependencies in "
"submission environment. Got exception: %s",
e)
return []
Loading

0 comments on commit ff318f1

Please sign in to comment.