From 6b8da9bcd46e280007add42f38e29b06df76651d Mon Sep 17 00:00:00 2001 From: db0 Date: Sun, 29 Sep 2024 15:39:07 +0200 Subject: [PATCH] fix: Prevent one redis node going down causing degradation --- CHANGELOG.md | 4 + horde/apis/v2/base.py | 2 +- horde/classes/base/user.py | 2 +- horde/classes/base/waiting_prompt.py | 2 +- horde/classes/base/worker.py | 2 +- horde/classes/kobold/worker.py | 2 +- horde/classes/stable/interrogation.py | 2 +- horde/consts.py | 2 +- horde/countermeasures.py | 20 +-- horde/database/functions.py | 2 +- horde/database/text_functions.py | 2 +- horde/database/threads.py | 2 +- horde/detection.py | 6 +- horde/horde_redis.py | 242 +++++++++++++------------- horde/patreon.py | 2 +- horde/redis_ctrl.py | 15 +- 16 files changed, 158 insertions(+), 151 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c42900d5..e1086f6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ SPDX-License-Identifier: AGPL-3.0-or-later # Changelog +# 4.43.5 + +* Fix: Added check to ensure the redis servers are still available. 
+ # 4.43.4 * Fix logic error when setting censored key diff --git a/horde/apis/v2/base.py b/horde/apis/v2/base.py index d4029c64..8caaa552 100644 --- a/horde/apis/v2/base.py +++ b/horde/apis/v2/base.py @@ -18,7 +18,6 @@ import horde.apis.limiter_api as lim import horde.classes.base.stats as stats from horde import exceptions as e -from horde import horde_redis as hr from horde.apis.models.v2 import Models, Parsers from horde.argparser import args from horde.classes.base import settings @@ -33,6 +32,7 @@ from horde.database import functions as database from horde.detection import prompt_checker from horde.flask import HORDE, cache, db +from horde.horde_redis import horde_redis as hr from horde.image import ensure_source_image_uploaded from horde.limiter import limiter from horde.logger import logger diff --git a/horde/classes/base/user.py b/horde/classes/base/user.py index 618eeb77..ee04f127 100644 --- a/horde/classes/base/user.py +++ b/horde/classes/base/user.py @@ -12,12 +12,12 @@ from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.ext.hybrid import hybrid_property -from horde import horde_redis as hr from horde import vars as hv from horde.countermeasures import CounterMeasures from horde.discord import send_problem_user_notification from horde.enums import UserRecordTypes, UserRoleTypes from horde.flask import SQLITE_MODE, db +from horde.horde_redis import horde_redis as hr from horde.logger import logger from horde.patreon import patrons from horde.suspicions import SUSPICION_LOGS, Suspicions diff --git a/horde/classes/base/waiting_prompt.py b/horde/classes/base/waiting_prompt.py index 932e2159..834afe41 100644 --- a/horde/classes/base/waiting_prompt.py +++ b/horde/classes/base/waiting_prompt.py @@ -11,13 +11,13 @@ from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.sql import expression -from horde import horde_redis as hr from horde import vars as hv from horde.bridge_reference import check_bridge_capability from 
horde.classes.base.processing_generation import ProcessingGeneration from horde.classes.kobold.processing_generation import TextProcessingGeneration from horde.classes.stable.processing_generation import ImageProcessingGeneration from horde.flask import SQLITE_MODE, db +from horde.horde_redis import horde_redis as hr from horde.logger import logger from horde.utils import get_db_uuid, get_expiry_date, get_extra_slow_expiry_date diff --git a/horde/classes/base/worker.py b/horde/classes/base/worker.py index 8b3585ee..a722fe86 100644 --- a/horde/classes/base/worker.py +++ b/horde/classes/base/worker.py @@ -9,11 +9,11 @@ from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.ext.hybrid import hybrid_property -from horde import horde_redis as hr from horde import vars as hv from horde.classes.base import settings from horde.discord import send_pause_notification from horde.flask import SQLITE_MODE, db +from horde.horde_redis import horde_redis as hr from horde.logger import logger from horde.suspicions import SUSPICION_LOGS, Suspicions from horde.utils import get_db_uuid, is_profane, sanitize_string diff --git a/horde/classes/kobold/worker.py b/horde/classes/kobold/worker.py index c6e2d9c8..e30cdd9f 100644 --- a/horde/classes/kobold/worker.py +++ b/horde/classes/kobold/worker.py @@ -8,12 +8,12 @@ from sqlalchemy.dialects.postgresql import UUID from horde import exceptions as e -from horde import horde_redis as hr from horde.bridge_reference import ( is_backed_validated, ) from horde.classes.base.worker import Worker from horde.flask import SQLITE_MODE, db +from horde.horde_redis import horde_redis as hr from horde.logger import logger from horde.model_reference import model_reference from horde.utils import sanitize_string diff --git a/horde/classes/stable/interrogation.py b/horde/classes/stable/interrogation.py index c0b4c083..62a84e9b 100644 --- a/horde/classes/stable/interrogation.py +++ b/horde/classes/stable/interrogation.py @@ -9,10 +9,10 @@ from 
sqlalchemy import JSON, Enum from sqlalchemy.dialects.postgresql import JSONB, UUID -from horde import horde_redis as hr from horde.consts import KNOWN_POST_PROCESSORS from horde.enums import State from horde.flask import SQLITE_MODE, db +from horde.horde_redis import horde_redis as hr from horde.logger import logger from horde.r2 import generate_procgen_download_url, generate_procgen_upload_url from horde.utils import get_db_uuid, get_expiry_date, get_interrogation_form_expiry_date diff --git a/horde/consts.py b/horde/consts.py index a6eb02e7..a973dbb2 100644 --- a/horde/consts.py +++ b/horde/consts.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later -HORDE_VERSION = "4.43.4" +HORDE_VERSION = "4.43.5" WHITELISTED_SERVICE_IPS = { "212.227.227.178", # Turing Bot diff --git a/horde/countermeasures.py b/horde/countermeasures.py index 1fae3f91..0f596ea0 100644 --- a/horde/countermeasures.py +++ b/horde/countermeasures.py @@ -19,26 +19,16 @@ ) ip_r = None -logger.init("IP Address Cache", status="Connecting") -if is_redis_up(): - ip_r = get_ipaddr_db() - logger.init_ok("IP Address Cache", status="Connected") -else: - logger.init_err("IP Address Cache", status="Failed") ip_s_r = None -logger.init("IP Suspicion Cache", status="Connecting") -if is_redis_up(): - ip_s_r = get_ipaddr_suspicion_db() - logger.init_ok("IP Suspicion Cache", status="Connected") -else: - logger.init_err("IP Suspicion Cache", status="Failed") ip_t_r = None -logger.init("IP Timeout Cache", status="Connecting") +logger.init("IP Caches", status="Connecting") if is_redis_up(): + ip_r = get_ipaddr_db() + ip_s_r = get_ipaddr_suspicion_db() ip_t_r = get_ipaddr_timeout_db() - logger.init_ok("IP Timeout Cache", status="Connected") + logger.init_ok("IP Caches", status="Connected") else: - logger.init_err("IP Timeout Cache", status="Failed") + logger.init_err("IP Caches", status="Failed") test_timeout = 0 diff --git a/horde/database/functions.py b/horde/database/functions.py index 
423770d2..cf445735 100644 --- a/horde/database/functions.py +++ b/horde/database/functions.py @@ -13,7 +13,6 @@ from sqlalchemy.orm import noload import horde.classes.base.stats as stats -from horde import horde_redis as hr from horde import vars as hv from horde.bridge_reference import ( check_bridge_capability, @@ -34,6 +33,7 @@ from horde.database.classes import FakeWPRow from horde.enums import State from horde.flask import SQLITE_MODE, db +from horde.horde_redis import horde_redis as hr from horde.logger import logger from horde.model_reference import model_reference from horde.utils import hash_api_key, validate_regex diff --git a/horde/database/text_functions.py b/horde/database/text_functions.py index 7865f16b..38d0f9be 100644 --- a/horde/database/text_functions.py +++ b/horde/database/text_functions.py @@ -10,7 +10,6 @@ from sqlalchemy.orm import noload import horde.classes.base.stats as stats -from horde import horde_redis as hr from horde.bridge_reference import ( is_backed_validated, ) @@ -22,6 +21,7 @@ from horde.classes.kobold.waiting_prompt import TextWaitingPrompt from horde.database.functions import query_prioritized_wps from horde.flask import SQLITE_MODE, db +from horde.horde_redis import horde_redis as hr from horde.logger import logger from horde.model_reference import model_reference diff --git a/horde/database/threads.py b/horde/database/threads.py index 38847c13..b01c8fde 100644 --- a/horde/database/threads.py +++ b/horde/database/threads.py @@ -9,7 +9,6 @@ import patreon from sqlalchemy import func, or_ -from horde import horde_redis as hr from horde.argparser import args from horde.classes.base.user import User from horde.classes.kobold.processing_generation import TextProcessingGeneration @@ -30,6 +29,7 @@ ) from horde.enums import State from horde.flask import HORDE, SQLITE_MODE, db +from horde.horde_redis import horde_redis as hr from horde.logger import logger from horde.patreon import patrons from horde.r2 import delete_source_image 
diff --git a/horde/detection.py b/horde/detection.py
index 5f78e971..f9025469 100644
--- a/horde/detection.py
+++ b/horde/detection.py
@@ -13,7 +13,7 @@
 from horde.argparser import args
 from horde.database.functions import compile_regex_filter, retrieve_regex_replacements
 from horde.flask import HORDE, SQLITE_MODE  # Local Testing
-from horde.horde_redis import horde_r_get
+from horde.horde_redis import horde_redis as hr
 from horde.logger import logger
 from horde.model_reference import model_reference
 
@@ -82,7 +82,7 @@ def refresh_regex(self):
             with HORDE.app_context():
                 stored_replacements = retrieve_regex_replacements(filter_type=10)
         else:
-            cached_replacements = horde_r_get("cached_regex_replacements")
+            cached_replacements = hr.horde_r_get("cached_regex_replacements")
             if not cached_replacements:
                 logger.warning("No cached regex replacements found in redis! Check threads!")
                 stored_replacements = []
@@ -97,7 +97,7 @@ def refresh_regex(self):
                 with HORDE.app_context():
                     stored_filter = compile_regex_filter(_id)
             else:
-                stored_filter = horde_r_get(filter_id)
+                stored_filter = hr.horde_r_get(filter_id)
             # Ensure we don't get catch-all regex
             if not stored_filter:
                 continue
diff --git a/horde/horde_redis.py b/horde/horde_redis.py
index 636ede9c..3d1ddf3a 100644
--- a/horde/horde_redis.py
+++ b/horde/horde_redis.py
@@ -3,6 +3,8 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later
 
 import json
+import threading
+import time
 from datetime import timedelta
 from threading import Lock
 
@@ -15,121 +17,148 @@
     is_redis_up,
 )
 
-locks = {}
-
-horde_r = None
-all_horde_redis = []
-logger.init("Horde Redis", status="Connecting")
-if is_redis_up():
-    horde_r = get_horde_db()
-    all_horde_redis = get_all_redis_db_servers()
-    logger.init_ok("Horde Redis", status="Connected")
-else:
-    logger.init_err("Horde Redis", status="Failed")
-
-
-horde_local_r = None
-logger.init("Horde Local Redis", status="Connecting")
-if is_local_redis_up():
-    horde_local_r = get_local_horde_db()
-    logger.init_ok("Horde Local Redis", status="Connected")
-else:
-    logger.init_err("Horde Local Redis", status="Failed")
-
-
-def horde_r_set(key, value):
-    for hr in all_horde_redis:
-        try:
-            hr.set(key, value)
-        except Exception as err:
-            logger.warning(f"Exception when writing in redis servers {hr}: {err}")
-    if horde_local_r:
-        horde_local_r.setex(key, timedelta(10), value)
-
-
-def horde_r_setex(key, expiry, value):
-    for hr in all_horde_redis:
-        try:
-            hr.setex(key, expiry, value)
-        except Exception as err:
-            logger.warning(f"Exception when writing in redis servers {hr}: {err}")
-    # We don't keep local cache for more than 5 seconds
-    if expiry > timedelta(5):
-        expiry = timedelta(5)
-    if horde_local_r:
-        horde_local_r.setex(key, expiry, value)
-
-
-def horde_r_setex_json(key, expiry, value):
-    """Same as horde_r_setex()
-    but also converts the python builtin value to json
-    """
-    horde_r_setex(key, expiry, json.dumps(value))
-
-
-def horde_r_local_set_to_json(key, value):
-    if horde_local_r:
-        if key not in locks:
-            locks[key] = Lock()
-        locks[key].acquire()
-        try:
-            horde_local_r.set(key, json.dumps(value))
-        except Exception as err:
-            logger.error(f"Something went wrong when setting local redis: {err}")
-        locks[key].release()
-
-
-def horde_local_setex_to_json(key, seconds, value):
-    if horde_local_r:
-        if key not in locks:
-            locks[key] = Lock()
-        locks[key].acquire()
-        try:
-            horde_local_r.setex(key, timedelta(seconds=seconds), json.dumps(value))
-        except Exception as err:
-            logger.error(f"Something went wrong when setting local redis: {err}")
-        locks[key].release()
-
-
-def horde_r_get(key):
-    """Retrieves the value from local redis if it exists
-    If it doesn't exist retrieves it from remote redis
-    If it exists in remote redis, also stores it in local redis
-    """
-    value = None
-    if horde_local_r:
-        # if key in ["worker_cache","worker_cache_privileged"]:
-        #     logger.warning(f"Got {key} from Local")
-        value = horde_local_r.get(key)
-    if value is None and horde_r:
-        value = horde_r.get(key)
-        if value is not None and horde_local_r is not None:
-            ttl = horde_r.ttl(key)
-            if ttl > 5:
-                ttl = 5
-            if ttl <= 0:
-                ttl = 2
-            # The local redis cache is always very temporary
-            if value is not None:
-                horde_local_r.setex(key, timedelta(seconds=abs(ttl)), value)
-    return value
-
-
-def horde_r_get_json(key):
-    """Same as horde_r_get()
-    but also converts the json to python built-ins
-    """
-    value = horde_r_get(key)
-    if value is None:
-        return None
-    return json.loads(value)
-
-
-def horde_r_delete(key):
-    for hr in all_horde_redis:
-        try:
-            hr.delete(key)
-        except Exception as err:
-            logger.warning(f"Exception when deleting from redis servers {hr}: {err}")
-    if horde_local_r:
-        horde_local_r.delete(key)
+
+class HordeRedis:
+    """Gateway to the horde redis servers and to the local redis cache
+
+    Writes are sent to every redis server in all_horde_redis, while reads
+    are served from the local redis first. A daemon thread refreshes
+    all_horde_redis every 10 seconds, so that an unreachable redis server
+    stops being written to.
+    """
+
+    horde_r = None
+    horde_local_r = None
+    check_redis_thread = None
+
+    def __init__(self):
+        # Mutable state is created per instance instead of being shared through the class
+        self.locks = {}
+        self.all_horde_redis = []
+        logger.init("Horde Redis", status="Connecting")
+        if is_redis_up():
+            self.horde_r = get_horde_db()
+            self.all_horde_redis = get_all_redis_db_servers()
+            logger.init_ok("Horde Redis", status="Connected")
+        else:
+            logger.init_err("Horde Redis", status="Failed")
+        logger.init("Horde Local Redis", status="Connecting")
+        if is_local_redis_up():
+            self.horde_local_r = get_local_horde_db()
+            logger.init_ok("Horde Local Redis", status="Connected")
+        else:
+            logger.init_err("Horde Local Redis", status="Failed")
+        self.check_redis_thread = threading.Thread(target=self.check_redis_backends, args=(), daemon=True)
+        self.check_redis_thread.start()
+
+    def check_redis_backends(self):
+        """Refreshes the list of redis servers every 10 seconds
+        This never returns, so it is meant to run in its own daemon thread
+        """
+        while True:
+            time.sleep(10)
+            try:
+                self.all_horde_redis = get_all_redis_db_servers()
+            except Exception as err:
+                # An uncaught exception would silently end the refreshing thread
+                logger.error(f"Exception when refreshing the redis servers: {err}")
+
+    def horde_r_set(self, key, value):
+        """Stores the value in every server of all_horde_redis and, for 10 seconds, in the local redis"""
+        for hr in self.all_horde_redis:
+            try:
+                hr.set(key, value)
+            except Exception as err:
+                logger.warning(f"Exception when writing in redis servers {hr}: {err}")
+        if self.horde_local_r:
+            # timedelta(10) would be 10 days, as its first positional argument is days
+            self.horde_local_r.setex(key, timedelta(seconds=10), value)
+
+    def horde_r_setex(self, key, expiry, value):
+        """Stores the value with an expiry in every server of all_horde_redis
+        and, for at most 5 seconds, in the local redis
+        """
+        for hr in self.all_horde_redis:
+            try:
+                hr.setex(key, expiry, value)
+            except Exception as err:
+                logger.warning(f"Exception when writing in redis servers {hr}: {err}")
+        # We don't keep local cache for more than 5 seconds
+        # timedelta(5) would be 5 days, as its first positional argument is days
+        if expiry > timedelta(seconds=5):
+            expiry = timedelta(seconds=5)
+        if self.horde_local_r:
+            self.horde_local_r.setex(key, expiry, value)
+
+    def horde_r_setex_json(self, key, expiry, value):
+        """Same as horde_r_setex()
+        but also converts the python builtin value to json
+        """
+        self.horde_r_setex(key, expiry, json.dumps(value))
+
+    def horde_r_local_set_to_json(self, key, value):
+        """Stores the value as json in the local redis only, without expiry"""
+        if self.horde_local_r:
+            # setdefault inserts and returns the lock in one dict operation
+            # and the with statement always releases the lock
+            with self.locks.setdefault(key, Lock()):
+                try:
+                    self.horde_local_r.set(key, json.dumps(value))
+                except Exception as err:
+                    logger.error(f"Something went wrong when setting local redis: {err}")
+
+    def horde_local_setex_to_json(self, key, seconds, value):
+        """Stores the value as json in the local redis only, expiring after the given seconds"""
+        if self.horde_local_r:
+            # setdefault inserts and returns the lock in one dict operation
+            # and the with statement always releases the lock
+            with self.locks.setdefault(key, Lock()):
+                try:
+                    self.horde_local_r.setex(key, timedelta(seconds=seconds), json.dumps(value))
+                except Exception as err:
+                    logger.error(f"Something went wrong when setting local redis: {err}")
+
+    def horde_r_get(self, key):
+        """Retrieves the value from local redis if it exists
+        If it doesn't exist retrieves it from remote redis
+        If it exists in remote redis, also stores it in local redis
+        """
+        value = None
+        if self.horde_local_r:
+            # if key in ["worker_cache","worker_cache_privileged"]:
+            #     logger.warning(f"Got {key} from Local")
+            value = self.horde_local_r.get(key)
+        if value is None and self.horde_r:
+            value = self.horde_r.get(key)
+            if value is not None and self.horde_local_r is not None:
+                ttl = self.horde_r.ttl(key)
+                if ttl > 5:
+                    ttl = 5
+                if ttl <= 0:
+                    ttl = 2
+                # The local redis cache is always very temporary
+                if value is not None:
+                    self.horde_local_r.setex(key, timedelta(seconds=abs(ttl)), value)
+        return value
+
+    def horde_r_get_json(self, key):
+        """Same as horde_r_get()
+        but also converts the json to python built-ins
+        """
+        value = self.horde_r_get(key)
+        if value is None:
+            return None
+        return json.loads(value)
+
+    def horde_r_delete(self, key):
+        """Deletes the key from every server of all_horde_redis and from the local redis"""
+        for hr in self.all_horde_redis:
+            try:
+                hr.delete(key)
+            except Exception as err:
+                logger.warning(f"Exception when deleting from redis servers {hr}: {err}")
+        if self.horde_local_r:
+            self.horde_local_r.delete(key)
+
+
+horde_redis = HordeRedis()
diff --git a/horde/patreon.py b/horde/patreon.py
index aa6ab02d..cc93f770 100644
--- a/horde/patreon.py
+++ b/horde/patreon.py
@@ -4,7 +4,7 @@
 
 import json
 
-from horde import horde_redis as hr
+from horde.horde_redis import horde_redis as hr
 from horde.logger import logger
 from horde.threads import PrimaryTimedFunction
 
diff --git a/horde/redis_ctrl.py b/horde/redis_ctrl.py
index 8f6ef023..e684dcd0 100644
--- a/horde/redis_ctrl.py
+++ b/horde/redis_ctrl.py
@@ -22,15 +22,16 @@
 ipaddr_timeout_db = 5
 
 
-def is_redis_up() -> bool:
+def is_redis_up(hostname=redis_hostname, port=redis_port) -> bool:
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.settimeout(3)
         try:
-            return s.connect_ex((redis_hostname, redis_port)) == 0
+            return s.connect_ex((hostname, port)) == 0
         except socket.gaierror as e:
             # connect_ex suppresses exceptions from POSIX connect() call
             # but can still raise gaierror if e.g. the hostname is invalid.
             # This may be transient, so log the error and return False.
-            logger.error(f"Redis server at {redis_hostname}:{redis_port} is not reachable: {e}")
+            logger.error(f"Redis server at {hostname}:{port} is not reachable: {e}")
             return False
 
 
@@ -77,7 +78,13 @@ def get_all_redis_db_servers():
     This allows redis to transparently failover.
     """
     try:
-        return [get_redis_db_server(rs) for rs in json.loads(os.getenv("REDIS_SERVERS"))]
+        working_redis = []
+        for rs in json.loads(os.getenv("REDIS_SERVERS")):
+            if is_redis_up(rs):
+                working_redis.append(get_redis_db_server(rs))
+            else:
+                logger.warning(f"Redis server '{rs}' appears unreachable. It will not be used in the cluster.")
+        return working_redis
     except Exception:
         logger.error("Error setting up REDIS_SERVERS array. Falling back to loadbalancer.")
         return [get_horde_db()]