Skip to content

Commit

Permalink
add cloud_pubsub_dead_letter_ack, check topic before send
Browse files Browse the repository at this point in the history
  • Loading branch information
roticagas committed Jan 11, 2019
1 parent 92a35af commit b9e9cdb
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
19 changes: 10 additions & 9 deletions PubSubRunner/runner_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def verify_config(self):
logging.warning('publish topic are not set, check your config carefully')
if self.config.cloud_pubsub_dead_letter_topic == '':
logging.warning('dead letter topic are not set, check your config carefully')
assert not self.config.cloud_pubsub_ack, 'dead letter should not be sent if message not ack'
assert not self.config.cloud_pubsub_dead_letter_ack, 'dead letter should not be sent if message not ack'

def check_pubsub(self):
"""
Expand Down Expand Up @@ -72,14 +72,15 @@ def subscribe_processing(self, message):
except JSONDecodeError as e:
# NOTE: if payload not in json format: send to dead letter topic
logging.error(str(e))
CloudUtil.publish_dead_letter(self.config.cloud_project,
self.config.cloud_pubsub_dead_letter_topic,
self.config.cloud_pubsub_publish_topic,
message,
e,
'Invalid json format')
if self.config.cloud_pubsub_ack:
message.ack()
if self.config.cloud_pubsub_dead_letter_topic != '':
CloudUtil.publish_dead_letter(self.config.cloud_project,
self.config.cloud_pubsub_dead_letter_topic,
self.config.cloud_pubsub_publish_topic,
message,
e,
'Invalid json format')
if self.config.cloud_pubsub_dead_letter_ack:
message.ack()
except Exception as e:
logging.error(str(e))
message.nack()
Expand Down
2 changes: 2 additions & 0 deletions PubSubRunner/runner_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class RunnerConfig:
('CLOUD_PUBSUB_MAX_DEADLINE', '600'),
('CLOUD_PUBSUB_CHECK', 'true'),
('CLOUD_PUBSUB_ACK', 'true'),
('CLOUD_PUBSUB_DEAD_LETTER_ACK', 'false'),
]

def __init__(self):
Expand All @@ -27,3 +28,4 @@ def __init__(self):
self.cloud_pubsub_ack_deadline = int(env[7])
self.cloud_pubsub_check = env[8] in ['True', 'true', 'TRUE']
self.cloud_pubsub_ack = env[9] in ['True', 'true', 'TRUE']
self.cloud_pubsub_dead_letter_ack = env[10] in ['True', 'true', 'TRUE']

0 comments on commit b9e9cdb

Please sign in to comment.