Skip to content

Commit

Permalink
Merge pull request apache#13135: [BEAM-10861] Updates Pub/Sub runner …
Browse files Browse the repository at this point in the history
…API transformation to preserve None values
  • Loading branch information
chamikaramj authored Oct 17, 2020
2 parents 488c10c + 295f2a6 commit 7c357e4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 7 deletions.
55 changes: 55 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,31 @@ def test_runner_api_transformation_with_topic(self, unused_mock_pubsub):
transform_from_proto.source.full_topic)
self.assertTrue(transform_from_proto.source.with_attributes)

def test_runner_api_transformation_properties_none(self, unused_mock_pubsub):
# Confirming that properties stay None after a runner API transformation.
source = _PubSubSource(
topic='projects/fakeprj/topics/a_topic', with_attributes=True)
transform = Read(source)

context = pipeline_context.PipelineContext()
proto_transform_spec = transform.to_runner_api(context)
self.assertEqual(
common_urns.composites.PUBSUB_READ.urn, proto_transform_spec.urn)

pubsub_read_payload = (
proto_utils.parse_Bytes(
proto_transform_spec.payload,
beam_runner_api_pb2.PubSubReadPayload))

proto_transform = beam_runner_api_pb2.PTransform(
unique_name="dummy_label", spec=proto_transform_spec)

transform_from_proto = Read.from_runner_api_parameter(
proto_transform, pubsub_read_payload, None)
self.assertIsNone(transform_from_proto.source.full_subscription)
self.assertIsNone(transform_from_proto.source.id_label)
self.assertIsNone(transform_from_proto.source.timestamp_attribute)

def test_runner_api_transformation_with_subscription(
self, unused_mock_pubsub):
source = _PubSubSource(
Expand Down Expand Up @@ -787,6 +812,36 @@ def test_runner_api_transformation(self, unused_mock_pubsub):
self.assertEqual(
'projects/fakeprj/topics/a_topic', transform_from_proto.sink.full_topic)

def test_runner_api_transformation_properties_none(self, unused_mock_pubsub):
# Confirming that properties stay None after a runner API transformation.
sink = _PubSubSink(
topic='projects/fakeprj/topics/a_topic',
id_label=None,
with_attributes=True,
# We expect encoded PubSub write transform to always return attributes.
timestamp_attribute=None)
transform = Write(sink)

context = pipeline_context.PipelineContext()
proto_transform_spec = transform.to_runner_api(context)
self.assertEqual(
common_urns.composites.PUBSUB_WRITE.urn, proto_transform_spec.urn)

pubsub_write_payload = (
proto_utils.parse_Bytes(
proto_transform_spec.payload,
beam_runner_api_pb2.PubSubWritePayload))
proto_transform = beam_runner_api_pb2.PTransform(
unique_name="dummy_label", spec=proto_transform_spec)
transform_from_proto = Write.from_runner_api_parameter(
proto_transform, pubsub_write_payload, None)

self.assertTrue(isinstance(transform_from_proto, Write))
self.assertTrue(isinstance(transform_from_proto.sink, _PubSubSink))
self.assertTrue(transform_from_proto.sink.with_attributes)
self.assertIsNone(transform_from_proto.sink.id_label)
self.assertIsNone(transform_from_proto.sink.timestamp_attribute)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -957,11 +957,11 @@ def from_runner_api_parameter(transform, payload, context):
# Importing locally to prevent circular dependencies.
from apache_beam.io.gcp.pubsub import _PubSubSource
source = _PubSubSource(
topic=payload.topic,
subscription=payload.subscription,
id_label=payload.id_attribute,
topic=payload.topic or None,
subscription=payload.subscription or None,
id_label=payload.id_attribute or None,
with_attributes=payload.with_attributes,
timestamp_attribute=payload.timestamp_attribute)
timestamp_attribute=payload.timestamp_attribute or None)
return Read(source)
else:
return Read(SourceBase.from_runner_api(payload.source, context))
Expand Down Expand Up @@ -1050,10 +1050,10 @@ def from_runner_api_parameter(ptransform, payload, unused_context):
# Importing locally to prevent circular dependencies.
from apache_beam.io.gcp.pubsub import _PubSubSink
sink = _PubSubSink(
topic=payload.topic,
id_label=payload.id_attribute,
topic=payload.topic or None,
id_label=payload.id_attribute or None,
with_attributes=payload.with_attributes,
timestamp_attribute=payload.timestamp_attribute)
timestamp_attribute=payload.timestamp_attribute or None)
return Write(sink)


Expand Down

0 comments on commit 7c357e4

Please sign in to comment.