
Confirm NextShardIterator key exist to fetch from existing consumer shard #26

Open · wants to merge 9 commits into master

Conversation

jmcgrath207 (Contributor)

Hey Hampsterx,

So I was adding shards to my Kinesis stream, and from the discussion I had with an AWS engineer, those old shards would still be active at least for the duration of your stream's data retention period. However, the results from the old shards do not contain the NextShardIterator key, even though their docs say it will be null.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.get_records

This basically makes the consumer move on to the next shard if the NextShardIterator key is not found, or is the string "null", in the results after returning output.
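
Roughly, the guard looks like this (simplified sketch, not the exact diff; `result` is the get_records response and `shard` is the consumer's per-shard state):

```python
# Simplified sketch of the change, not the exact diff.
next_iterator = result.get("NextShardIterator")

if not next_iterator or next_iterator == "null":
    # Exhausted (closed) parent shard: stop fetching from it so the
    # consumer moves on to the next shard.
    shard["fetch"] = None
else:
    # Normal case: keep iterating this shard.
    shard["NextIterator"] = next_iterator  # field name illustrative only
```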

@jmcgrath207 jmcgrath207 changed the title Confirm NextShardIterator key exist to fetch to consumer on same shard Confirm NextShardIterator key exist to fetch from existing consumer shard Jan 29, 2021
@hampsterx (Owner)

hi @jmcgrath207

Interesting, I assumed at the time something more complicated would be required if the shard count was increased, hence the NotImplemented exception. I wonder if it's worth logging a warning or something, because currently the consumer doesn't recheck the shards or otherwise have any way of knowing a reshard has occurred. Basically you would have to restart the consumer.

some random googling..

https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html

There are two types of resharding operations: shard split and shard merge. In a shard split, you divide a single shard into two shards. In a shard merge, you combine two shards into a single shard. Resharding is always pairwise in the sense that you cannot split into more than two shards in a single operation, and you cannot merge more than two shards in a single operation. The shard or pair of shards that the resharding operation acts on are referred to as parent shards. The shard or pair of shards that result from the resharding operation are referred to as child shards.

https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/

The shard or pair of shards that the resharding operation acts on are known as parent shards. The shards that are created after the resharding operation are known as child shards. A parent shard also transitions from an OPEN state to a CLOSED state (and eventually, to an EXPIRED state, after the stream’s retention period). This can result in child shards being assigned an OPEN state

After resharding, you should also continue to read data from the CLOSED shards until they are exhausted. This helps to preserve the order of the data read by the consumer applications. After you have exhausted all of the CLOSED shards, you can begin to read data from open child shards. The Amazon Kinesis client library (KCL) is designed to adapt to resharding operations. Any data that existed in the shards before resharding is processed first.

https://tutorialsdojo.com/kinesis-scaling-resharding-and-parallel-processing/

The Kinesis Client Library (KCL) tracks the shards in the stream using an Amazon DynamoDB table, and adapts to changes in the number of shards that result from resharding. When new shards are created as a result of resharding, the KCL discovers the new shards and populates new rows in the table. The workers automatically discover the new shards and create processors to handle the data from them. The KCL also distributes the shards in the stream across all the available workers and record processors.

Hmm, interesting.

https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java

Task to block until processing of all data records in the parent shard(s) is completed. We check if we have checkpoint(s) for the parent shard(s). If a checkpoint for a parent shard is found, we poll and wait until the checkpoint value is SHARD_END (application has checkpointed after processing all records in the shard). If we don't find a checkpoint for the parent shard(s), we assume they have been trimmed and directly proceed with processing data from the shard.
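
If I'm reading that right, the Python equivalent would be something like this (sketch only; `checkpointer.get()` is a hypothetical lookup, not our current API):

```python
import asyncio

SHARD_END = "SHARD_END"

async def block_on_parent_shards(checkpointer, parent_shard_ids, poll_interval=1.0):
    """Wait until every parent shard has been checkpointed to SHARD_END.

    Sketch of the KCL BlockOnParentShardTask idea. A missing checkpoint
    means the parent was trimmed, so we proceed straight away.
    """
    for shard_id in parent_shard_ids:
        while True:
            checkpoint = await checkpointer.get(shard_id)  # hypothetical
            if checkpoint is None or checkpoint == SHARD_END:
                break  # trimmed, or fully processed: safe to move on
            await asyncio.sleep(poll_interval)
```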

Hmm, there is quite a bit of Java code (haha) in that library so it's not clear (after 3m of looking) how it entirely works.

I'm in favour of a simple approach, i.e. abort if a reshard happens. This is fine as container services will just restart, so long as on restart it then does the right thing of draining the old shards first.

Open to ideas. In your PR, does skipping the shard do the right thing in this case? Guessing probably not? If it does then I am happy to merge as it's far simpler than the alternative!

So it sounds like ideally it would need to (just rambling here..):

  • detect the reshard event? Is this as simple as hitting a missing/null NextShardIterator?
  • pause consuming on all shards
  • refetch the list of shards (somehow know which ones are parent/child?)
  • set a flag/lock; consume only on "closing" (parent) shards or existing shards (not new)
  • remove the flag/lock, start consuming on all shards

hmm, sounds complicated, but would be cool, as then you could just dynamically reshard up and down and the consumer would be able to keep going like a champ hehe.
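
Something like this, very hand-wavy (the flag and every helper method here are hypothetical, none of it exists in the consumer yet):

```python
async def handle_reshard(self):
    """Hand-wavy sketch of the steps above; helper methods are hypothetical."""
    await self.pause_all_fetchers()  # stop consuming on all shards

    resp = await self.client.list_shards(StreamName=self.stream_name)
    shards = resp["Shards"]

    # A shard whose ShardId appears as someone else's ParentShardId is a
    # closing parent of the reshard; the rest are children or untouched.
    parent_ids = {s["ParentShardId"] for s in shards if s.get("ParentShardId")}
    parents = [s for s in shards if s["ShardId"] in parent_ids]

    self.resharding = True                  # flag/lock: drain parents only
    await self.drain_to_shard_end(parents)  # read until NextShardIterator is null
    self.resharding = False                 # unlock

    await self.start_fetchers(shards)       # consume everything, children included
```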

@hampsterx (Owner)

Bit more..

https://awsdocs.s3.amazonaws.com/kinesis/latest/kinesis-dg.pdf (2013)

After the reshard has occurred and the stream is once again in an ACTIVE state, you could immediately
begin to read data from the child shards. However, the parent shards that remain after the reshard could
still contain data that you haven't read yet that was added to the stream prior to the reshard. If you read
data from the child shards before having read all data from the parent shards, you could read data for a
particular hash key out of the order given by the data records' sequence numbers. Therefore, assuming
that the order of the data is important, you should, after a reshard, always continue to read data from the
parent shards until it is exhausted, and only then begin reading data from the child shards. When
getRecordsResult.getNextShardIterator() returns null, it indicates that you have read all the
data in the parent shard. If you are reading data using the Amazon Kinesis Client Library, the library
ensures that you receive the data in order even if a reshard occurs.

Ahh, so the stream changes state until the reshard is done. Right..

So basically, to clarify the current behaviour:

The consumer will keep going, skipping the exhausted shards. However, it will then only be reading the remaining unaffected shards (ie not the new ones) until a restart. If the consumer failed/restarted before draining the old shards, that data would be lost.

@hampsterx (Owner)

sorry one more comment..

https://github.com/ungikim/kinsumer/blob/master/kinsumer/consumer.py

That implementation uses a timer/poll to check the shards every hour. See ShardMonitor.

That's the thing currently: if you add more shards you need to restart the consumer(s) :(

@akursar no wonder the ListShards API limit is 100 ops/sec per stream. This allows every consumer to frequently check the stream for shard changes!
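
ie something along these lines (sketch; `sync_shards` is a hypothetical method that diffs against the known shard list):

```python
import asyncio

async def shard_monitor(self, interval=300):
    """Periodically re-list shards so the consumer notices reshards.

    Sketch of a kinsumer-style ShardMonitor; kinsumer polls hourly, but
    given the 100 ops/sec ListShards limit a shorter interval is fine.
    `self.client` is assumed to be an aiobotocore kinesis client.
    """
    while True:
        resp = await self.client.list_shards(StreamName=self.stream_name)
        await self.sync_shards(resp["Shards"])  # hypothetical
        await asyncio.sleep(interval)
```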

@jmcgrath207 (Contributor, Author)

Hey @hampsterx,

So after spending some time with this change, it does work by definition; however, it's very brute force, and I also failed to take into consideration the checkpointer portion, where it would basically blacklist the shard once exhausted.

So in my scenario, I had a stream with 3 shards, and I bumped it to 4, then 5, in a 24-hour period. This caused the consumer to return 22 shards in total, with most being empty.

With this change it would go through all the shards (that are active) to see if they have data or not. Not great, but it works.

That said, I think that part is unavoidable during our first run; however, with the checkpointer being aware of it we can avoid that once more consumers are added, assuming the Redis checkpointer is being used.

Here is what I am thinking I should change:

  • keep similar logic to this pull request, but push it up and have it deallocate the shard so the checkpointer is aware of it also.
  • implement timer logic so it refreshes the shard list based on the data retention period (assuming we can get that from boto3). So usually 24 hours.

With this we would still get the remaining records from the parent, but once exhausted we tell it not to track that shard again. This idea falls apart with the memory checkpointer though.

Let me know if I am missing something here.

Example, around line 190 in consumer.py:

```python
if shard.get("fetch"):
    # timer (monotonic based) logic for refreshing the shard list based
    # on the data retention time that is returned
    if shard["fetch"].done():
        result = shard["fetch"].result()

        if not result:
            shard["fetch"] = None
            continue

        records = result["Records"]

        if records:
            log.debug(
                "Shard {} got {} records".format(
                    shard["ShardId"], len(records)
                )
            )

            total_items = 0
            for row in result["Records"]:
                for n, output in enumerate(
                    self.processor.parse(row["Data"])
                ):
                    await self.queue.put(output)
                total_items += n + 1

            # Treat a missing or "null" NextShardIterator as an exhausted
            # (closed) shard: stop fetching and deallocate it in the
            # checkpointer so it is not tracked again.
            if not result.get("NextShardIterator") or result.get("NextShardIterator") == "null":
                shard["fetch"] = None
                self.checkpointer.deallocate(shard["ShardId"])
                continue
```

@hampsterx
Copy link
Owner

Yep, I forgot about the checkpointer part as well. The deallocate (for Redis) should probably set an expiring key with the duration of the stream retention, so if restarted it would know that is an expired shard?
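
ie roughly this (redis-py style sketch; the key naming and `session_key` attribute are illustrative, not the current checkpointer internals):

```python
def deallocate(self, shard_id, retention_seconds=86400):
    """Sketch: mark an exhausted shard with a key that expires after the
    stream's retention period, so a restarted consumer knows to skip it.
    """
    key = "{}:{}:expired".format(self.session_key, shard_id)  # illustrative
    # SET with EX: the key disappears once the shard's data has aged out
    self.client.set(key, "1", ex=retention_seconds)
```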

The memory checkpointer I would not be too worried about; I don't really think it makes much sense not using checkpointing, so it was mostly there for unit tests. Probably it should just error/abort.

Timer logic: I was thinking it should be more like every 5m, ie a duration in seconds.

Still need some logic to say: don't start consuming those new shards until the old ones are drained, aye.

When you get this working I'd like to try and add a unit test that changes the shard count to confirm the behaviour. Seems like it's supported in kinesalite, but whether it maps correctly to AWS behaviour or not, who knows.
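
Rough shape of that test (sketch; the fixtures and helpers here are hypothetical, and it assumes kinesalite honours UpdateShardCount):

```python
import pytest

@pytest.mark.asyncio  # needs pytest-asyncio
async def test_reshard_drains_parents_first(kinesis_client, stream_name):
    """Sketch: double the shard count mid-stream and check nothing is lost."""
    await put_records(kinesis_client, stream_name, count=50)  # hypothetical helper

    await kinesis_client.update_shard_count(
        StreamName=stream_name,
        TargetShardCount=2,
        ScalingType="UNIFORM_SCALING",
    )

    await put_records(kinesis_client, stream_name, count=50)

    records = await consume_all(stream_name)  # hypothetical helper
    assert len(records) == 100  # parents drained, children picked up
```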

@hampsterx hampsterx mentioned this pull request Feb 17, 2021
@chekan-o (Contributor)

@hampsterx any plans to merge it any time soon? With the release of ON_DEMAND mode for Kinesis, this PR is a must-have.

@hampsterx (Owner)

hi @chekan-o, this PR needs some more work, that's for sure, and this was all @jmcgrath207's effort here. This feature is kind of complex, hence why very few libraries except KCL implement it. I will see if I can summon the strength to work on this soonish, but no promises; might even dumb it down a bit so it just drains the old shards on a reshard event and then dies, which would be far simpler.

Are you using the ON_DEMAND mode?

@chekan-o (Contributor)

Yes, it suits our load profile quite well, and we've got quite a busy stream with spiky load. On-demand works well with Lambdas, but when we tried it with a long-running service it would just go down for 24 hours, which is not acceptable in production.
We would definitely help test it if needed.

@rogerducky

Fully supporting shard split/merge logic requires some rework.
This post seems to explain it fairly well:
https://brandur.org/kinesis-by-example

To summarize the above:

In "maintenance" task: List all shards. Update checkpointer metadata based on shard ID at a regular interval.

We'd have to spawn a new task when a shard doesn't exist in checkpointer.

"per shard" task can have the states:

  • Pending (Do we have a parent/adjacent parent? If so, wait until they are closed before proceeding.)
  • Active (Set sequenceNumber to StartingSequenceNumber. Stay here until sequenceNumber matches EndingSequenceNumber)
  • Closed (SequenceNumber matches EndingSequenceNumber)

If we do it this way, then we can just have each task toss their records onto the pile until they're all closed.
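
Rough shape in code (sketch; `wait_until_closed`, `read_shard`, `process`, and `mark_closed` are all hypothetical helpers):

```python
import enum

class ShardState(enum.Enum):
    PENDING = "pending"  # parent/adjacent parent still open; wait
    ACTIVE = "active"    # consuming up to EndingSequenceNumber
    CLOSED = "closed"    # sequence number reached EndingSequenceNumber

async def run_shard_task(shard, checkpointer):
    shard["state"] = ShardState.PENDING
    for parent_id in (shard.get("ParentShardId"), shard.get("AdjacentParentShardId")):
        if parent_id:
            await wait_until_closed(parent_id, checkpointer)

    shard["state"] = ShardState.ACTIVE
    async for record in read_shard(shard):  # ends when NextShardIterator is null
        await process(record)

    shard["state"] = ShardState.CLOSED
    await checkpointer.mark_closed(shard["ShardId"])
```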

Leaving this comment in case reading the Java code wasn't providing insights. May attempt a PR later on.

@rogerducky

Okay. I realized why this project hasn't been active recently: AWS allowed Lambdas to act as consumers, so those wanting to use Python can just use a Lambda triggering on KDS events.

@hampsterx (Owner)

hampsterx commented Apr 14, 2022

This project was built for KDF running in a container, but I haven't needed to use it on recent projects so it's languished a bit; the resharding support would be great though. If you can use Lambda (KDS), all the better; just not every project is suitable for it.

It's equivalent to KCL (Java) or kafka-python :P

@rogerducky

Understood. I was confused because most of the activity around packages similar to this one died around late 2019/early 2020. Then I found out AWS had hooked up Lambda support for KDS. If you'd still find it useful, then I shall attempt to do it when I get a chance. ^^
