Skip to content

Commit

Permalink
Merge pull request apache#12156 from tvalentyn/py38_support
Browse files Browse the repository at this point in the history
[BEAM-9754] Add Py 3.8 support to Dataflow runner.
  • Loading branch information
tvalentyn authored Jul 1, 2020
2 parents a5c32f3 + 190b69d commit d6fcc1e
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 139 deletions.
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,16 @@
import sys
import warnings

if sys.version_info[0] == 2 and sys.version_info[1] == 7:
if sys.version_info.major == 2 and sys.version_info.minor == 7:
warnings.warn(
'You are using Apache Beam with Python 2. '
'New releases of Apache Beam will soon support Python 3 only.')
elif sys.version_info[0] == 3:
elif sys.version_info.major == 3:
if sys.version_info.minor >= 8:
warnings.warn(
'This version of Apache Beam has not been sufficiently tested on '
'Python %s.%s. You may encounter bugs or missing features.' %
(sys.version_info.major, sys.version_info.minor))
pass
else:
raise RuntimeError(
Expand Down
63 changes: 0 additions & 63 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,6 @@ def test_environment_override_translation(self):
capabilities=environments.python_sdk_capabilities())
])

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_remote_runner_translation(self):
remote_runner = DataflowRunner()
with Pipeline(remote_runner,
Expand All @@ -238,9 +235,6 @@ def test_remote_runner_translation(self):
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_streaming_create_translation(self):
remote_runner = DataflowRunner()
self.default_properties.append("--streaming")
Expand All @@ -256,9 +250,6 @@ def test_streaming_create_translation(self):
self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo')
self.assertEqual(job_dict[u'steps'][2][u'kind'], u'ParallelDo')

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_bigquery_read_streaming_fail(self):
remote_runner = DataflowRunner()
self.default_properties.append("--streaming")
Expand All @@ -268,9 +259,6 @@ def test_bigquery_read_streaming_fail(self):
PipelineOptions(self.default_properties)) as p:
_ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_biqquery_read_fn_api_fail(self):
remote_runner = DataflowRunner()
for flag in ['beam_fn_api', 'use_unified_worker', 'use_runner_v2']:
Expand All @@ -283,9 +271,6 @@ def test_biqquery_read_fn_api_fail(self):
PipelineOptions(self.default_properties)) as p:
_ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_remote_runner_display_data(self):
remote_runner = DataflowRunner()
p = Pipeline(
Expand Down Expand Up @@ -328,9 +313,6 @@ def test_remote_runner_display_data(self):
}]
self.assertUnhashableCountEqual(disp_data, expected_data)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_no_group_by_key_directly_after_bigquery(self):
remote_runner = DataflowRunner()
with self.assertRaises(ValueError,
Expand Down Expand Up @@ -456,9 +438,6 @@ def test_side_input_visitor(self):
common_urns.side_inputs.MULTIMAP.urn,
side_input._side_input_data().access_pattern)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_min_cpu_platform_flag_is_propagated_to_experiments(self):
remote_runner = DataflowRunner()
self.default_properties.append('--min_cpu_platform=Intel Haswell')
Expand All @@ -469,9 +448,6 @@ def test_min_cpu_platform_flag_is_propagated_to_experiments(self):
'min_cpu_platform=Intel Haswell',
remote_runner.job.options.view_as(DebugOptions).experiments)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_streaming_engine_flag_adds_windmill_experiments(self):
remote_runner = DataflowRunner()
self.default_properties.append('--streaming')
Expand All @@ -487,9 +463,6 @@ def test_streaming_engine_flag_adds_windmill_experiments(self):
self.assertIn('enable_windmill_service', experiments_for_job)
self.assertIn('some_other_experiment', experiments_for_job)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_upload_graph_experiment(self):
remote_runner = DataflowRunner()
self.default_properties.append('--experiment=upload_graph')
Expand All @@ -501,9 +474,6 @@ def test_upload_graph_experiment(self):
remote_runner.job.options.view_as(DebugOptions).experiments)
self.assertIn('upload_graph', experiments_for_job)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_dataflow_worker_jar_flag_non_fnapi_noop(self):
remote_runner = DataflowRunner()
self.default_properties.append('--experiment=some_other_experiment')
Expand All @@ -517,9 +487,6 @@ def test_dataflow_worker_jar_flag_non_fnapi_noop(self):
self.assertIn('some_other_experiment', experiments_for_job)
self.assertNotIn('use_staged_dataflow_worker_jar', experiments_for_job)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_dataflow_worker_jar_flag_adds_use_staged_worker_jar_experiment(self):
remote_runner = DataflowRunner()
self.default_properties.append('--experiment=beam_fn_api')
Expand All @@ -533,9 +500,6 @@ def test_dataflow_worker_jar_flag_adds_use_staged_worker_jar_experiment(self):
self.assertIn('beam_fn_api', experiments_for_job)
self.assertIn('use_staged_dataflow_worker_jar', experiments_for_job)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_use_fastavro_experiment_is_added_on_py3_and_onwards(self):
remote_runner = DataflowRunner()

Expand All @@ -547,9 +511,6 @@ def test_use_fastavro_experiment_is_added_on_py3_and_onwards(self):
remote_runner.job.options.view_as(DebugOptions).lookup_experiment(
'use_fastavro', False))

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_use_fastavro_experiment_is_not_added_when_use_avro_is_present(self):
remote_runner = DataflowRunner()
self.default_properties.append('--experiment=use_avro')
Expand All @@ -561,9 +522,6 @@ def test_use_fastavro_experiment_is_not_added_when_use_avro_is_present(self):

self.assertFalse(debug_options.lookup_experiment('use_fastavro', False))

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_unsupported_fnapi_features(self):
remote_runner = DataflowRunner()
self.default_properties.append('--experiment=beam_fn_api')
Expand Down Expand Up @@ -617,9 +575,6 @@ def test_get_default_gcp_region_ignores_error(
result = runner.get_default_gcp_region()
self.assertIsNone(result)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_combine_values_translation(self):
runner = DataflowRunner()

Expand Down Expand Up @@ -670,9 +625,6 @@ def expect_correct_override(self, job, step_name, step_kind):
self.assertGreater(len(step[u'properties']['display_data']), 0)
self.assertEqual(step[u'properties']['output_info'], expected_output_info)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_read_create_translation(self):
runner = DataflowRunner()

Expand All @@ -683,9 +635,6 @@ def test_read_create_translation(self):

self.expect_correct_override(runner.job, u'Create/Read', u'ParallelRead')

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_read_bigquery_translation(self):
runner = DataflowRunner()

Expand All @@ -696,9 +645,6 @@ def test_read_bigquery_translation(self):

self.expect_correct_override(runner.job, u'Read', u'ParallelRead')

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_read_pubsub_translation(self):
runner = DataflowRunner()

Expand All @@ -712,9 +658,6 @@ def test_read_pubsub_translation(self):
self.expect_correct_override(
runner.job, u'ReadFromPubSub/Read', u'ParallelRead')

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_gbk_translation(self):
runner = DataflowRunner()
with beam.Pipeline(runner=runner,
Expand Down Expand Up @@ -752,9 +695,6 @@ def test_gbk_translation(self):
self.assertEqual(
gbk_step[u'properties']['output_info'], expected_output_info)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_write_bigquery_translation(self):
runner = DataflowRunner()

Expand Down Expand Up @@ -805,9 +745,6 @@ def test_write_bigquery_translation(self):
del step_encoding[u'component_encodings'][0][u'@type']
self.assertEqual(expected_step, write_step)

@unittest.skipIf(
sys.version_info.minor == 8,
'Doesn\'t work on Python 3.8, see: BEAM-9754')
def test_write_bigquery_failed_translation(self):
"""Tests that WriteToBigQuery cannot have any consumers if replaced."""
runner = DataflowRunner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,8 @@ def get_container_image_from_options(pipeline_options):
version_suffix = '36'
elif sys.version_info[0:2] == (3, 7):
version_suffix = '37'
elif sys.version_info[0:2] == (3, 8):
version_suffix = '38'
else:
raise Exception(
'Dataflow only supports Python versions 2 and 3.5+, got: %s' %
Expand Down Expand Up @@ -1144,7 +1146,7 @@ def get_response_encoding():


def _verify_interpreter_version_is_supported(pipeline_options):
if sys.version_info[0:2] in [(2, 7), (3, 5), (3, 6), (3, 7)]:
if sys.version_info[0:2] in [(2, 7), (3, 5), (3, 6), (3, 7), (3, 8)]:
return

debug_options = pipeline_options.view_as(DebugOptions)
Expand All @@ -1154,7 +1156,7 @@ def _verify_interpreter_version_is_supported(pipeline_options):

raise Exception(
'Dataflow runner currently supports Python versions '
'2.7, 3.5, 3.6, and 3.7. To ignore this requirement and start a job '
'2.7, 3.5, 3.6, 3.7 and 3.8. To ignore this requirement and start a job '
'using a different version of Python 3 interpreter, pass '
'--experiment ignore_py3_minor_version pipeline option.')

Expand Down
Loading

0 comments on commit d6fcc1e

Please sign in to comment.