How to have multiple consumers? #33

Open
jrmeland opened this issue Apr 15, 2023 · 0 comments

Thank you for your work!

I recently implemented the DynamoDB backend, but when I run multiple instances (via Kubernetes), one instance errors out while another picks up the work. The end result is that only one instance is processing at a time. Is there something I am missing?

Here is the error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/kinesis/consumer.py", line 205, in __iter__
    self.state.checkpoint(state_shard_id, item['SequenceNumber'])
  File "/usr/local/lib/python3.9/site-packages/kinesis/state.py", line 41, in checkpoint
    self.dynamo_table.update_item(
  File "/usr/local/lib/python3.9/site-packages/boto3/resources/factory.py", line 580, in do_action
    response = action(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/boto3/resources/action.py", line 88, in __call__
    response = getattr(parent.meta.client, operation_name)(*args, **params)
  File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 530, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 960, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ConditionalCheckFailedException: An error occurred (ConditionalCheckFailedException) when calling the UpdateItem operation: The conditional request failed

Based on the update expression:

            self.dynamo_table.update_item(
                Key={'shard': shard_id},
                UpdateExpression="set seq = :seq",
                ConditionExpression="fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)",
                ExpressionAttributeValues={
                    ':fqdn': fqdn,
                    ':seq': seq,
                }
            )
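Spelled out as plain Python, my reading of that condition is roughly the following. This is only a local model for reasoning about the two clauses, not DynamoDB's actual evaluation: `condition_holds`, the `row` dict, and the `pod-a`/`pod-b` host names are made up for illustration, and the sequence numbers are placeholder strings.

```python
def condition_holds(item, fqdn, seq):
    """Approximate the ConditionExpression against a stored row (a plain dict).

    Mirrors: fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)
    """
    # Clause 1: the row must currently be owned by this consumer.
    owner_matches = "fqdn" in item and item["fqdn"] == fqdn
    # Clause 2: either no checkpoint exists yet, or the new one is larger.
    seq_advances = "seq" not in item or item["seq"] < seq
    return owner_matches and seq_advances


# Hypothetical row: the shard was claimed by another instance ("pod-b").
row = {"shard": "shard-0", "fqdn": "pod-b", "seq": "100"}

print(condition_holds(row, "pod-a", "200"))  # False: owner clause fails
print(condition_holds(row, "pod-b", "200"))  # True: owner matches, seq advances
```

With a row owned by another host, the update is rejected no matter what sequence number is passed.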

It seems likely that the failing condition is the fqdn check, which would make sense if another consumer came in and claimed the shard. But that seems like a normal thing to happen, so I am not sure why the shard isn't relinquished gracefully instead of the consumer crashing.
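For illustration, this is roughly the behavior I would have expected. It is a sketch under assumptions, not the library's code: `checkpoint_or_release`, and the `checkpoint` and `release` callables passed into it, are names I made up. The only thing it relies on is that botocore's `ClientError` exposes the error code under `exc.response["Error"]["Code"]`.

```python
def is_conditional_check_failure(exc):
    """Return True if `exc` looks like a DynamoDB conditional-check failure."""
    # botocore's ClientError keeps the parsed error response on `.response`.
    response = getattr(exc, "response", None) or {}
    code = response.get("Error", {}).get("Code")
    return code == "ConditionalCheckFailedException"


def checkpoint_or_release(checkpoint, release, shard_id, seq):
    """Try to checkpoint; if the conditional update is rejected, give up the shard.

    `checkpoint(shard_id, seq)` and `release(shard_id)` are hypothetical
    callables supplied by the caller. Returns True if the checkpoint was
    written, False if the shard was released instead.
    """
    try:
        checkpoint(shard_id, seq)
        return True
    except Exception as exc:
        if not is_conditional_check_failure(exc):
            raise  # unrelated failure: let it propagate unchanged
        # The condition was rejected (for example, another consumer owns the
        # shard), so stop reading this shard instead of crashing.
        release(shard_id)
        return False
```

A consumer loop could call this in place of a bare checkpoint and stop iterating the shard when it returns `False`.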

Any help is appreciated! Thanks!
