Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Confirm NextShardIterator key exists to fetch from existing consumer shard #26

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
94 changes: 80 additions & 14 deletions kinesis/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio

from abc import abstractmethod
import aiobotocore
import logging
from async_timeout import timeout
Expand All @@ -14,16 +16,17 @@

class Base:
def __init__(
self,
stream_name,
endpoint_url=None,
region_name=None,
retry_limit=None,
expo_backoff=None,
expo_backoff_limit=120,
skip_describe_stream=False,
create_stream=False,
create_stream_shards=1,
self,
stream_name,
endpoint_url=None,
region_name=None,
retry_limit=None,
expo_backoff=None,
expo_backoff_limit=120,
skip_describe_stream=False,
create_stream=False,
create_stream_shards=1,
shard_refresh_timer=(60 * 15)
):

self.stream_name = stream_name
Expand All @@ -32,7 +35,7 @@ def __init__(
self.region_name = region_name

self.client = None
self.shards = None
self.shards = []

self.stream_status = None

Expand All @@ -43,16 +46,25 @@ def __init__(
# connection states of kinesis client
self.RECONNECT = "RECONNECT"
self.ACTIVE = "ACTIVE"
self.WAIT = "WAIT"
self.INITIALIZE = "INITIALIZE"
# state of local self.shards when compared to kinesis stream shards
self.SYNCED = "SYNCED"
self.RESYNC = "RESYNC"

self.stream_status = self.INITIALIZE
self.shards_status = self.WAIT
# Short Lived producer might want to skip describing stream on startup
self.skip_describe_stream = skip_describe_stream
self._conn_lock = asyncio.Lock()
self._reconnect_timeout = time.monotonic()
self.create_stream = create_stream
self.create_stream_shards = create_stream_shards

self._shards_lock = asyncio.Lock()
self.shard_refresh_monotonic = time.monotonic()
self.shard_refresh_timer = shard_refresh_timer

async def __aenter__(self):

log.info(
Expand Down Expand Up @@ -120,7 +132,6 @@ async def start(self):
self.stream_name
)
)
self.shards = []

log.debug("Checking stream '{}' is active".format(self.stream_name))

Expand All @@ -132,6 +143,7 @@ async def start(self):

if stream_status == self.ACTIVE:
self.stream_status = stream_status
self.shards_status = self.INITIALIZE
break

if stream_status in ["CREATING", "UPDATING"]:
Expand Down Expand Up @@ -175,8 +187,8 @@ async def get_conn(self):
log.warning(f"Connection Failed to Initialize : {e.__class__} {e}")
await self._get_reconn_helper()
elif (
self.stream_status == self.ACTIVE
and (time.monotonic() - self._reconnect_timeout) > 120
self.stream_status == self.ACTIVE
and (time.monotonic() - self._reconnect_timeout) > 120
):
# _reconnect_timeout is a monotonic timestamp used as a cooldown, so a new connection is not created immediately
# after a successful reconnection has been made, since self.start() sets self.stream_status = "ACTIVE"
Expand Down Expand Up @@ -250,3 +262,57 @@ async def _create_stream(self, ignore_exists=True):
)
else:
raise

def set_shard_sync_state(self):
    """Mark the local shard list as synced and release the shard lock.

    Sets ``shards_status`` to ``SYNCED``, restarts the refresh timer by
    storing the current monotonic time, releases ``self._shards_lock`` and
    logs the resulting number of shards.

    The caller must already hold ``self._shards_lock``;
    ``asyncio.Lock.release`` raises ``RuntimeError`` on an unlocked lock.
    """
    self.shards_status = self.SYNCED
    self.shard_refresh_monotonic = time.monotonic()
    self._shards_lock.release()
    # The message reports a *count*, so log the length rather than dumping
    # the whole shard list into the log line.
    log.info(
        "{}: Shard count now at {}".format(type(self).__name__, len(self.shards))
    )

@abstractmethod
async def _spilt_shards(self, shards):
    """Handle a shard split event; subclasses provide the implementation.

    ``sync_shards`` awaits this hook with the ``'Shards'`` list taken from
    the stream description, both on the initial sync and when the stream
    reports more shards than are held locally.  The base version does
    nothing and returns ``None``.

    The method name keeps its original spelling ("spilt") because callers
    and subclasses refer to it by that name.

    Background: https://brandur.org/kinesis-by-example

    NOTE(review): ``@abstractmethod`` is only enforced when the class uses
    ``ABCMeta`` -- confirm that the enclosing class does.
    """

@abstractmethod
async def _merge_shards(self, shards):
    """Handle a shard merge event; subclasses provide the implementation.

    ``sync_shards`` awaits this hook with the ``'Shards'`` list taken from
    the stream description when the stream reports fewer shards than are
    held locally.  The base version does nothing and returns ``None``.

    Background: https://brandur.org/kinesis-by-example

    NOTE(review): ``@abstractmethod`` is only enforced when the class uses
    ``ABCMeta`` -- confirm that the enclosing class does.
    """

async def sync_shards(self):
    """Bring ``self.shards`` in line with the shards the stream reports.

    Two situations trigger work:

    * ``shards_status`` is ``INITIALIZE``: the stream description is
      fetched and its ``'Shards'`` list is handed to ``_spilt_shards``.
    * ``shards_status`` is ``SYNCED`` and more than
      ``shard_refresh_timer`` seconds have elapsed since the last sync:
      the description is fetched again and, unless the stream is
      ``UPDATING`` or ``CREATING``, the shard counts are compared.  More
      remote shards calls ``_spilt_shards``, fewer calls ``_merge_shards``,
      and an equal count is only logged.

    In every other state the call is a no-op.

    ``self._shards_lock`` is held while the hooks run and is released by
    ``set_shard_sync_state`` on success.  If a hook raises or the task is
    cancelled, the lock is released here and a ``RESYNC`` status is rolled
    back to ``SYNCED`` before the exception propagates, so a later call can
    retry instead of blocking forever on a lock nobody will release.

    NOTE(review): only the shard *counts* are compared; a split followed
    by a merge leaves the count unchanged -- confirm whether shard ids
    should be compared as well.
    """
    subclass_type = type(self).__name__

    if self.shards_status == self.INITIALIZE:
        stream_info = await self.get_stream_description()
        await self._shards_lock.acquire()
        try:
            await self._spilt_shards(stream_info["Shards"])
        except BaseException:
            # BaseException so task cancellation also frees the lock.
            self._shards_lock.release()
            raise
        # Marks SYNCED, restarts the refresh timer and releases the lock.
        self.set_shard_sync_state()
        return

    # Check whether it is time for a RESYNC.
    refresh_due = (
        time.monotonic() - self.shard_refresh_monotonic
    ) > self.shard_refresh_timer
    if not (refresh_due and self.shards_status == self.SYNCED):
        return

    stream_info = await self.get_stream_description()
    if stream_info["StreamStatus"] in ("UPDATING", "CREATING"):
        # The shard layout may still be changing; try again on a later call.
        return

    await self._shards_lock.acquire()
    self.shards_status = self.RESYNC
    try:
        stream_shards = stream_info["Shards"]
        remote_count = len(stream_shards)
        local_count = len(self.shards)
        if remote_count == local_count:
            log.debug(
                "{}: Stream {} has all shards ids in sync with kinesis".format(
                    subclass_type, self.stream_name
                )
            )
        elif remote_count > local_count:
            await self._spilt_shards(stream_shards)
        else:
            await self._merge_shards(stream_shards)
    except BaseException:
        # Roll back so the next call can retry: without this the status
        # would stay RESYNC (matched by neither branch above) and the lock
        # would remain held.
        self.shards_status = self.SYNCED
        self._shards_lock.release()
        raise
    # Marks SYNCED, restarts the refresh timer and releases the lock.
    self.set_shard_sync_state()
3 changes: 2 additions & 1 deletion kinesis/checkpointers.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ async def _checkpoint(self, shard_id, sequence):
)
)

if previous_val["ref"] != self.get_ref():
# Could be None due to a Reshard event
if previous_val["ref"] != self.get_ref() and previous_val["ref"] is not None:
raise NotImplementedError(
"{} checkpointed on {} but ref is different {}".format(
self.get_ref(), shard_id, val["ref"]
Expand Down
Loading