From e441b5251d6509fc878c222177e2fa2e58bc990d Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 19 Sep 2023 16:30:06 -0400 Subject: [PATCH 01/20] log runtime dependencies --- sdks/python/container/boot.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index ded10a44204a6..ccd23db7e4968 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -400,6 +400,7 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str // folder that is mapped to the host (and therefore survives restarts). for _, f := range requirementsFiles { if err := pipInstallRequirements(ctx, logger, files, workDir, f); err != nil { + return fmt.Errorf("failed to install requirements: %v", err) } } @@ -409,7 +410,25 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil { return fmt.Errorf("failed to install workflow: %v", err) } + if err := logRuntimeDependencies(ctx, logger); err != nil { + logger.Warnf(ctx, "couldn't fetch the runtime dependencies: %v", err) + } + + return nil +} +func logRuntimeDependencies(ctx context.Context, logger *tools.Logger) error { + pythonVersion, err := expansionx.GetPythonVersion() + if err != nil { + return err + } + args := []string{"-m", "pip", "freeze"} + bufLogger := tools.NewBufferedLogger(logger) + if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { + bufLogger.FlushAtError(ctx) + } else { + bufLogger.FlushAtDebug(ctx) + } return nil } From 14974278b2d11230821e366edf77b3c1156c02d9 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 20 Sep 2023 13:43:45 -0400 Subject: [PATCH 02/20] log submission env dependencies --- .../python/apache_beam/runners/internal/names.py | 3 +++ .../apache_beam/runners/portability/stager.py | 12 ++++++++++++ sdks/python/container/boot.go | 16 ++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/sdks/python/apache_beam/runners/internal/names.py b/sdks/python/apache_beam/runners/internal/names.py index ff6010d3ad469..be7ff0b76f478 100644 --- a/sdks/python/apache_beam/runners/internal/names.py +++ b/sdks/python/apache_beam/runners/internal/names.py @@ -33,3 +33,6 @@ # SDK identifiers for different distributions BEAM_SDK_NAME = 'Apache Beam SDK for Python' + +# Filename that stores the submission environment dependencies. +SUBMISSION_ENV_DEPENDENCIES_FILENAME = 'submission_environment_dependencies.txt' diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 1f093b1d7bc3e..a70ed5c839a5d 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -55,6 +55,7 @@ import sys import tempfile from importlib.metadata import distribution +from pip._internal.operations import freeze from typing import Callable from typing import List from typing import Optional @@ -365,6 +366,17 @@ def create_job_resources(options, # type: PipelineOptions Stager._create_file_stage_to_artifact( pickled_session_file, names.PICKLED_MAIN_SESSION_FILE)) + # stage the submission environment dependencies + local_dependency_file_path = os.path.join( + temp_dir, names.SUBMISSION_ENV_DEPENDENCIES_FILENAME) + dependencies = freeze.freeze() + with open(local_dependency_file_path, 'w') as f: + f.write('\n'.join(dependencies)) + resources.append( + Stager._create_file_stage_to_artifact( + local_dependency_file_path, + names.SUBMISSION_ENV_DEPENDENCIES_FILENAME)) + return resources def stage_job_resources(self, diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index ccd23db7e4968..bfacc11a4c132 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -38,6 +38,7 @@ import ( "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" @@ -413,11 +414,26 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str if err := logRuntimeDependencies(ctx, logger); err != nil { logger.Warnf(ctx, "couldn't fetch the runtime dependencies: %v", err) } + if err := logSubmissionEnvDependencies(ctx, logger, workDir); err != nil { + logger.Warnf(ctx, "couldn't fetch the submission environment dependencies: %v", err) + } + + return nil +} +func logSubmissionEnvDependencies(ctx context.Context, logger *tools.Logger, dir string) error { + logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, "Logging submission environment dependencies:") + filename := filepath.Join(dir, "submission_environment_dependencies.txt") + content, err := os.ReadFile(filename) + if err != nil { + return err + } + logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, string(content)) return nil } func logRuntimeDependencies(ctx context.Context, logger *tools.Logger) error { + logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, "Logging runtime dependencies:") pythonVersion, err := expansionx.GetPythonVersion() if err != nil { return err From b696f6f7610a550faf3024bdcb99b9804f07a5bf Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 20 Sep 2023 13:56:33 -0400 Subject: [PATCH 03/20] rm spare line --- sdks/python/container/boot.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index bfacc11a4c132..4e8482d2f6af8 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -401,7 +401,6 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str // folder that is mapped to the host (and therefore survives restarts). for _, f := range requirementsFiles { if err := pipInstallRequirements(ctx, logger, files, workDir, f); err != nil { - return fmt.Errorf("failed to install requirements: %v", err) } } From 8cbeb61a75be8d8d96a637992707a27f53d279b6 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 20 Sep 2023 15:14:10 -0400 Subject: [PATCH 04/20] update tests for staged files --- .../apache_beam/runners/internal/names.py | 3 - .../apache_beam/runners/portability/stager.py | 9 +- .../runners/portability/stager_test.py | 94 ++++++++++++++----- 3 files changed, 74 insertions(+), 32 deletions(-) diff --git a/sdks/python/apache_beam/runners/internal/names.py b/sdks/python/apache_beam/runners/internal/names.py index be7ff0b76f478..ff6010d3ad469 100644 --- a/sdks/python/apache_beam/runners/internal/names.py +++ b/sdks/python/apache_beam/runners/internal/names.py @@ -33,6 +33,3 @@ # SDK identifiers for different distributions BEAM_SDK_NAME = 'Apache Beam SDK for Python' - -# Filename that stores the submission environment dependencies. -SUBMISSION_ENV_DEPENDENCIES_FILENAME = 'submission_environment_dependencies.txt' diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index a70ed5c839a5d..66d3b72b9ef48 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -55,7 +55,6 @@ import sys import tempfile from importlib.metadata import distribution -from pip._internal.operations import freeze from typing import Callable from typing import List from typing import Optional @@ -63,6 +62,7 @@ from urllib.parse import urlparse from packaging import version +from pip._internal.operations import freeze from apache_beam.internal import pickler from apache_beam.internal.http_client import get_new_http @@ -85,6 +85,8 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz' REQUIREMENTS_FILE = 'requirements.txt' EXTRA_PACKAGES_FILE = 'extra_packages.txt' +# Filename that stores the submission environment dependencies. +SUBMISSION_ENV_DEPENDENCIES_FILENAME = 'submission_environment_dependencies.txt' # One of the choices for user to use for requirements cache during staging SKIP_REQUIREMENTS_CACHE = 'skip' @@ -368,14 +370,13 @@ def create_job_resources(options, # type: PipelineOptions # stage the submission environment dependencies local_dependency_file_path = os.path.join( - temp_dir, names.SUBMISSION_ENV_DEPENDENCIES_FILENAME) + temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILENAME) dependencies = freeze.freeze() with open(local_dependency_file_path, 'w') as f: f.write('\n'.join(dependencies)) resources.append( Stager._create_file_stage_to_artifact( - local_dependency_file_path, - names.SUBMISSION_ENV_DEPENDENCIES_FILENAME)) + local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILENAME)) return resources diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 839f7e577733e..231b1eba0b228 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -162,7 +162,7 @@ def test_no_main_session(self): options.view_as(SetupOptions).save_main_session = False self.update_options(options) - self.assertEqual([], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -180,7 +180,10 @@ def test_with_main_session(self): options.view_as(SetupOptions).pickle_library = pickler.USE_DILL self.update_options(options) - self.assertEqual([names.PICKLED_MAIN_SESSION_FILE], + self.assertEqual([ + names.PICKLED_MAIN_SESSION_FILE, + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) self.assertTrue( @@ -199,7 +202,7 @@ def test_main_session_not_staged_when_using_cloudpickle(self): # session is saved when pickle_library==cloudpickle. options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE self.update_options(options) - self.assertEqual([], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -208,7 +211,7 @@ def test_default_resources(self): options = PipelineOptions() self.update_options(options) - self.assertEqual([], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -225,7 +228,12 @@ def test_with_requirements_file(self): self.create_temp_file( os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing') self.assertEqual( - sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']), + sorted([ + stager.REQUIREMENTS_FILE, + 'abc.txt', + 'def.txt', + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ]), sorted( self.stager.create_and_stage_job_resources( options, @@ -246,9 +254,12 @@ def test_with_pypi_requirements(self): pypi_requirements=['nothing>=1.0,<2.0'], populate_requirements_cache=self.populate_requirements_cache, staging_location=staging_dir)[1] - self.assertEqual(3, len(resources)) + self.assertEqual(4, len(resources)) self.assertTrue({'abc.txt', 'def.txt'} <= set(resources)) - generated_requirements = (set(resources) - {'abc.txt', 'def.txt'}).pop() + generated_requirements = ( + set(resources) - + {'abc.txt', 'def.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + }).pop() with open(os.path.join(staging_dir, generated_requirements)) as f: data = f.read() self.assertEqual('nothing>=1.0,<2.0', data) @@ -282,7 +293,12 @@ def test_with_requirements_file_and_cache(self): self.create_temp_file( os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing') self.assertEqual( - sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']), + sorted([ + stager.REQUIREMENTS_FILE, + 'abc.txt', + 'def.txt', + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ]), sorted( self.stager.create_and_stage_job_resources( options, @@ -313,7 +329,10 @@ def test_requirements_cache_not_populated_when_cache_disabled(self): populate_requirements_cache=self.populate_requirements_cache, staging_location=staging_dir)[1] assert not populate_requirements_cache.called - self.assertEqual([stager.REQUIREMENTS_FILE], resources) + self.assertEqual([ + stager.REQUIREMENTS_FILE, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ], + resources) self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'def.txt'))) @@ -378,7 +397,8 @@ def test_sdk_location_default(self): _, staged_resources = self.stager.create_and_stage_job_resources( options, temp_dir=self.make_temp_dir(), staging_location=staging_dir) - self.assertEqual([], staged_resources) + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], + staged_resources) def test_sdk_location_local_directory(self): staging_dir = self.make_temp_dir() @@ -391,7 +411,10 @@ def test_sdk_location_local_directory(self): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], + self.assertEqual([ + names.STAGED_SDK_SOURCES_FILENAME, + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME) @@ -409,7 +432,10 @@ def test_sdk_location_local_source_file(self): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], + self.assertEqual([ + names.STAGED_SDK_SOURCES_FILENAME, + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME) @@ -427,9 +453,10 @@ def test_sdk_location_local_wheel_file(self): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual([sdk_filename], - self.stager.create_and_stage_job_resources( - options, staging_location=staging_dir)[1]) + self.assertEqual( + [sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], + self.stager.create_and_stage_job_resources( + options, staging_location=staging_dir)[1]) tarball_path = os.path.join(staging_dir, sdk_filename) with open(tarball_path) as f: self.assertEqual(f.read(), 'Package content.') @@ -463,7 +490,10 @@ def test_sdk_location_remote_source_file(self, *unused_mocks): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], + self.assertEqual([ + names.STAGED_SDK_SOURCES_FILENAME, + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -485,9 +515,10 @@ def file_download(_, to_path): with mock.patch('apache_beam.runners.portability.stager_test' '.stager.Stager._download_file', staticmethod(file_download)): - self.assertEqual([sdk_filename], - self.stager.create_and_stage_job_resources( - options, staging_location=staging_dir)[1]) + self.assertEqual( + [sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], + self.stager.create_and_stage_job_resources( + options, staging_location=staging_dir)[1]) wheel_file_path = os.path.join(staging_dir, sdk_filename) with open(wheel_file_path) as f: @@ -509,7 +540,10 @@ def file_download(_, to_path): with mock.patch('apache_beam.runners.portability.stager_test' '.stager.Stager._download_file', staticmethod(file_download)): - self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], + self.assertEqual([ + names.STAGED_SDK_SOURCES_FILENAME, + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -551,7 +585,8 @@ def test_with_extra_packages(self): 'xyz2.tar', 'whl.whl', 'remote_file.tar.gz', - stager.EXTRA_PACKAGES_FILE + stager.EXTRA_PACKAGES_FILE, + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -659,7 +694,13 @@ def test_with_jar_packages(self): with mock.patch('apache_beam.runners.portability.stager_test' '.stager.Stager._is_remote_path', staticmethod(self.is_remote_path)): - self.assertEqual(['abc.jar', 'xyz.jar', 'ijk.jar', 'remote.jar'], + self.assertEqual([ + 'abc.jar', + 'xyz.jar', + 'ijk.jar', + 'remote.jar', + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) self.assertEqual(['/tmp/remote/remote.jar'], self.remote_copied_files) @@ -719,7 +760,8 @@ def test_populate_requirements_cache_with_bdist(self): resources = self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1] for f in resources: - if f != stager.REQUIREMENTS_FILE: + if (f != stager.REQUIREMENTS_FILE and + f != stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME): self.assertTrue(('.tar.gz' in f) or ('.whl' in f)) # requirements cache will populated only with sdists/sources @@ -744,7 +786,8 @@ def test_populate_requirements_cache_with_sdist(self): options, staging_location=staging_dir)[1] for f in resources: - if f != stager.REQUIREMENTS_FILE: + if (f != stager.REQUIREMENTS_FILE and + f != stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME): self.assertTrue('.tar.gz' in f) self.assertTrue('.whl' not in f) @@ -777,7 +820,8 @@ def test_populate_requirements_cache_with_local_files(self): stager.REQUIREMENTS_FILE, stager.EXTRA_PACKAGES_FILE, 'nothing.tar.gz', - 'local_package.tar.gz' + 'local_package.tar.gz', + stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME ]), sorted(resources)) From 1e68428a9e03fbe793ecffb1aa695c281406d71e Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 21 Sep 2023 11:53:08 -0400 Subject: [PATCH 05/20] update equals test to ignore artifacts --- sdks/python/apache_beam/transforms/environments.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 109fcb825347a..d392e6157b295 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -124,9 +124,9 @@ def __init__(self, dict(resource_hints) if resource_hints else {}) def __eq__(self, other): + # don't compare artifacts since they have different hashes in their names. return ( - self.__class__ == other.__class__ and - self._artifacts == other._artifacts + self.__class__ == other.__class__ # Assuming that we don't have instances of the same Environment subclass # with different set of capabilities. and self._resource_hints == other._resource_hints) From 284f77261aa43ca27d5e811b3b2dd520fb6f0c1f Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 22 Sep 2023 00:55:35 -0400 Subject: [PATCH 06/20] handle unit tests, refactor --- .../runners/dataflow/dataflow_runner_test.py | 32 +++++------ .../apache_beam/runners/portability/stager.py | 33 +++++++---- .../runners/portability/stager_test.py | 56 +++++++++---------- .../apache_beam/transforms/environments.py | 13 +++-- 4 files changed, 70 insertions(+), 64 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index b58531acc6a97..b9263a690a36b 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -211,15 +211,13 @@ def test_environment_override_translation_legacy_worker_harness_image(self): p | ptransform.Create([1, 2, 3]) | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - self.assertEqual( - list(remote_runner.proto_pipeline.components.environments.values()), - [ - beam_runner_api_pb2.Environment( - urn=common_urns.environments.DOCKER.urn, - payload=beam_runner_api_pb2.DockerPayload( - container_image='LEGACY').SerializeToString(), - capabilities=environments.python_sdk_docker_capabilities()) - ]) + first = remote_runner.proto_pipeline.components.environments.values() + second = beam_runner_api_pb2.Environment( + urn=common_urns.environments.DOCKER.urn, + payload=beam_runner_api_pb2.DockerPayload( + container_image='LEGACY').SerializeToString(), + capabilities=environments.python_sdk_docker_capabilities()) + self.assertTrue(first.__eq__(second)) def test_environment_override_translation_sdk_container_image(self): self.default_properties.append('--experiments=beam_fn_api') @@ -231,15 +229,13 @@ def test_environment_override_translation_sdk_container_image(self): p | ptransform.Create([1, 2, 3]) | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - self.assertEqual( - list(remote_runner.proto_pipeline.components.environments.values()), - [ - beam_runner_api_pb2.Environment( - urn=common_urns.environments.DOCKER.urn, - payload=beam_runner_api_pb2.DockerPayload( - container_image='FOO').SerializeToString(), - capabilities=environments.python_sdk_docker_capabilities()) - ]) + first = remote_runner.proto_pipeline.components.environments.values() + second = beam_runner_api_pb2.Environment( + urn=common_urns.environments.DOCKER.urn, + payload=beam_runner_api_pb2.DockerPayload( + container_image='FOO').SerializeToString(), + capabilities=environments.python_sdk_docker_capabilities()) + self.assertTrue(first.__eq__(second)) def test_remote_runner_translation(self): remote_runner = DataflowRunner() diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 66d3b72b9ef48..0dc2decb48f09 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -52,6 +52,7 @@ import logging import os import shutil +import subprocess import sys import tempfile from importlib.metadata import distribution @@ -62,7 +63,6 @@ from urllib.parse import urlparse from packaging import version -from pip._internal.operations import freeze from apache_beam.internal import pickler from apache_beam.internal.http_client import get_new_http @@ -86,7 +86,7 @@ REQUIREMENTS_FILE = 'requirements.txt' EXTRA_PACKAGES_FILE = 'extra_packages.txt' # Filename that stores the submission environment dependencies. -SUBMISSION_ENV_DEPENDENCIES_FILENAME = 'submission_environment_dependencies.txt' +SUBMISSION_ENV_DEPENDENCIES_FILE = 'submission_environment_dependencies.txt' # One of the choices for user to use for requirements cache during staging SKIP_REQUIREMENTS_CACHE = 'skip' @@ -165,6 +165,7 @@ def create_job_resources(options, # type: PipelineOptions pypi_requirements=None, # type: Optional[List[str]] populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]] skip_prestaged_dependencies=False, # type: Optional[bool] + log_submission_env_dependencies=True, # type: Optional[bool] ): """For internal use only; no backwards-compatibility guarantees. @@ -369,14 +370,9 @@ def create_job_resources(options, # type: PipelineOptions pickled_session_file, names.PICKLED_MAIN_SESSION_FILE)) # stage the submission environment dependencies - local_dependency_file_path = os.path.join( - temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILENAME) - dependencies = freeze.freeze() - with open(local_dependency_file_path, 'w') as f: - f.write('\n'.join(dependencies)) - resources.append( - Stager._create_file_stage_to_artifact( - local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILENAME)) + if log_submission_env_dependencies: + resources.extend( + Stager._create_stage_submission_env_dependencies(temp_dir)) return resources @@ -850,3 +846,20 @@ def _create_beam_sdk(sdk_remote_location, temp_dir): return [ Stager._create_file_stage_to_artifact(local_download_file, staged_name) ] + + @staticmethod + def _create_stage_submission_env_dependencies(temp_dir): + try: + local_dependency_file_path = os.path.join( + temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE) + dependencies = subprocess.check_output( + [sys.executable, '-m', 'pip', 'freeze']) + with open(local_dependency_file_path, 'w') as f: + f.write('\n'.join(dependencies)) + return [ + Stager._create_file_stage_to_artifact( + local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE) + ] + except Exception: + _LOGGER.debug("couldn't stage dependencies from submission environment") + return [] diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 231b1eba0b228..f85cb3b79335c 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -162,7 +162,7 @@ def test_no_main_session(self): options.view_as(SetupOptions).save_main_session = False self.update_options(options) - self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -182,7 +182,7 @@ def test_with_main_session(self): self.assertEqual([ names.PICKLED_MAIN_SESSION_FILE, - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -202,7 +202,7 @@ def test_main_session_not_staged_when_using_cloudpickle(self): # session is saved when pickle_library==cloudpickle. options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE self.update_options(options) - self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -211,7 +211,7 @@ def test_default_resources(self): options = PipelineOptions() self.update_options(options) - self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -232,7 +232,7 @@ def test_with_requirements_file(self): stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt', - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ]), sorted( self.stager.create_and_stage_job_resources( @@ -258,8 +258,7 @@ def test_with_pypi_requirements(self): self.assertTrue({'abc.txt', 'def.txt'} <= set(resources)) generated_requirements = ( set(resources) - - {'abc.txt', 'def.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME - }).pop() + {'abc.txt', 'def.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILE}).pop() with open(os.path.join(staging_dir, generated_requirements)) as f: data = f.read() self.assertEqual('nothing>=1.0,<2.0', data) @@ -297,7 +296,7 @@ def test_with_requirements_file_and_cache(self): stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt', - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ]), sorted( self.stager.create_and_stage_job_resources( @@ -329,10 +328,9 @@ def test_requirements_cache_not_populated_when_cache_disabled(self): populate_requirements_cache=self.populate_requirements_cache, staging_location=staging_dir)[1] assert not populate_requirements_cache.called - self.assertEqual([ - stager.REQUIREMENTS_FILE, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME - ], - resources) + self.assertEqual( + [stager.REQUIREMENTS_FILE, stager.SUBMISSION_ENV_DEPENDENCIES_FILE], + resources) self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'def.txt'))) @@ -397,7 +395,7 @@ def test_sdk_location_default(self): _, staged_resources = self.stager.create_and_stage_job_resources( options, temp_dir=self.make_temp_dir(), staging_location=staging_dir) - self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], + self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE], staged_resources) def test_sdk_location_local_directory(self): @@ -413,7 +411,7 @@ def test_sdk_location_local_directory(self): self.assertEqual([ names.STAGED_SDK_SOURCES_FILENAME, - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -434,7 +432,7 @@ def test_sdk_location_local_source_file(self): self.assertEqual([ names.STAGED_SDK_SOURCES_FILENAME, - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -453,10 +451,9 @@ def test_sdk_location_local_wheel_file(self): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual( - [sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], - self.stager.create_and_stage_job_resources( - options, staging_location=staging_dir)[1]) + self.assertEqual([sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILE], + self.stager.create_and_stage_job_resources( + options, staging_location=staging_dir)[1]) tarball_path = os.path.join(staging_dir, sdk_filename) with open(tarball_path) as f: self.assertEqual(f.read(), 'Package content.') @@ -492,7 +489,7 @@ def test_sdk_location_remote_source_file(self, *unused_mocks): self.assertEqual([ names.STAGED_SDK_SOURCES_FILENAME, - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -515,10 +512,9 @@ def file_download(_, to_path): with mock.patch('apache_beam.runners.portability.stager_test' '.stager.Stager._download_file', staticmethod(file_download)): - self.assertEqual( - [sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME], - self.stager.create_and_stage_job_resources( - options, staging_location=staging_dir)[1]) + self.assertEqual([sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILE], + self.stager.create_and_stage_job_resources( + options, staging_location=staging_dir)[1]) wheel_file_path = os.path.join(staging_dir, sdk_filename) with open(wheel_file_path) as f: @@ -542,7 +538,7 @@ def file_download(_, to_path): staticmethod(file_download)): self.assertEqual([ names.STAGED_SDK_SOURCES_FILENAME, - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -586,7 +582,7 @@ def test_with_extra_packages(self): 'whl.whl', 'remote_file.tar.gz', stager.EXTRA_PACKAGES_FILE, - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -699,7 +695,7 @@ def test_with_jar_packages(self): 'xyz.jar', 'ijk.jar', 'remote.jar', - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ], self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) @@ -761,7 +757,7 @@ def test_populate_requirements_cache_with_bdist(self): options, staging_location=staging_dir)[1] for f in resources: if (f != stager.REQUIREMENTS_FILE and - f != stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME): + f != stager.SUBMISSION_ENV_DEPENDENCIES_FILE): self.assertTrue(('.tar.gz' in f) or ('.whl' in f)) # requirements cache will populated only with sdists/sources @@ -787,7 +783,7 @@ def test_populate_requirements_cache_with_sdist(self): for f in resources: if (f != stager.REQUIREMENTS_FILE and - f != stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME): + f != stager.SUBMISSION_ENV_DEPENDENCIES_FILE): self.assertTrue('.tar.gz' in f) self.assertTrue('.whl' not in f) @@ -821,7 +817,7 @@ def test_populate_requirements_cache_with_local_files(self): stager.EXTRA_PACKAGES_FILE, 'nothing.tar.gz', 'local_package.tar.gz', - stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME + stager.SUBMISSION_ENV_DEPENDENCIES_FILE ]), sorted(resources)) diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index d392e6157b295..3d5810a95fe44 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -580,7 +580,7 @@ def from_options(cls, options): url, params=params, capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options), + artifacts=python_sdk_dependencies(options, logDependencies=False), resource_hints=resource_hints_from_options(options)) @@ -605,7 +605,7 @@ def from_options(cls, options): # type: (PortableOptions) -> EmbeddedPythonEnvironment return cls( capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options), + artifacts=python_sdk_dependencies(options, logDependencies=False), resource_hints=resource_hints_from_options(options), ) @@ -692,11 +692,11 @@ def from_options(cls, options): state_cache_size=config.get('state_cache_size'), data_buffer_time_limit_ms=config.get('data_buffer_time_limit_ms'), capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options, logDependencies=False)) else: return cls( capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options), + artifacts=python_sdk_dependencies(options, logDependencies=False), resource_hints=resource_hints_from_options(options)) @staticmethod @@ -838,7 +838,7 @@ def _python_sdk_capabilities_iter(): yield common_urns.protocols.DATA_SAMPLING.urn -def python_sdk_dependencies(options, tmp_dir=None): +def python_sdk_dependencies(options, tmp_dir=None, logDependencies=True): if tmp_dir is None: tmp_dir = tempfile.mkdtemp() skip_prestaged_dependencies = options.view_as( @@ -850,4 +850,5 @@ def python_sdk_dependencies(options, tmp_dir=None): artifact[0] + artifact[1] for artifact in PyPIArtifactRegistry.get_artifacts() ], - skip_prestaged_dependencies=skip_prestaged_dependencies) + skip_prestaged_dependencies=skip_prestaged_dependencies, + log_submission_env_dependencies=logDependencies) From 8f032517e345a2747888007ab48e16ecab5af92d Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 22 Sep 2023 10:30:31 -0400 Subject: [PATCH 07/20] unit test sub env staging, convert to string --- sdks/python/apache_beam/runners/portability/stager.py | 2 +- .../apache_beam/runners/portability/stager_test.py | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 0dc2decb48f09..a239134109c54 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -855,7 +855,7 @@ def _create_stage_submission_env_dependencies(temp_dir): dependencies = subprocess.check_output( [sys.executable, '-m', 'pip', 'freeze']) with open(local_dependency_file_path, 'w') as f: - f.write('\n'.join(dependencies)) + f.write(str(dependencies)) return [ Stager._create_file_stage_to_artifact( local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE) diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index f85cb3b79335c..25fd62b16533c 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -215,6 +215,15 @@ def test_default_resources(self): self.stager.create_and_stage_job_resources( options, staging_location=staging_dir)[1]) + def test_no_submission_env_staging(self): + staging_dir = self.make_temp_dir() + options = PipelineOptions() + self.update_options(options) + + resources = self.stager.create_job_resources( + options, staging_dir, log_submission_env_dependencies=False) + self.assertEqual([], resources) + def test_with_requirements_file(self): staging_dir = self.make_temp_dir() requirements_cache_dir = self.make_temp_dir() From 7e6e62a11dea5466ca832ec55d93607cf788def7 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 26 Sep 2023 13:43:28 -0400 Subject: [PATCH 08/20] change Log to Printf --- sdks/python/container/boot.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 4e8482d2f6af8..882106b2572d4 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -38,7 +38,6 @@ import ( "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" - "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" @@ -421,18 +420,18 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str } func logSubmissionEnvDependencies(ctx context.Context, logger *tools.Logger, dir string) error { - logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, "Logging submission environment dependencies:") + logger.Printf(ctx, "Logging submission environment dependencies:") filename := filepath.Join(dir, "submission_environment_dependencies.txt") content, err := os.ReadFile(filename) if err != nil { return err } - logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, string(content)) + logger.Printf(ctx, string(content)) return nil } func logRuntimeDependencies(ctx context.Context, logger *tools.Logger) error { - logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, "Logging runtime dependencies:") + logger.Printf(ctx, "Logging runtime dependencies:") pythonVersion, err := expansionx.GetPythonVersion() if err != nil { return err From cc24a1d82681c8ce150ecd116c2f273ca80462ea Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 3 Oct 2023 15:19:27 -0400 Subject: [PATCH 09/20] change log level to warning --- .../apache_beam/runners/portability/stager.py | 23 +++++++++++++++++-- .../apache_beam/transforms/environments.py | 4 ++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index a239134109c54..6ead577d6344d 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -849,6 +849,22 @@ def _create_beam_sdk(sdk_remote_location, temp_dir): @staticmethod def _create_stage_submission_env_dependencies(temp_dir): + """Create and stage a file with list of dependencies installed in the + submission environment. + + This staged file is used at runtime to compare the dependencies in the + runtime environment. This allows runners to warn users about any potential + dependency mismatches and help debug issues related to + environment mismatches. + + Args: + temp_dir: path to temporary location where the file should be + downloaded. + + Returns: + A list of ArtifactInformation of local file path that will be staged to + the staging location. + """ try: local_dependency_file_path = os.path.join( temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE) @@ -860,6 +876,9 @@ def _create_stage_submission_env_dependencies(temp_dir): Stager._create_file_stage_to_artifact( local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE) ] - except Exception: - _LOGGER.debug("couldn't stage dependencies from submission environment") + except Exception as e: + _LOGGER.warning( + "Couldn't stage a list of installed dependencies in " + "submission environment. Got exception: %s", + e) return [] diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 3d5810a95fe44..0238ca9138e52 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -124,9 +124,9 @@ def __init__(self, dict(resource_hints) if resource_hints else {}) def __eq__(self, other): - # don't compare artifacts since they have different hashes in their names. return ( - self.__class__ == other.__class__ + self.__class__ == other.__class__ and + self._artifacts == other._artifacts # Assuming that we don't have instances of the same Environment subclass # with different set of capabilities. and self._resource_hints == other._resource_hints) From 6b6b2227c7725560899a78fa8d9a4ced599bf95a Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 6 Dec 2023 12:05:55 -0500 Subject: [PATCH 10/20] try env --- .../apache_beam/runners/portability/stager.py | 19 +++++++++++-------- sdks/python/container/boot.go | 14 ++++++++------ 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 6ead577d6344d..dd0efbad6efde 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -866,16 +866,19 @@ def _create_stage_submission_env_dependencies(temp_dir): the staging location. """ try: - local_dependency_file_path = os.path.join( - temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE) + # local_dependency_file_path = os.path.join( + # temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE) dependencies = subprocess.check_output( [sys.executable, '-m', 'pip', 'freeze']) - with open(local_dependency_file_path, 'w') as f: - f.write(str(dependencies)) - return [ - Stager._create_file_stage_to_artifact( - local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE) - ] + + os.environ.setdefault('SUBMISSION_DEPENDENCIES', str(dependencies)) + return [] + # with open(local_dependency_file_path, 'w') as f: + # f.write(str(dependencies)) + # return [ + # Stager._create_file_stage_to_artifact( + # local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE) + # ] except Exception as e: _LOGGER.warning( "Couldn't stage a list of installed dependencies in " diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 882106b2572d4..7f62b0c96d181 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -421,12 +421,14 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str func logSubmissionEnvDependencies(ctx context.Context, logger *tools.Logger, dir string) error { logger.Printf(ctx, "Logging submission environment dependencies:") - filename := filepath.Join(dir, "submission_environment_dependencies.txt") - content, err := os.ReadFile(filename) - if err != nil { - return err - } - logger.Printf(ctx, string(content)) +// filename := filepath.Join(dir, "submission_environment_dependencies.txt") +// content, err := os.ReadFile(filename) +// if err != nil { +// return err +// } +// logger.Printf(ctx, string(content)) + dependencies = os.Getenv("SUBMISSION_DEPENDENCIES") + logger.Printf(ctx, dependencies) return nil } From 9b65385879cc41c07a20bb8499b37045b577897f Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 20 Dec 2023 10:38:52 -0500 Subject: [PATCH 11/20] add artifact_service method --- .../runners/portability/expansion_service.py | 4 ++ .../apache_beam/runners/portability/stager.py | 31 +++++------- .../apache_beam/transforms/environments.py | 13 +++-- sdks/python/container/boot.go | 48 ++++++++++--------- 4 files changed, 49 insertions(+), 47 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py index c7728098f30c5..50d727830de01 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service.py @@ -29,6 +29,7 @@ from apache_beam.portability.api import beam_expansion_api_pb2 from apache_beam.portability.api import beam_expansion_api_pb2_grpc from apache_beam.runners import pipeline_context +from apache_beam.runners.portability import artifact_service from apache_beam.transforms import environments from apache_beam.transforms import external from apache_beam.transforms import ptransform @@ -128,3 +129,6 @@ def with_pipeline(component, pcoll_id=None): except Exception: # pylint: disable=broad-except return beam_expansion_api_pb2.ExpansionResponse( error=traceback.format_exc()) + + def artifact_service(self): + return artifact_service.ArtifactRetrievalService(None) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 9ba3a81f46fe9..2c61ac931c171 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -121,7 +121,7 @@ def commit_manifest(self): @staticmethod def _create_file_stage_to_artifact(local_path, staged_name): return beam_runner_api_pb2.ArtifactInformation( - type_urn=common_urns.artifact_types.FILE.urn, + type_urn=common_urns.artifact_types.urn, type_payload=beam_runner_api_pb2.ArtifactFilePayload( path=local_path).SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, @@ -162,10 +162,9 @@ def extract_staging_tuple_iter( def create_job_resources(options, # type: PipelineOptions temp_dir, # type: str build_setup_args=None, # type: Optional[List[str]] - pypi_requirements=None, # type: Optional[List[str]] + pypi_requirements=None, # type: Optional[List[str]] populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]] - skip_prestaged_dependencies=False, # type: Optional[bool] - log_submission_env_dependencies=True, # type: Optional[bool] + skip_prestaged_dependencies=False, # type: Optional[bool] ): """For internal use only; no backwards-compatibility guarantees. @@ -370,9 +369,7 @@ def create_job_resources(options, # type: PipelineOptions pickled_session_file, names.PICKLED_MAIN_SESSION_FILE)) # stage the submission environment dependencies - if log_submission_env_dependencies: - resources.extend( - Stager._create_stage_submission_env_dependencies(temp_dir)) + resources.extend(Stager._create_stage_submission_env_dependencies(temp_dir)) return resources @@ -879,19 +876,17 @@ def _create_stage_submission_env_dependencies(temp_dir): the staging location. """ try: - # local_dependency_file_path = os.path.join( - # temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE) + local_dependency_file_path = os.path.join( + temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE) dependencies = subprocess.check_output( [sys.executable, '-m', 'pip', 'freeze']) - - os.environ.setdefault('SUBMISSION_DEPENDENCIES', str(dependencies)) - return [] - # with open(local_dependency_file_path, 'w') as f: - # f.write(str(dependencies)) - # return [ - # Stager._create_file_stage_to_artifact( - # local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE) - # ] + local_python_path = f"Python Path: {sys.executable}\n" + with open(local_dependency_file_path, 'w') as f: + f.write(local_python_path + str(dependencies)) + return [ + Stager._create_file_stage_to_artifact( + local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE), + ] except Exception as e: _LOGGER.warning( "Couldn't stage a list of installed dependencies in " diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 3966dea7abee9..b2fbe87a729ab 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -581,7 +581,7 @@ def from_options(cls, options): url, params=params, capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options, logDependencies=False), + artifacts=python_sdk_dependencies(options), resource_hints=resource_hints_from_options(options)) @@ -624,7 +624,7 @@ def from_options(cls, options): # type: (PortableOptions) -> EmbeddedPythonEnvironment return cls( capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options, logDependencies=False), + artifacts=python_sdk_dependencies(options), resource_hints=resource_hints_from_options(options), ) @@ -711,11 +711,11 @@ def from_options(cls, options): state_cache_size=config.get('state_cache_size'), data_buffer_time_limit_ms=config.get('data_buffer_time_limit_ms'), capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options, logDependencies=False)) + artifacts=python_sdk_dependencies(options)) else: return cls( capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options, logDependencies=False), + artifacts=python_sdk_dependencies(options), resource_hints=resource_hints_from_options(options)) @staticmethod @@ -896,7 +896,7 @@ def _python_sdk_capabilities_iter(): yield common_urns.protocols.DATA_SAMPLING.urn -def python_sdk_dependencies(options, tmp_dir=None, logDependencies=True): +def python_sdk_dependencies(options, tmp_dir=None): if tmp_dir is None: tmp_dir = tempfile.mkdtemp() skip_prestaged_dependencies = options.view_as( @@ -908,5 +908,4 @@ def python_sdk_dependencies(options, tmp_dir=None, logDependencies=True): artifact[0] + artifact[1] for artifact in PyPIArtifactRegistry.get_artifacts() ], - skip_prestaged_dependencies=skip_prestaged_dependencies, - log_submission_env_dependencies=logDependencies) + skip_prestaged_dependencies=skip_prestaged_dependencies) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 9cb6a51b8144b..61ebc8cc5b0c1 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -409,26 +409,16 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil { return fmt.Errorf("failed to install workflow: %v", err) } - if err := logRuntimeDependencies(ctx, logger); err != nil { + if err := logRuntimeDependencies(ctx, bufLogger); err != nil { logger.Warnf(ctx, "couldn't fetch the runtime python dependencies: %v", err) } + if err := logSubmissionEnvDependencies(ctx, bufLogger, workDir); err != nil { + logger.Warnf(ctx, "couldn't fetch the submission environment dependencies: %v", err) + } return nil } -func logSubmissionEnvDependencies(ctx context.Context, logger *tools.Logger, dir string) error { - logger.Printf(ctx, "Logging submission environment dependencies:") -// filename := filepath.Join(dir, "submission_environment_dependencies.txt") -// content, err := os.ReadFile(filename) -// if err != nil { -// return err -// } -// logger.Printf(ctx, string(content)) - dependencies = os.Getenv("SUBMISSION_DEPENDENCIES") - logger.Printf(ctx, dependencies) - return nil -} - // processArtifactsInSetupOnlyMode installs the dependencies found in artifacts // when flag --setup_only and --artifacts exist. The setup mode will only // process the provided artifacts and skip the actual worker program start up. @@ -472,20 +462,19 @@ func processArtifactsInSetupOnlyMode() { // logRuntimeDependencies logs the python dependencies // installed in the runtime environment. -func logRuntimeDependencies(ctx context.Context, logger *tools.Logger) error { - logger.Printf(ctx, "Logging runtime dependencies:") - pythonVersion, err := expansionx.GetPythonVersion() - if err != nil { - return err - } - logger.Printf(ctx, "Using Python version:") +func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger) error { + bufLogger.Printf(ctx, "Using Python version:") args := []string{"--version"} - bufLogger := tools.NewBufferedLogger(logger) if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { bufLogger.FlushAtError(ctx) } else { bufLogger.FlushAtDebug(ctx) } + bufLogger.Printf(ctx, "Logging runtime dependencies:") + pythonVersion, err := expansionx.GetPythonVersion() + if err != nil { + return err + } args = []string{"-m", "pip", "freeze"} if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { bufLogger.FlushAtError(ctx) @@ -494,3 +483,18 @@ func logRuntimeDependencies(ctx context.Context, logger *tools.Logger) error { } return nil } + +// logSubmissionEnvDependencies logs the python dependencies +// installed in the submission environment. +func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.BufferedLogger, dir string) error { + bufLogger.Printf(ctx, "Logging submission environment dependencies:") + // path for submission environment dependencies should match with the + // one defined in apache_beam/runners/portability/stager.py. + filename := filepath.Join(dir, "submission_environment_dependencies.txt") + content, err := os.ReadFile(filename) + if err != nil { + return err + } + bufLogger.Printf(ctx, string(content)) + return nil +} From 789c764894dd2b92145b5bbf28b7ca39fc727f0c Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 20 Dec 2023 10:49:49 -0500 Subject: [PATCH 12/20] correct urn --- sdks/python/apache_beam/runners/portability/stager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 2c61ac931c171..5fe049a79abd4 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -121,7 +121,7 @@ def commit_manifest(self): @staticmethod def _create_file_stage_to_artifact(local_path, staged_name): return beam_runner_api_pb2.ArtifactInformation( - type_urn=common_urns.artifact_types.urn, + type_urn=common_urns.artifact_types.FILE.urn, type_payload=beam_runner_api_pb2.ArtifactFilePayload( path=local_path).SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, From df68c1d6ccd79c95c9ec77e39f5739ca5885c7b3 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 20 Dec 2023 13:10:16 -0500 Subject: [PATCH 13/20] fix artifact comparison, file reader --- .../runners/portability/expansion_service.py | 10 ++++++++- .../portability/portable_runner_test.py | 21 ++++++++++++++----- .../apache_beam/runners/portability/stager.py | 9 ++++++-- .../apache_beam/transforms/environments.py | 12 +++++++++-- 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py index 50d727830de01..eac3bc3340d45 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service.py @@ -20,6 +20,7 @@ # pytype: skip-file import copy +import io import traceback from apache_beam import pipeline as beam_pipeline @@ -131,4 +132,11 @@ def with_pipeline(component, pcoll_id=None): error=traceback.format_exc()) def artifact_service(self): - return artifact_service.ArtifactRetrievalService(None) + return artifact_service.ArtifactRetrievalService(file_reader) + + +def file_reader(filepath: str) -> io.BytesIO: + """Reads a file at given path and returns io.BytesIO object""" + with open(filepath, 'rb') as f: + data = f.read() + return io.BytesIO(data) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 0b3704f57973a..6c43b68b158fc 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -312,8 +312,11 @@ def test__create_default_environment(self): docker_image = environments.DockerEnvironment.default_docker_image() self.assertEqual( PortableRunner._create_environment( - PipelineOptions.from_dictionary({'sdk_location': 'container'})), - environments.DockerEnvironment(container_image=docker_image)) + options=PipelineOptions.from_dictionary( + {'sdk_location': 'container'})), + environments.DockerEnvironment( + container_image=docker_image, + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) def test__create_docker_environment(self): docker_image = 'py-docker' @@ -324,7 +327,9 @@ def test__create_docker_environment(self): 'environment_config': docker_image, 'sdk_location': 'container', })), - environments.DockerEnvironment(container_image=docker_image)) + environments.DockerEnvironment( + container_image=docker_image, + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) def test__create_process_environment(self): self.assertEqual( @@ -337,7 +342,11 @@ def test__create_process_environment(self): 'sdk_location': 'container', })), environments.ProcessEnvironment( - 'run.sh', os='linux', arch='amd64', env={'k1': 'v1'})) + 'run.sh', + os='linux', + arch='amd64', + env={'k1': 'v1'}, + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) self.assertEqual( PortableRunner._create_environment( PipelineOptions.from_dictionary({ @@ -355,7 +364,9 @@ def test__create_external_environment(self): 'environment_config': 'localhost:50000', 'sdk_location': 'container', })), - environments.ExternalEnvironment('localhost:50000')) + environments.ExternalEnvironment( + 'localhost:50000', + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} ' for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()): self.assertEqual( diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 5fe049a79abd4..751e31727cbd6 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -165,6 +165,7 @@ def create_job_resources(options, # type: PipelineOptions pypi_requirements=None, # type: Optional[List[str]] populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]] skip_prestaged_dependencies=False, # type: Optional[bool] + log_submission_env_dependencies=True, # type: Optional[bool] ): """For internal use only; no backwards-compatibility guarantees. @@ -186,6 +187,8 @@ def create_job_resources(options, # type: PipelineOptions cache. Used only for testing. skip_prestaged_dependencies: Skip staging dependencies that can be added into SDK containers during prebuilding. + log_submission_env_dependencies: (Optional) param to stage and log + submission environment dependencies. Defaults to True. Returns: A list of ArtifactInformation to be used for staging resources. @@ -368,8 +371,10 @@ def create_job_resources(options, # type: PipelineOptions Stager._create_file_stage_to_artifact( pickled_session_file, names.PICKLED_MAIN_SESSION_FILE)) - # stage the submission environment dependencies - resources.extend(Stager._create_stage_submission_env_dependencies(temp_dir)) + # stage the submission environment dependencies, if enabled. + if log_submission_env_dependencies: + resources.extend( + Stager._create_stage_submission_env_dependencies(temp_dir)) return resources diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index b2fbe87a729ab..51e717ef45adf 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -125,9 +125,17 @@ def __init__(self, dict(resource_hints) if resource_hints else {}) def __eq__(self, other): + equal_artifacts = True + for first, second in zip(self._artifacts, other._artifacts): + # do not compare type payload since it contains a unique hash. + if (first.type_urn != second.type_urn or + first.role_urn != second.role_urn or + first.role_payload != second.role_payload): + equal_artifacts = False + break + return ( - self.__class__ == other.__class__ and - self._artifacts == other._artifacts + self.__class__ == other.__class__ and equal_artifacts # Assuming that we don't have instances of the same Environment subclass # with different set of capabilities. and self._resource_hints == other._resource_hints) From 4dfe7e008e189f7cda6f5f6b2d644a94b6a77a13 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 5 Feb 2024 12:06:49 -0500 Subject: [PATCH 14/20] add mock for python sdk dependencies and update artifact service method --- .../runners/portability/expansion_service.py | 13 +++------ .../portability/portable_runner_test.py | 17 +++++++++-- .../apache_beam/runners/portability/stager.py | 2 +- .../apache_beam/transforms/environments.py | 12 ++------ .../transforms/environments_test.py | 28 ++++++++++++++++++- 5 files changed, 49 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py index eac3bc3340d45..4e328b68dfc29 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service.py @@ -20,7 +20,6 @@ # pytype: skip-file import copy -import io import traceback from apache_beam import pipeline as beam_pipeline @@ -31,6 +30,7 @@ from apache_beam.portability.api import beam_expansion_api_pb2_grpc from apache_beam.runners import pipeline_context from apache_beam.runners.portability import artifact_service +from apache_beam.runners.portability.artifact_service import BeamFilesystemHandler from apache_beam.transforms import environments from apache_beam.transforms import external from apache_beam.transforms import ptransform @@ -132,11 +132,6 @@ def with_pipeline(component, pcoll_id=None): error=traceback.format_exc()) def artifact_service(self): - return artifact_service.ArtifactRetrievalService(file_reader) - - -def file_reader(filepath: str) -> io.BytesIO: - """Reads a file at given path and returns io.BytesIO object""" - with open(filepath, 'rb') as f: - data = f.read() - return io.BytesIO(data) + """Returns a service to retrieve artifacts for use in a job.""" + return artifact_service.ArtifactRetrievalService( + BeamFilesystemHandler(None).file_reader) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 6c43b68b158fc..3615fce695e71 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -44,6 +44,7 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms import environments from apache_beam.transforms import userstate +from apache_beam.transforms.environments_test import mock_python_sdk_dependencies _LOGGER = logging.getLogger(__name__) @@ -308,6 +309,13 @@ def create_options(self): class PortableRunnerInternalTest(unittest.TestCase): + def setUp(self) -> None: + self.actual_python_sdk_dependencies = environments.python_sdk_dependencies + environments.python_sdk_dependencies = mock_python_sdk_dependencies + + def tearDown(self) -> None: + environments.python_sdk_dependencies = self.actual_python_sdk_dependencies + def test__create_default_environment(self): docker_image = environments.DockerEnvironment.default_docker_image() self.assertEqual( @@ -354,7 +362,9 @@ def test__create_process_environment(self): 'environment_config': '{"command": "run.sh"}', 'sdk_location': 'container', })), - environments.ProcessEnvironment('run.sh')) + environments.ProcessEnvironment( + 'run.sh', + artifacts=environments.python_sdk_dependencies(PipelineOptions()))) def test__create_external_environment(self): self.assertEqual( @@ -377,7 +387,10 @@ def test__create_external_environment(self): 'sdk_location': 'container', })), environments.ExternalEnvironment( - 'localhost:50000', params={"k1": "v1"})) + 'localhost:50000', + params={"k1": "v1"}, + artifacts=environments.python_sdk_dependencies( + PipelineOptions()))) with self.assertRaises(ValueError): PortableRunner._create_environment( PipelineOptions.from_dictionary({ diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 751e31727cbd6..44eb6880d7cac 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -867,7 +867,7 @@ def _create_stage_submission_env_dependencies(temp_dir): """Create and stage a file with list of dependencies installed in the submission environment. - This staged file is used at runtime to compare the dependencies in the + This list can be used at runtime to compare against the dependencies in the runtime environment. This allows runners to warn users about any potential dependency mismatches and help debug issues related to environment mismatches. diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 51e717ef45adf..b2fbe87a729ab 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -125,17 +125,9 @@ def __init__(self, dict(resource_hints) if resource_hints else {}) def __eq__(self, other): - equal_artifacts = True - for first, second in zip(self._artifacts, other._artifacts): - # do not compare type payload since it contains a unique hash. - if (first.type_urn != second.type_urn or - first.role_urn != second.role_urn or - first.role_payload != second.role_payload): - equal_artifacts = False - break - return ( - self.__class__ == other.__class__ and equal_artifacts + self.__class__ == other.__class__ and + self._artifacts == other._artifacts # Assuming that we don't have instances of the same Environment subclass # with different set of capabilities. and self._resource_hints == other._resource_hints) diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py index 25885553b2ff5..c3aacb512a26f 100644 --- a/sdks/python/apache_beam/transforms/environments_test.py +++ b/sdks/python/apache_beam/transforms/environments_test.py @@ -21,13 +21,16 @@ # pytype: skip-file import logging +import tempfile import unittest from apache_beam.options.pipeline_options import PortableOptions +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.portability import common_urns from apache_beam.runners import pipeline_context +from apache_beam.runners.portability import stager from apache_beam.transforms import environments -from apache_beam.transforms.environments import DockerEnvironment +from apache_beam.transforms.environments import DockerEnvironment, PyPIArtifactRegistry from apache_beam.transforms.environments import EmbeddedPythonEnvironment from apache_beam.transforms.environments import EmbeddedPythonGrpcEnvironment from apache_beam.transforms.environments import Environment @@ -35,6 +38,22 @@ from apache_beam.transforms.environments import ProcessEnvironment from apache_beam.transforms.environments import SubprocessSDKEnvironment +# create a temp directory so that all artifacts are put in the same directory. +tmp_dir = tempfile.mkdtemp() + + +def mock_python_sdk_dependencies(options, tmp_dir=tmp_dir): + skip_prestaged_dependencies = options.view_as( + SetupOptions).prebuild_sdk_container_engine is not None + return stager.Stager.create_job_resources( + options, + tmp_dir, + pypi_requirements=[ + artifact[0] + artifact[1] + for artifact in PyPIArtifactRegistry.get_artifacts() + ], + skip_prestaged_dependencies=skip_prestaged_dependencies) + class RunnerApiTest(unittest.TestCase): def test_environment_encoding(self): @@ -82,6 +101,13 @@ def test_default_capabilities(self): class EnvironmentOptionsTest(unittest.TestCase): + def setUp(self) -> None: + self.actual_python_sdk_dependencies = environments.python_sdk_dependencies + environments.python_sdk_dependencies = mock_python_sdk_dependencies + + def tearDown(self) -> None: + environments.python_sdk_dependencies = self.actual_python_sdk_dependencies + def test_process_variables_empty(self): options = PortableOptions([ '--environment_type=PROCESS', From b7d62bc69585ee87f5a8c18092e40639359e5020 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 5 Feb 2024 12:49:23 -0500 Subject: [PATCH 15/20] fix lint --- sdks/python/apache_beam/transforms/environments_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py index c3aacb512a26f..364fc4a8dd2d4 100644 --- a/sdks/python/apache_beam/transforms/environments_test.py +++ b/sdks/python/apache_beam/transforms/environments_test.py @@ -30,12 +30,13 @@ from apache_beam.runners import pipeline_context from apache_beam.runners.portability import stager from apache_beam.transforms import environments -from apache_beam.transforms.environments import DockerEnvironment, PyPIArtifactRegistry +from apache_beam.transforms.environments import DockerEnvironment from apache_beam.transforms.environments import EmbeddedPythonEnvironment from apache_beam.transforms.environments import EmbeddedPythonGrpcEnvironment from apache_beam.transforms.environments import Environment from apache_beam.transforms.environments import ExternalEnvironment from apache_beam.transforms.environments import ProcessEnvironment +from apache_beam.transforms.environments import PyPIArtifactRegistry from apache_beam.transforms.environments import SubprocessSDKEnvironment # create a temp directory so that all artifacts are put in the same directory. From 404286ba01ea79e7d1e2fdf9a8b856482255b632 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 9 Feb 2024 09:40:37 -0500 Subject: [PATCH 16/20] use magic mock instead of mocking entire function --- .../portability/portable_runner_test.py | 11 +++++--- .../transforms/environments_test.py | 28 ++++--------------- 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 3615fce695e71..85d1607e9fa19 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -21,8 +21,10 @@ import socket import subprocess import sys +import tempfile import time import unittest +from unittest.mock import MagicMock import grpc @@ -44,7 +46,6 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms import environments from apache_beam.transforms import userstate -from apache_beam.transforms.environments_test import mock_python_sdk_dependencies _LOGGER = logging.getLogger(__name__) @@ -310,11 +311,13 @@ def create_options(self): class PortableRunnerInternalTest(unittest.TestCase): def setUp(self) -> None: - self.actual_python_sdk_dependencies = environments.python_sdk_dependencies - environments.python_sdk_dependencies = mock_python_sdk_dependencies + self.tmp_dir = tempfile.TemporaryDirectory() + self.actual_mkdtemp = tempfile.mkdtemp + tempfile.mkdtemp = MagicMock(return_value=self.tmp_dir.name) def tearDown(self) -> None: - environments.python_sdk_dependencies = self.actual_python_sdk_dependencies + tempfile.mkdtemp = self.actual_mkdtemp + self.tmp_dir.cleanup() def test__create_default_environment(self): docker_image = environments.DockerEnvironment.default_docker_image() diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py index 364fc4a8dd2d4..c32a85579fcb1 100644 --- a/sdks/python/apache_beam/transforms/environments_test.py +++ b/sdks/python/apache_beam/transforms/environments_test.py @@ -23,12 +23,11 @@ import logging import tempfile import unittest +from unittest.mock import MagicMock from apache_beam.options.pipeline_options import PortableOptions -from apache_beam.options.pipeline_options import SetupOptions from apache_beam.portability import common_urns from apache_beam.runners import pipeline_context -from apache_beam.runners.portability import stager from apache_beam.transforms import environments from apache_beam.transforms.environments import DockerEnvironment from apache_beam.transforms.environments import EmbeddedPythonEnvironment @@ -36,25 +35,8 @@ from apache_beam.transforms.environments import Environment from apache_beam.transforms.environments import ExternalEnvironment from apache_beam.transforms.environments import ProcessEnvironment -from apache_beam.transforms.environments import PyPIArtifactRegistry from apache_beam.transforms.environments import SubprocessSDKEnvironment -# create a temp directory so that all artifacts are put in the same directory. -tmp_dir = tempfile.mkdtemp() - - -def mock_python_sdk_dependencies(options, tmp_dir=tmp_dir): - skip_prestaged_dependencies = options.view_as( - SetupOptions).prebuild_sdk_container_engine is not None - return stager.Stager.create_job_resources( - options, - tmp_dir, - pypi_requirements=[ - artifact[0] + artifact[1] - for artifact in PyPIArtifactRegistry.get_artifacts() - ], - skip_prestaged_dependencies=skip_prestaged_dependencies) - class RunnerApiTest(unittest.TestCase): def test_environment_encoding(self): @@ -103,11 +85,13 @@ def test_default_capabilities(self): class EnvironmentOptionsTest(unittest.TestCase): def setUp(self) -> None: - self.actual_python_sdk_dependencies = environments.python_sdk_dependencies - environments.python_sdk_dependencies = mock_python_sdk_dependencies + self.tmp_dir = tempfile.TemporaryDirectory() + self.actual_mkdtemp = tempfile.mkdtemp + tempfile.mkdtemp = MagicMock(return_value=self.tmp_dir.name) def tearDown(self) -> None: - environments.python_sdk_dependencies = self.actual_python_sdk_dependencies + tempfile.mkdtemp = self.actual_mkdtemp + self.tmp_dir.cleanup() def test_process_variables_empty(self): options = PortableOptions([ From da2b9541e3e0c3763953ed1ae03d5661a870b06c Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 15 Feb 2024 15:31:18 -0500 Subject: [PATCH 17/20] update dataflow runner test --- .../runners/dataflow/dataflow_runner_test.py | 66 ++++++++++++++----- 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index d765c58d2e1a3..8b90bb024d697 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -16,10 +16,13 @@ # """Unit tests for the DataflowRunner class.""" - # pytype: skip-file +import os + +import tempfile import unittest +from unittest.mock import MagicMock import mock @@ -96,6 +99,13 @@ def setUp(self): '--dry_run=True', '--sdk_location=container' ] + self.tmp_dir = tempfile.TemporaryDirectory() + self.actual_mkdtemp = tempfile.mkdtemp + tempfile.mkdtemp = MagicMock(return_value=self.tmp_dir.name) + + def tearDown(self) -> None: + tempfile.mkdtemp = self.actual_mkdtemp + self.tmp_dir.cleanup() @mock.patch('time.sleep', return_value=None) def test_wait_until_finish(self, patched_time_sleep): @@ -205,19 +215,30 @@ def test_environment_override_translation_legacy_worker_harness_image(self): self.default_properties.append('--experiments=beam_fn_api') self.default_properties.append('--worker_harness_container_image=LEGACY') remote_runner = DataflowRunner() - with Pipeline(remote_runner, - options=PipelineOptions(self.default_properties)) as p: + options = PipelineOptions(self.default_properties) + with Pipeline(remote_runner, options=options) as p: ( # pylint: disable=expression-not-assigned p | ptransform.Create([1, 2, 3]) | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - first = remote_runner.proto_pipeline.components.environments.values() - second = beam_runner_api_pb2.Environment( - urn=common_urns.environments.DOCKER.urn, - payload=beam_runner_api_pb2.DockerPayload( - container_image='LEGACY').SerializeToString(), - capabilities=environments.python_sdk_docker_capabilities()) - self.assertTrue(first.__eq__(second)) + + # check if the temp path got cleaned after the above + # pipeline completion. If it does not exist then create the same + # temp directory again to compare artifact information. + if not os.path.exists(self.tmp_dir.name): + os.makedirs(self.tmp_dir.name) + + self.assertEqual( + list(remote_runner.proto_pipeline.components.environments.values()), + [ + beam_runner_api_pb2.Environment( + urn=common_urns.environments.DOCKER.urn, + payload=beam_runner_api_pb2.DockerPayload( + container_image='LEGACY').SerializeToString(), + capabilities=environments.python_sdk_docker_capabilities(), + dependencies=environments.python_sdk_dependencies( + PipelineOptions())) + ]) def test_environment_override_translation_sdk_container_image(self): self.default_properties.append('--experiments=beam_fn_api') @@ -229,13 +250,24 @@ def test_environment_override_translation_sdk_container_image(self): p | ptransform.Create([1, 2, 3]) | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - first = remote_runner.proto_pipeline.components.environments.values() - second = beam_runner_api_pb2.Environment( - urn=common_urns.environments.DOCKER.urn, - payload=beam_runner_api_pb2.DockerPayload( - container_image='FOO').SerializeToString(), - capabilities=environments.python_sdk_docker_capabilities()) - self.assertTrue(first.__eq__(second)) + + # check if the temp path got cleaned after the above + # pipeline completion. If it does not exist then create the same + # temp directory again to compare artifact information. + if not os.path.exists(self.tmp_dir.name): + os.makedirs(self.tmp_dir.name) + + self.assertEqual( + list(remote_runner.proto_pipeline.components.environments.values()), + [ + beam_runner_api_pb2.Environment( + urn=common_urns.environments.DOCKER.urn, + payload=beam_runner_api_pb2.DockerPayload( + container_image='FOO').SerializeToString(), + capabilities=environments.python_sdk_docker_capabilities(), + dependencies=environments.python_sdk_dependencies( + PipelineOptions())) + ]) def test_remote_runner_translation(self): remote_runner = DataflowRunner() From eeedc444079cc436d7607a691f3be022c43f54c3 Mon Sep 17 00:00:00 2001 From: Ritesh Ghorse Date: Thu, 15 Feb 2024 16:11:23 -0500 Subject: [PATCH 18/20] Update sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py --- .../python/apache_beam/runners/dataflow/dataflow_runner_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 8b90bb024d697..28a8bd45c5d4c 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -18,9 +18,7 @@ """Unit tests for the DataflowRunner class.""" # pytype: skip-file import os - import tempfile - import unittest from unittest.mock import MagicMock From c527114e980d7978c1d1f6232e9433d9c4ff486a Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 7 Mar 2024 13:16:31 -0500 Subject: [PATCH 19/20] use debug option to disable --- .../runners/dataflow/dataflow_runner_test.py | 25 ++++++------------- .../apache_beam/runners/portability/stager.py | 4 ++- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 8b90bb024d697..e449bb26784e1 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -17,7 +17,6 @@ """Unit tests for the DataflowRunner class.""" # pytype: skip-file -import os import tempfile @@ -216,18 +215,14 @@ def test_environment_override_translation_legacy_worker_harness_image(self): self.default_properties.append('--worker_harness_container_image=LEGACY') remote_runner = DataflowRunner() options = PipelineOptions(self.default_properties) + options.view_as(DebugOptions).add_experiment( + 'disable_logging_submission_environment') with Pipeline(remote_runner, options=options) as p: ( # pylint: disable=expression-not-assigned p | ptransform.Create([1, 2, 3]) | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - # check if the temp path got cleaned after the above - # pipeline completion. If it does not exist then create the same - # temp directory again to compare artifact information. - if not os.path.exists(self.tmp_dir.name): - os.makedirs(self.tmp_dir.name) - self.assertEqual( list(remote_runner.proto_pipeline.components.environments.values()), [ @@ -237,26 +232,22 @@ def test_environment_override_translation_legacy_worker_harness_image(self): container_image='LEGACY').SerializeToString(), capabilities=environments.python_sdk_docker_capabilities(), dependencies=environments.python_sdk_dependencies( - PipelineOptions())) + options=options)) ]) def test_environment_override_translation_sdk_container_image(self): self.default_properties.append('--experiments=beam_fn_api') self.default_properties.append('--sdk_container_image=FOO') remote_runner = DataflowRunner() - with Pipeline(remote_runner, - options=PipelineOptions(self.default_properties)) as p: + options = PipelineOptions(self.default_properties) + options.view_as(DebugOptions).add_experiment( + 'disable_logging_submission_environment') + with Pipeline(remote_runner, options=options) as p: ( # pylint: disable=expression-not-assigned p | ptransform.Create([1, 2, 3]) | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - # check if the temp path got cleaned after the above - # pipeline completion. If it does not exist then create the same - # temp directory again to compare artifact information. - if not os.path.exists(self.tmp_dir.name): - os.makedirs(self.tmp_dir.name) - self.assertEqual( list(remote_runner.proto_pipeline.components.environments.values()), [ @@ -266,7 +257,7 @@ def test_environment_override_translation_sdk_container_image(self): container_image='FOO').SerializeToString(), capabilities=environments.python_sdk_docker_capabilities(), dependencies=environments.python_sdk_dependencies( - PipelineOptions())) + options=options)) ]) def test_remote_runner_translation(self): diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 44eb6880d7cac..f6207d80a9d1e 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -372,7 +372,9 @@ def create_job_resources(options, # type: PipelineOptions pickled_session_file, names.PICKLED_MAIN_SESSION_FILE)) # stage the submission environment dependencies, if enabled. - if log_submission_env_dependencies: + if (log_submission_env_dependencies and + not options.view_as(DebugOptions).lookup_experiment( + 'disable_logging_submission_environment')): resources.extend( Stager._create_stage_submission_env_dependencies(temp_dir)) From 80d6d18bc4b1c7be4b1206d7369e6385947c616a Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 7 Mar 2024 13:21:08 -0500 Subject: [PATCH 20/20] remove tmp directory mock --- .../apache_beam/runners/dataflow/dataflow_runner_test.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 49736fdb355fd..bef184c45c4fd 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -18,9 +18,7 @@ """Unit tests for the DataflowRunner class.""" # pytype: skip-file -import tempfile import unittest -from unittest.mock import MagicMock import mock @@ -97,13 +95,6 @@ def setUp(self): '--dry_run=True', '--sdk_location=container' ] - self.tmp_dir = tempfile.TemporaryDirectory() - self.actual_mkdtemp = tempfile.mkdtemp - tempfile.mkdtemp = MagicMock(return_value=self.tmp_dir.name) - - def tearDown(self) -> None: - tempfile.mkdtemp = self.actual_mkdtemp - self.tmp_dir.cleanup() @mock.patch('time.sleep', return_value=None) def test_wait_until_finish(self, patched_time_sleep):