Skip to content

How to have multiple consumers? #33

Open
@jrmeland

Description

@jrmeland

Thank you for your work!

I recently implemented the DynamoDB backend, but when I go to run multiple instances (via Kubernetes) I find that one of the instances errors out, while another picks up. The end result is that only one instance is processing at a time. Is there something I am missing?

Here is the error:

Traceback (most recent call last):
--
File "/usr/local/lib/python3.9/site-packages/kinesis/consumer.py", line 205, in __iter__
self.state.checkpoint(state_shard_id, item['SequenceNumber'])
File "/usr/local/lib/python3.9/site-packages/kinesis/state.py", line 41, in checkpoint
self.dynamo_table.update_item(
File "/usr/local/lib/python3.9/site-packages/boto3/resources/factory.py", line 580, in do_action
response = action(self, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/boto3/resources/action.py", line 88, in __call__
response = getattr(parent.meta.client, operation_name)(*args, **params)
File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 530, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 960, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.ConditionalCheckFailedException: An error occurred (ConditionalCheckFailedException) when calling the UpdateItem operation: The conditional request failed

Based on the update expression:

            self.dynamo_table.update_item(
                Key={'shard': shard_id},
                UpdateExpression="set seq = :seq",
                ConditionExpression="fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)",
                ExpressionAttributeValues={
                    ':fqdn': fqdn,
                    ':seq': seq,
                }
            )

It seems likely that the condition check that is failing is the fqdn check. Which would seem reasonable if another consumer came in and claimed the shard. But seems like that is a reasonable thing to happen, so I am not sure why there wouldn't be a graceful relinquish of a shard.

Any help is appreciated! Thanks!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions