Skip to content

Commit

Permalink
Require --sdk_location for Dataflow pipelines running with dev SDKs. (#…
Browse files: browse the repository at this point in the history
  • Loading branch information
tvalentyn authored Sep 27, 2023
1 parent f35a413 commit ced3de3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,16 @@ def _check_and_add_missing_options(options):
elif debug_options.lookup_experiment('enable_prime'):
dataflow_service_options.append('enable_prime')

sdk_location = options.view_as(SetupOptions).sdk_location
if 'dev' in beam.version.__version__ and sdk_location == 'default':
raise ValueError(
"When launching Dataflow Jobs with an unreleased SDK, "
"please provide an SDK distribution in the --sdk_location option "
"to use consistent SDK version at "
"pipeline submission and runtime. To ignore this error and use the "
"SDK installed in Dataflow dev containers, use "
"--sdk_location=container.")

# Streaming only supports using runner v2 (aka unified worker).
# Runner v2 only supports using streaming engine (aka windmill service).
if options.view_as(StandardOptions).streaming:
Expand Down
17 changes: 12 additions & 5 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ def expand(self, pcoll):
self.assertIn(packed_step_name, transform_names)

def test_batch_is_runner_v2(self):
options = PipelineOptions()
options = PipelineOptions(['--sdk_location=container'])
_check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
Expand All @@ -512,7 +512,7 @@ def test_batch_is_runner_v2(self):
expected)

def test_streaming_is_runner_v2(self):
options = PipelineOptions(['--streaming'])
options = PipelineOptions(['--sdk_location=container', '--streaming'])
_check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
Expand All @@ -525,7 +525,11 @@ def test_streaming_is_runner_v2(self):
expected)

def test_dataflow_service_options_enable_prime_sets_runner_v2(self):
options = PipelineOptions(['--dataflow_service_options=enable_prime'])
options = PipelineOptions([
'--sdk_location=container',
'--streaming',
'--dataflow_service_options=enable_prime'
])
_check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
Expand All @@ -535,8 +539,11 @@ def test_dataflow_service_options_enable_prime_sets_runner_v2(self):
options.view_as(DebugOptions).lookup_experiment(expected, False),
expected)

options = PipelineOptions(
['--streaming', '--dataflow_service_options=enable_prime'])
options = PipelineOptions([
'--sdk_location=container',
'--streaming',
'--dataflow_service_options=enable_prime'
])
_check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
Expand Down
8 changes: 3 additions & 5 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,9 @@ def _create_extra_packages(extra_packages, temp_dir):
'".tar", ".tar.gz", ".whl" or ".zip" instead of %s' % package)
if os.path.basename(package).endswith('.whl'):
_LOGGER.warning(
'The .whl package "%s" is provided in --extra_package. '
'This functionality is not officially supported. Since wheel '
'packages are binary distributions, this package must be '
'binary-compatible with the worker environment (e.g. Python 2.7 '
'running on an x64 Linux host).' % package)
'The .whl package "%s" provided in --extra_package '
'must be binary-compatible with the worker runtime environment.' %
package)

if not os.path.isfile(package):
if Stager._is_remote_path(package):
Expand Down

0 comments on commit ced3de3

Please sign in to comment.