Skip to content

Commit

Permalink
Add currently running feature in cache (#379)
Browse files Browse the repository at this point in the history
* default, rate limit to 60

* add "current_running" in cache

* better logging

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* docs

* lint

* rename QUERY_WAIT_SECONDS

* docs

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
peterdudfield and pre-commit-ci[bot] authored Jan 14, 2025
1 parent ef7a11b commit e393198
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 14 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ You will need to set the following environmental variables:
been made in the last `FORECAST_ERROR_HOURS` hours
- `ADJUST_MW_LIMIT` - the maximum the api is allowed to adjust the national forecast by
- `FAKE` - This allows fake data to be used, rather than connecting to a database
- `QUERY_WAIT_SECONDS` - The number of seconds to wait for an ongoing query to finish
- `CACHE_TIME_SECONDS` - The time in seconds for which cached data is used before the route is re-run
- `DELETE_CACHE_TIME_SECONDS` - The time in seconds after which the cache is deleted

Note you will need a database set up at `DB_URL`. This should use the datamodel in [nowcasting_datamodel](https://github.com/openclimatefix/nowcasting_datamodel)

Expand Down
82 changes: 68 additions & 14 deletions src/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
DELETE_CACHE_TIME_SECONDS = 240
delete_cache_time_seconds = int(os.getenv("DELETE_CACHE_TIME_SECONDS", DELETE_CACHE_TIME_SECONDS))

QUERY_WAIT_SECONDS = int(os.getenv("QUERY_WAIT_SECONDS", 10))


def remove_old_cache(
last_updated: dict, response: dict, remove_cache_time_seconds: float = delete_cache_time_seconds
Expand All @@ -29,7 +31,7 @@ def remove_old_cache(
:param remove_cache_time_seconds: the amount of time, after which the cache should be removed
"""
now = datetime.now(tz=timezone.utc)
logger.info("Removing old cache entries")
logger.info("Checking and removing old cache entries")
keys_to_remove = []
last_updated_copy = last_updated.copy()
for key, value in last_updated_copy.items():
Expand Down Expand Up @@ -69,11 +71,13 @@ async def example():
"""
response = {}
last_updated = {}
currently_running = {}

@wraps(func)
def wrapper(*args, **kwargs): # noqa
nonlocal response
nonlocal last_updated
nonlocal currently_running

# get the variables that go into the route
# we don't want to use the cache for different variables
Expand All @@ -95,27 +99,72 @@ def wrapper(*args, **kwargs): # noqa
# make route_variables into a string
route_variables = json.dumps(route_variables)

# check if its been called before
if route_variables not in last_updated:
logger.debug("First time this is route run")
last_updated[route_variables] = datetime.now(tz=timezone.utc)
# use cases
# A. First time we call the route -> call the route (1.1)
# B. Second time we call the route, but it's running at the moment.
# Wait for it to finish. (1.0)
# C. The cached result is old, and it's not running --> call the route (1.2)
# D. The cached result is empty, and it's running --> wait for it to finish (1.0)
# E. The cached result is up to date --> use the cache (1.4)
# F. It is currently being run, wait a bit, then try to use those results (1.0)
# G. If the cached result is None, let's wait a few seconds,
# then try to use the cache (1.3)

# 1.0
if currently_running.get(route_variables, False):
logger.debug("Route is being called somewhere else, so waiting for it to finish")
attempt = 0
while attempt < QUERY_WAIT_SECONDS:
logger.debug(f"waiting for route to finish, {attempt} seconds elapsed")
time.sleep(1)
attempt += 1
if not currently_running.get(route_variables, False):
logger.debug(
f"route finished after {attempt} seconds, returning cached response"
)
if route_variables in response:
return response[route_variables]
else:
logger.warning("route finished, but response not in cache")
break

# 1.1 check if its been called before and not currently running
if (route_variables not in last_updated) and (
not currently_running.get(route_variables, False)
):
logger.debug("First time this is route run, and not running now")

# run the route
currently_running[route_variables] = True
response[route_variables] = func(*args, **kwargs)
currently_running[route_variables] = False
last_updated[route_variables] = datetime.now(tz=timezone.utc)

return response[route_variables]

# re run if cache time out is up
# 1.2 rerun if cache time out is up and not currently running
now = datetime.now(tz=timezone.utc)
if now - timedelta(seconds=cache_time_seconds) > last_updated[route_variables]:
logger.debug(f"not using cache as longer than {cache_time_seconds} seconds")
if now - timedelta(seconds=cache_time_seconds) > last_updated[route_variables] and (
not currently_running.get(route_variables, False)
):
logger.debug(
f"Not using cache as longer than {cache_time_seconds} seconds, and not running now"
)

# run the route
currently_running[route_variables] = True
response[route_variables] = func(*args, **kwargs)
currently_running[route_variables] = False
last_updated[route_variables] = now

return response[route_variables]

# re-run if response is not cached for some reason or is empty
# 1.3. re-run if response is not cached for some reason or is empty
if route_variables not in response or response[route_variables] is None:
logger.debug("not using cache as response is empty")
attempt = 0
# wait until response has been cached
while attempt < 10:
while attempt < QUERY_WAIT_SECONDS:
logger.debug(f"waiting for response to be cached, {attempt} seconds elapsed")
time.sleep(1)
attempt += 1
Expand All @@ -124,14 +173,19 @@ def wrapper(*args, **kwargs): # noqa
f"response cached after {attempt} seconds, returning cached response"
)
break
if attempt >= 10:
# if response is not in cache after 10 seconds, re-run
logger.debug("response not cached after 10 seconds, re-running")
if attempt >= QUERY_WAIT_SECONDS:
# if response is not in cache after QUERY_WAIT_SECONDS seconds, re-run
logger.debug(f"response not cached after {QUERY_WAIT_SECONDS} seconds, re-running")

# run the route
currently_running[route_variables] = True
response[route_variables] = func(*args, **kwargs)
currently_running[route_variables] = False
last_updated[route_variables] = now

return response[route_variables]

# use cache
# 1.4 use cache
logger.debug(f"Using cache route, cache made at {last_updated[route_variables]}")
return response[route_variables]

Expand Down

0 comments on commit e393198

Please sign in to comment.