diff --git a/CHANGELOG.md b/CHANGELOG.md index b720d3ae..2ef11f4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,14 @@ SPDX-License-Identifier: AGPL-3.0-or-later # Changelog +# 4.43.0 + +* Adjusted TTL formula to be algorithmic +* prevent workers without flux support picking up flux jobs +* Adds `extra_slow_workers` bool for image gen async +* Adds `extra_slow_worker` bool for worker pop +* Adds `limit_max_steps` for worker pop + # 4.42.0 * Adds support for the Flux family of models diff --git a/horde/apis/models/kobold_v2.py b/horde/apis/models/kobold_v2.py index 944773a0..30a13062 100644 --- a/horde/apis/models/kobold_v2.py +++ b/horde/apis/models/kobold_v2.py @@ -348,6 +348,12 @@ def __init__(self, api): "The request will include the details of the job as well as the request ID." ), ), + "extra_slow_workers": fields.Boolean( + default=False, + description=( + "When True, allows very slow workers to pick up this request. " "Use this when you don't mind waiting a lot." + ), + ), }, ) self.response_model_contrib_details = api.inherit( diff --git a/horde/apis/models/stable_v2.py b/horde/apis/models/stable_v2.py index 50620ed7..cb101054 100644 --- a/horde/apis/models/stable_v2.py +++ b/horde/apis/models/stable_v2.py @@ -152,6 +152,14 @@ def __init__(self): help="If True, this worker will pick up requests requesting LoRas.", location="json", ) + self.job_pop_parser.add_argument( + "limit_max_steps", + type=bool, + required=False, + default=False, + help="If True, this worker will not pick up jobs with more steps than the average allowed for that model.", + location="json", + ) self.job_submit_parser.add_argument( "seed", type=int, @@ -451,6 +459,9 @@ def __init__(self, api): "max_pixels": fields.Integer( description="How many waiting requests were skipped because they demanded a higher size than this worker provides.", ), + "step_count": fields.Integer( + description="How many waiting requests were skipped because they demanded a higher step count than the 
worker wants.", + ), "unsafe_ip": fields.Integer( description="How many waiting requests were skipped because they came from an unsafe IP.", ), @@ -544,6 +555,13 @@ def __init__(self, api): default=True, description="If True, this worker will pick up requests requesting LoRas.", ), + "limit_max_steps": fields.Boolean( + default=True, + description=( + "If True, This worker will not pick up jobs with more steps than the average allowed for that model." + " this is for use by workers which might run into issues doing too many steps." + ), + ), }, ) self.input_model_job_submit = api.inherit( @@ -591,6 +609,12 @@ def __init__(self, api): default=True, description="When True, allows slower workers to pick up this request. Disabling this incurs an extra kudos cost.", ), + "extra_slow_workers": fields.Boolean( + default=False, + description=( + "When True, allows very slower workers to pick up this request. " "Use this when you don't mind waiting a lot." + ), + ), "censor_nsfw": fields.Boolean( default=False, description="If the request is SFW, and the worker accidentally generates NSFW, it will send back a censored image.", diff --git a/horde/apis/models/v2.py b/horde/apis/models/v2.py index 6e1e1079..863c03bb 100644 --- a/horde/apis/models/v2.py +++ b/horde/apis/models/v2.py @@ -101,6 +101,14 @@ def __init__(self): help="When True, allows slower workers to pick up this request. Disabling this incurs an extra kudos cost.", location="json", ) + self.generate_parser.add_argument( + "extra_slow_workers", + type=bool, + default=False, + required=False, + help="When True, allows very slower workers to pick up this request. 
Use this when you don't mind waiting a lot.", + location="json", + ) self.generate_parser.add_argument( "dry_run", type=bool, @@ -204,6 +212,13 @@ def __init__(self): help="How many jobvs to pop at the same time", location="json", ) + self.job_pop_parser.add_argument( + "extra_slow_worker", + type=bool, + default=False, + required=False, + location="json", + ) self.job_submit_parser = reqparse.RequestParser() self.job_submit_parser.add_argument( @@ -537,6 +552,13 @@ def __init__(self, api): min=1, max=20, ), + "extra_slow_worker": fields.Boolean( + default=True, + description=( + "If True, marks the worker as very slow. You should only use this if your mps/s is lower than 0.1." + "Extra slow workers are excluded from normal requests but users can opt in to use them." + ), + ), }, ) self.response_model_worker_details = api.inherit( diff --git a/horde/apis/v2/base.py b/horde/apis/v2/base.py index 5278e2ed..0666b35f 100644 --- a/horde/apis/v2/base.py +++ b/horde/apis/v2/base.py @@ -456,7 +456,6 @@ def post(self): # as they're typically countermeasures to raids if skipped_reason != "secret": self.skipped[skipped_reason] = self.skipped.get(skipped_reason, 0) + 1 - # logger.warning(datetime.utcnow()) continue # There is a chance that by the time we finished all the checks, another worker picked up the WP. 
@@ -477,7 +476,7 @@ def post(self): # We report maintenance exception only if we couldn't find any jobs if self.worker.maintenance: raise e.WorkerMaintenance(self.worker.maintenance_msg) - # logger.warning(datetime.utcnow()) + # logger.debug(self.skipped) return {"id": None, "ids": [], "skipped": self.skipped}, 200 def get_sorted_wp(self, priority_user_ids=None): diff --git a/horde/apis/v2/stable.py b/horde/apis/v2/stable.py index 26f6f4f5..cafba323 100644 --- a/horde/apis/v2/stable.py +++ b/horde/apis/v2/stable.py @@ -300,6 +300,7 @@ def initiate_waiting_prompt(self): validated_backends=self.args.validated_backends, worker_blacklist=self.args.worker_blacklist, slow_workers=self.args.slow_workers, + extra_slow_workers=self.args.extra_slow_workers, source_processing=self.args.source_processing, ipaddr=self.user_ip, safe_ip=self.safe_ip, @@ -599,6 +600,10 @@ def post(self): db_skipped["kudos"] = post_ret["skipped"]["kudos"] if "blacklist" in post_ret.get("skipped", {}): db_skipped["blacklist"] = post_ret["skipped"]["blacklist"] + if "step_count" in post_ret.get("skipped", {}): + db_skipped["step_count"] = post_ret["skipped"]["step_count"] + if "bridge_version" in post_ret.get("skipped", {}): + db_skipped["bridge_version"] = db_skipped.get("bridge_version", 0) + post_ret["skipped"]["bridge_version"] post_ret["skipped"] = db_skipped # logger.debug(post_ret) return post_ret, retcode @@ -621,6 +626,8 @@ def check_in(self): allow_controlnet=self.args.allow_controlnet, allow_sdxl_controlnet=self.args.allow_sdxl_controlnet, allow_lora=self.args.allow_lora, + extra_slow_worker=self.args.extra_slow_worker, + limit_max_steps=self.args.limit_max_steps, priority_usernames=self.priority_usernames, ) diff --git a/horde/bridge_reference.py b/horde/bridge_reference.py index 7d7a1aea..68eac7bf 100644 --- a/horde/bridge_reference.py +++ b/horde/bridge_reference.py @@ -9,6 +9,7 @@ BRIDGE_CAPABILITIES = { "AI Horde Worker reGen": { + 9: {"flux"}, 8: {"layer_diffuse"}, 7: {"qr_code", 
"extra_texts", "workflow"}, 6: {"stable_cascade_2pass"}, @@ -185,6 +186,7 @@ def parse_bridge_agent(bridge_agent): @logger.catch(reraise=True) def check_bridge_capability(capability, bridge_agent): bridge_name, bridge_version = parse_bridge_agent(bridge_agent) + # logger.debug([bridge_name, bridge_version]) if bridge_name not in BRIDGE_CAPABILITIES: return False total_capabilities = set() @@ -194,6 +196,7 @@ def check_bridge_capability(capability, bridge_agent): if checked_semver.compare(bridge_version) <= 0: total_capabilities.update(BRIDGE_CAPABILITIES[bridge_name][version]) # logger.debug([total_capabilities, capability, capability in total_capabilities]) + # logger.debug([bridge_name, BRIDGE_CAPABILITIES[bridge_name]]) return capability in total_capabilities diff --git a/horde/classes/base/processing_generation.py b/horde/classes/base/processing_generation.py index febdd009..9a95f836 100644 --- a/horde/classes/base/processing_generation.py +++ b/horde/classes/base/processing_generation.py @@ -44,6 +44,7 @@ class ProcessingGeneration(db.Model): nullable=False, server_default=expression.literal(False), ) + job_ttl = db.Column(db.Integer, default=150, nullable=False, index=True) wp_id = db.Column( uuid_column_type(), @@ -80,6 +81,7 @@ def __init__(self, *args, **kwargs): self.model = matching_models[0] else: self.model = kwargs["model"] + self.set_job_ttl() db.session.commit() def set_generation(self, generation, things_per_sec, **kwargs): @@ -163,10 +165,10 @@ def is_completed(self): def is_faulted(self): return self.faulted - def is_stale(self, ttl): + def is_stale(self): if self.is_completed() or self.is_faulted(): return False - return (datetime.utcnow() - self.start_time).total_seconds() > ttl + return (datetime.utcnow() - self.start_time).total_seconds() > self.job_ttl def delete(self): db.session.delete(self) @@ -224,3 +226,10 @@ def send_webhook(self, kudos): break except Exception as err: logger.debug(f"Exception when sending generation webhook: {err}. 
Will retry {3-riter-1} more times...") + + def set_job_ttl(self): + """Sets how many seconds each job request should stay waiting before considering it stale and cancelling it. + This function should be overridden by the individual hordes depending on how they calculate the ttl + """ + self.job_ttl = 150 + db.session.commit() diff --git a/horde/classes/base/waiting_prompt.py b/horde/classes/base/waiting_prompt.py index 0a8c26cf..932e2159 100644 --- a/horde/classes/base/waiting_prompt.py +++ b/horde/classes/base/waiting_prompt.py @@ -19,7 +19,7 @@ from horde.classes.stable.processing_generation import ImageProcessingGeneration from horde.flask import SQLITE_MODE, db from horde.logger import logger -from horde.utils import get_db_uuid, get_expiry_date +from horde.utils import get_db_uuid, get_expiry_date, get_extra_slow_expiry_date procgen_classes = { "template": ProcessingGeneration, @@ -93,6 +93,7 @@ class WaitingPrompt(db.Model): trusted_workers = db.Column(db.Boolean, default=False, nullable=False, index=True) validated_backends = db.Column(db.Boolean, default=True, nullable=False, index=True) slow_workers = db.Column(db.Boolean, default=True, nullable=False, index=True) + extra_slow_workers = db.Column(db.Boolean, default=False, nullable=False, index=True) worker_blacklist = db.Column(db.Boolean, default=False, nullable=False, index=True) faulted = db.Column(db.Boolean, default=False, nullable=False, index=True) active = db.Column(db.Boolean, default=False, nullable=False, index=True) @@ -105,6 +106,7 @@ class WaitingPrompt(db.Model): things = db.Column(db.BigInteger, default=0, nullable=False) total_usage = db.Column(db.Float, default=0, nullable=False) extra_priority = db.Column(db.Integer, default=0, nullable=False, index=True) + # TODO: Delete. Obsoleted. 
job_ttl = db.Column(db.Integer, default=150, nullable=False) disable_batching = db.Column(db.Boolean, default=False, nullable=False) webhook = db.Column(db.String(1024)) @@ -204,7 +206,6 @@ def extract_params(self): self.things = 0 self.total_usage = round(self.things * self.n, 2) self.prepare_job_payload() - self.set_job_ttl() db.session.commit() def prepare_job_payload(self): @@ -241,7 +242,7 @@ def start_generation(self, worker, amount=1): self.n -= safe_amount payload = self.get_job_payload(current_n) # This does a commit as well - self.refresh() + self.refresh(worker) procgen_class = procgen_classes[self.wp_type] gens_list = [] model = None @@ -457,8 +458,13 @@ def abort_for_maintenance(self): except Exception as err: logger.warning(f"Error when aborting WP. Skipping: {err}") - def refresh(self): - self.expiry = get_expiry_date() + def refresh(self, worker=None): + if worker is not None and worker.extra_slow_worker is True: + self.expiry = get_extra_slow_expiry_date() + else: + new_expiry = get_expiry_date() + if self.expiry < new_expiry: + self.expiry = new_expiry db.session.commit() def is_stale(self): @@ -469,13 +475,6 @@ def is_stale(self): def get_priority(self): return self.extra_priority - def set_job_ttl(self): - """Returns how many seconds each job request should stay waiting before considering it stale and cancelling it - This function should be overriden by the invididual hordes depending on how the calculating ttl - """ - self.job_ttl = 150 - db.session.commit() - def refresh_worker_cache(self): worker_ids = [worker.worker_id for worker in self.workers] worker_string_ids = [str(worker.worker_id) for worker in self.workers] diff --git a/horde/classes/base/worker.py b/horde/classes/base/worker.py index aecd3669..8b3585ee 100644 --- a/horde/classes/base/worker.py +++ b/horde/classes/base/worker.py @@ -121,6 +121,7 @@ class WorkerTemplate(db.Model): # Used by all workers to record how much they can pick up to generate # The value of this column is 
dfferent per worker type max_power = db.Column(db.Integer, default=20, nullable=False) + extra_slow_worker = db.Column(db.Boolean, default=False, nullable=False, index=True) paused = db.Column(db.Boolean, default=False, nullable=False) maintenance = db.Column(db.Boolean, default=False, nullable=False) @@ -196,7 +197,7 @@ def report_suspicion(self, amount=1, reason=Suspicions.WORKER_PROFANITY, formats f"Last suspicion log: {reason.name}.\n" f"Total Suspicion {self.get_suspicion()}", ) - db.session.commit() + db.session.flush() def get_suspicion_reasons(self): return set([s.suspicion_id for s in self.suspicions]) @@ -261,10 +262,6 @@ def toggle_paused(self, is_paused_active): # This should be extended by each worker type def check_in(self, **kwargs): - # To avoid excessive commits, - # we only record new changes on the worker every 30 seconds - if (datetime.utcnow() - self.last_check_in).total_seconds() < 30 and (datetime.utcnow() - self.created).total_seconds() > 30: - return self.ipaddr = kwargs.get("ipaddr", None) self.bridge_agent = sanitize_string(kwargs.get("bridge_agent", "unknown:0:unknown")) self.threads = kwargs.get("threads", 1) @@ -275,6 +272,10 @@ def check_in(self, **kwargs): self.prioritized_users = kwargs.get("prioritized_users", []) if not kwargs.get("safe_ip", True) and not self.user.trusted: self.report_suspicion(reason=Suspicions.UNSAFE_IP) + # To avoid excessive commits, + # we only record new uptime on the worker every 30 seconds + if (datetime.utcnow() - self.last_check_in).total_seconds() < 30 and (datetime.utcnow() - self.created).total_seconds() > 30: + return if not self.is_stale() and not self.paused and not self.maintenance: self.uptime += (datetime.utcnow() - self.last_check_in).total_seconds() # Every 10 minutes of uptime gets 100 kudos rewarded @@ -293,7 +294,6 @@ def check_in(self, **kwargs): # So that they have to stay up at least 10 mins to get uptime kudos self.last_reward_uptime = self.uptime self.last_check_in = datetime.utcnow() 
- db.session.commit() def get_human_readable_uptime(self): if self.uptime < 60: @@ -511,7 +511,8 @@ def check_in(self, **kwargs): self.set_models(kwargs.get("models")) self.nsfw = kwargs.get("nsfw", True) self.set_blacklist(kwargs.get("blacklist", [])) - db.session.commit() + self.extra_slow_worker = kwargs.get("extra_slow_worker", False) + # Commit should happen on calling extensions def set_blacklist(self, blacklist): # We don't allow more workers to claim they can server more than 50 models atm (to prevent abuse) @@ -527,7 +528,7 @@ def set_blacklist(self, blacklist): for word in blacklist: blacklisted_word = WorkerBlackList(worker_id=self.id, word=word[0:15]) db.session.add(blacklisted_word) - db.session.commit() + db.session.flush() def refresh_model_cache(self): models_list = [m.model for m in self.models] @@ -563,7 +564,7 @@ def set_models(self, models): return # logger.debug([existing_model_names,models, existing_model_names == models]) db.session.query(WorkerModel).filter_by(worker_id=self.id).delete() - db.session.commit() + db.session.flush() for model_name in models: model = WorkerModel(worker_id=self.id, model=model_name) db.session.add(model) diff --git a/horde/classes/kobold/worker.py b/horde/classes/kobold/worker.py index f9ce7a49..c6e2d9c8 100644 --- a/horde/classes/kobold/worker.py +++ b/horde/classes/kobold/worker.py @@ -49,7 +49,7 @@ def check_in(self, max_length, max_context_length, softprompts, **kwargs): super().check_in(**kwargs) self.max_length = max_length self.max_context_length = max_context_length - self.set_softprompts(softprompts) + self.set_softprompts(softprompts) # Does a commit as well paused_string = "" if self.paused: paused_string = "(Paused) " @@ -57,6 +57,7 @@ def check_in(self, max_length, max_context_length, softprompts, **kwargs): f"{paused_string}Text Worker {self.name} checked-in, offering models {self.models} " f"at {self.max_length} max tokens and {self.max_context_length} max content length.", ) + db.session.commit() 
def refresh_softprompt_cache(self): softprompts_list = [s.softprompt for s in self.softprompts] @@ -100,11 +101,10 @@ def set_softprompts(self, softprompts): ], ) db.session.query(TextWorkerSoftprompts).filter_by(worker_id=self.id).delete() - db.session.commit() + db.session.flush() for softprompt_name in softprompts: softprompt = TextWorkerSoftprompts(worker_id=self.id, softprompt=softprompt_name) db.session.add(softprompt) - db.session.commit() self.refresh_softprompt_cache() def calculate_uptime_reward(self): diff --git a/horde/classes/stable/processing_generation.py b/horde/classes/stable/processing_generation.py index d4a984ba..87808d80 100644 --- a/horde/classes/stable/processing_generation.py +++ b/horde/classes/stable/processing_generation.py @@ -143,3 +143,22 @@ def upload_generation_metadata(self): f.write(json_object) upload_shared_metadata(filename) os.remove(filename) + + def set_job_ttl(self): + # We are aiming here for a graceful min 2sec/it speed on workers for 512x512 which is well below our requested min 0.5mps/s, + # to buffer for model loading and allow for the occasional slowdown without dropping jobs. + # There is also a minimum of 2mins, regardless of steps and resolution used and an extra 30 seconds for model loading. + # This means a worker at 1mps/s should be able to finish a 512x512x50 request comfortably within 30s but we allow up to 2.5mins. + # This number then increases linearly based on the resolution requested. + # Using this formula, a 1536x768x40 request is expected to take ~50s on a 1mps/s worker, but we will only time out after 390s. 
+ ttl_multiplier = (self.wp.width * self.wp.height) / (512 * 512) + self.job_ttl = 30 + (self.wp.get_accurate_steps() * 2 * ttl_multiplier) + # CN is 3 times slower + if self.wp.gen_payload.get("control_type"): + self.job_ttl = self.job_ttl * 2 + # Flux is way slower than Stable Diffusion + if any(model_reference.get_model_baseline(mn) in ["flux_1"] for mn in self.wp.get_model_names()): + self.job_ttl = self.job_ttl * 3 + if self.worker.extra_slow_worker is True: + self.job_ttl = self.job_ttl * 3 + db.session.commit() diff --git a/horde/classes/stable/waiting_prompt.py b/horde/classes/stable/waiting_prompt.py index 0fb6e801..5c29771e 100644 --- a/horde/classes/stable/waiting_prompt.py +++ b/horde/classes/stable/waiting_prompt.py @@ -125,7 +125,6 @@ def extract_params(self): self.trusted_workers = True self.shared = False self.prepare_job_payload(self.params) - self.set_job_ttl() # Commit will happen in prepare_job_payload() @logger.catch(reraise=True) @@ -441,32 +440,6 @@ def get_accurate_steps(self): steps *= 2 return steps - def set_job_ttl(self): - # default is 2 minutes. Then we scale up based on resolution. 
- # This will be more accurate with a newer formula - self.job_ttl = 120 - if self.width * self.height > 2048 * 2048: - self.job_ttl = 800 - elif self.width * self.height > 1024 * 1024: - self.job_ttl = 400 - elif self.width * self.height > 728 * 728: - self.job_ttl = 260 - elif self.width * self.height >= 512 * 512: - self.job_ttl = 150 - # When too many steps are involved, we increase the expiry time - if self.get_accurate_steps() >= 200: - self.job_ttl = self.job_ttl * 3 - elif self.get_accurate_steps() >= 100: - self.job_ttl = self.job_ttl * 2 - # CN is 3 times slower - if self.gen_payload.get("control_type"): - self.job_ttl = self.job_ttl * 3 - if "SDXL_beta::stability.ai#6901" in self.get_model_names(): - logger.debug(self.get_model_names()) - self.job_ttl = 300 - # logger.info([weights_count,self.job_ttl]) - db.session.commit() - def log_faulted_prompt(self): source_processing = "txt2img" if self.source_image: diff --git a/horde/classes/stable/worker.py b/horde/classes/stable/worker.py index 182a5d38..22468869 100644 --- a/horde/classes/stable/worker.py +++ b/horde/classes/stable/worker.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later + from horde import exceptions as e from horde.bridge_reference import ( check_bridge_capability, @@ -23,12 +24,13 @@ class ImageWorker(Worker): } # TODO: Switch to max_power max_pixels = db.Column(db.BigInteger, default=512 * 512, nullable=False) - allow_img2img = db.Column(db.Boolean, default=True, nullable=False) - allow_painting = db.Column(db.Boolean, default=True, nullable=False) - allow_post_processing = db.Column(db.Boolean, default=True, nullable=False) - allow_controlnet = db.Column(db.Boolean, default=False, nullable=False) - allow_sdxl_controlnet = db.Column(db.Boolean, default=False, nullable=False) - allow_lora = db.Column(db.Boolean, default=False, nullable=False) + allow_img2img = db.Column(db.Boolean, default=True, nullable=False, index=True) + allow_painting = db.Column(db.Boolean, 
default=True, nullable=False, index=True) + allow_post_processing = db.Column(db.Boolean, default=True, nullable=False, index=True) + allow_controlnet = db.Column(db.Boolean, default=False, nullable=False, index=True) + allow_sdxl_controlnet = db.Column(db.Boolean, default=False, nullable=False, index=True) + allow_lora = db.Column(db.Boolean, default=False, nullable=False, index=True) + limit_max_steps = db.Column(db.Boolean, default=False, nullable=False, index=True) wtype = "image" def check_in(self, max_pixels, **kwargs): @@ -43,6 +45,7 @@ def check_in(self, max_pixels, **kwargs): self.allow_controlnet = kwargs.get("allow_controlnet", False) self.allow_sdxl_controlnet = kwargs.get("allow_sdxl_controlnet", False) self.allow_lora = kwargs.get("allow_lora", False) + self.limit_max_steps = kwargs.get("limit_max_steps", False) if len(self.get_model_names()) == 0: self.set_models(["stable_diffusion"]) paused_string = "" @@ -138,6 +141,11 @@ def can_generate(self, waiting_prompt): and not check_bridge_capability("stable_cascade_2pass", self.bridge_agent) ): return [False, "bridge_version"] + if "flux_1" in model_reference.get_all_model_baselines(self.get_model_names()) and not check_bridge_capability( + "flux", + self.bridge_agent, + ): + return [False, "bridge_version"] if waiting_prompt.params.get("clip_skip", 1) > 1 and not check_bridge_capability( "clip_skip", self.bridge_agent, @@ -150,6 +158,17 @@ def can_generate(self, waiting_prompt): return [False, "bridge_version"] if not waiting_prompt.safe_ip and not self.allow_unsafe_ipaddr: return [False, "unsafe_ip"] + if self.limit_max_steps: + for mn in waiting_prompt.get_model_names(): + avg_steps = ( + int( + model_reference.get_model_requirements(mn).get("min_steps", 20) + + model_reference.get_model_requirements(mn).get("max_steps", 40), + ) + / 2 + ) + if waiting_prompt.get_accurate_steps() > avg_steps: + return [False, "step_count"] # We do not give untrusted workers anon or VPN generations, to avoid anything 
slipping by and spooking them. # logger.warning(datetime.utcnow()) if not self.user.trusted: # FIXME #noqa SIM102 diff --git a/horde/consts.py b/horde/consts.py index c20879d4..703e34bc 100644 --- a/horde/consts.py +++ b/horde/consts.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later -HORDE_VERSION = "4.42.0 " +HORDE_VERSION = "4.43.0 " WHITELISTED_SERVICE_IPS = { "212.227.227.178", # Turing Bot diff --git a/horde/database/functions.py b/horde/database/functions.py index affc1e50..041ff949 100644 --- a/horde/database/functions.py +++ b/horde/database/functions.py @@ -840,6 +840,13 @@ def get_sorted_wp_filtered_to_worker(worker, models_list=None, blacklist=None, p worker.speed >= 500000, # 0.5 MPS/s ImageWaitingPrompt.slow_workers == True, # noqa E712 ), + or_( + worker.extra_slow_worker is False, + and_( + worker.extra_slow_worker is True, + ImageWaitingPrompt.extra_slow_workers.is_(True), + ), + ), or_( not_(ImageWaitingPrompt.params.has_key("transparent")), ImageWaitingPrompt.params["transparent"].astext.cast(Boolean).is_(False), @@ -1047,6 +1054,12 @@ def count_skipped_image_wp(worker, models_list=None, blacklist=None, priority_us ).count() if skipped_wps > 0: ret_dict["performance"] = skipped_wps + if worker.extra_slow_worker is True: + skipped_wps = open_wp_list.filter( + ImageWaitingPrompt.extra_slow_workers == False, # noqa E712 + ).count() + if skipped_wps > 0: + ret_dict["performance"] = ret_dict.get("performance", 0) + skipped_wps # Count skipped WPs requiring trusted workers if worker.user.trusted is False: skipped_wps = open_wp_list.filter( diff --git a/horde/database/threads.py b/horde/database/threads.py index 5c78c1b9..38847c13 100644 --- a/horde/database/threads.py +++ b/horde/database/threads.py @@ -204,17 +204,18 @@ def check_waiting_prompts(): .filter( procgen_class.generation == None, # noqa E712 procgen_class.faulted == False, # noqa E712 - # cutoff_time - procgen_class.start_time > wp_class.job_ttl, - # How do we calculate this 
in the query? Maybe I need to - # set an expiry time iun procgen as well better? + # TODO: How do we calculate this in the query? + # cutoff_time - procgen_class.start_time > procgen_class.job_ttl, ) .all() ) + modifed_procgens = 0 for proc_gen in all_proc_gen: - if proc_gen.is_stale(proc_gen.wp.job_ttl): + if proc_gen.is_stale(): proc_gen.abort() proc_gen.wp.n += 1 - if len(all_proc_gen) >= 1: + modifed_procgens += 1 + if modifed_procgens >= 1: db.session.commit() # Faults WP with 3 or more faulted Procgens wp_ids = ( diff --git a/horde/model_reference.py b/horde/model_reference.py index 9b53b20e..1f0a4279 100644 --- a/horde/model_reference.py +++ b/horde/model_reference.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-or-later import os +from datetime import datetime import requests @@ -24,15 +25,21 @@ class ModelReference(PrimaryTimedFunction): testing_models = {} def call_function(self): - """Retrieves to nataili and text model reference and stores in it a var""" + """Retrieves to image and text model reference and stores in it a var""" # If it's running in SQLITE_MODE, it means it's a test and we never want to grab the quorum # We don't want to report on any random model name a client might request for _riter in range(10): try: + ref_json = "https://raw.githubusercontent.com/Haidra-Org/AI-Horde-image-model-reference/main/stable_diffusion.json" + if datetime.utcnow() <= datetime(2024, 9, 30): # Flux Beta + ref_json = ( + "https://raw.githubusercontent.com/Haidra-Org/AI-Horde-image-model-reference/refs/heads/flux/stable_diffusion.json" + ) + logger.debug("Using flux beta model reference...") self.reference = requests.get( os.getenv( "HORDE_IMAGE_COMPVIS_REFERENCE", - "https://raw.githubusercontent.com/Haidra-Org/AI-Horde-image-model-reference/main/stable_diffusion.json", + ref_json, ), timeout=2, ).json() diff --git a/horde/utils.py b/horde/utils.py index e4389b75..ea20cd2a 100644 --- a/horde/utils.py +++ b/horde/utils.py @@ -101,6 +101,10 @@ def 
get_expiry_date(): return datetime.utcnow() + dateutil.relativedelta.relativedelta(minutes=+20) +def get_extra_slow_expiry_date(): + return datetime.utcnow() + dateutil.relativedelta.relativedelta(minutes=+60) + + def get_interrogation_form_expiry_date(): return datetime.utcnow() + dateutil.relativedelta.relativedelta(minutes=+3) diff --git a/sql_statements/4.43.0.txt b/sql_statements/4.43.0.txt new file mode 100644 index 00000000..967b9ebe --- /dev/null +++ b/sql_statements/4.43.0.txt @@ -0,0 +1,13 @@ +ALTER TABLE waiting_prompts ADD COLUMN extra_slow_workers BOOLEAN NOT NULL default false; +ALTER TABLE workers ADD COLUMN extra_slow_worker BOOLEAN NOT NULL default false; +ALTER TABLE workers ADD COLUMN limit_max_steps BOOLEAN NOT NULL default false; +ALTER TABLE processing_gens ADD COLUMN job_ttl INTEGER NOT NULL DEFAULT 150; +CREATE INDEX idx_processing_gens_job_ttl ON public.processing_gens USING btree(job_ttl); +CREATE INDEX idx_waiting_prompts_extra_slow_workers ON public.waiting_prompts USING btree(extra_slow_workers); +CREATE INDEX idx_workers_extra_slow_worker ON public.workers USING btree(extra_slow_worker); +CREATE INDEX idx_workers_allow_img2img ON public.workers USING btree(allow_img2img); +CREATE INDEX idx_workers_allow_painting ON public.workers USING btree(allow_painting); +CREATE INDEX idx_workers_allow_post_processing ON public.workers USING btree(allow_post_processing); +CREATE INDEX idx_workers_allow_controlnet ON public.workers USING btree(allow_controlnet); +CREATE INDEX idx_workers_allow_sdxl_controlnet ON public.workers USING btree(allow_sdxl_controlnet); +CREATE INDEX idx_workers_allow_lora ON public.workers USING btree(allow_lora); \ No newline at end of file diff --git a/sql_statements/4.43.0.txt.license b/sql_statements/4.43.0.txt.license new file mode 100644 index 00000000..8140c6e2 --- /dev/null +++ b/sql_statements/4.43.0.txt.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: Konstantinos Thoukydidis + +SPDX-License-Identifier: 
AGPL-3.0-or-later diff --git a/tests/test_image.py b/tests/test_image.py index 109d0ca7..48f35167 100644 --- a/tests/test_image.py +++ b/tests/test_image.py @@ -2,7 +2,8 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later -import json + +import time import requests @@ -10,6 +11,7 @@ def test_simple_image_gen(api_key: str, HORDE_URL: str, CIVERSION: str) -> None: + print("test_simple_image_gen") headers = {"apikey": api_key, "Client-Agent": f"aihorde_ci_client:{CIVERSION}:(discord)db0#1625"} # ci/cd user async_dict = { "prompt": "a horde of cute stable robots in a sprawling server room repairing a massive mainframe", @@ -37,11 +39,10 @@ def test_simple_image_gen(api_key: str, HORDE_URL: str, CIVERSION: str) -> None: async_results = async_req.json() req_id = async_results["id"] # print(async_results) - print(async_results) pop_dict = { "name": "CICD Fake Dreamer", "models": TEST_MODELS, - "bridge_agent": "AI Horde Worker reGen:8.0.1-citests:https://github.com/Haidra-Org/horde-worker-reGen", + "bridge_agent": "AI Horde Worker reGen:9.0.1-citests:https://github.com/Haidra-Org/horde-worker-reGen", "nsfw": True, "amount": 10, "max_pixels": 4194304, @@ -55,14 +56,14 @@ def test_simple_image_gen(api_key: str, HORDE_URL: str, CIVERSION: str) -> None: } pop_req = requests.post(f"{protocol}://{HORDE_URL}/api/v2/generate/pop", json=pop_dict, headers=headers) try: - print(pop_req.text) + # print(pop_req.text) assert pop_req.ok, pop_req.text except AssertionError as err: requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) print("Request cancelled") raise err pop_results = pop_req.json() - print(json.dumps(pop_results, indent=4)) + # print(json.dumps(pop_results, indent=4)) job_id = pop_results["id"] try: @@ -84,7 +85,7 @@ def test_simple_image_gen(api_key: str, HORDE_URL: str, CIVERSION: str) -> None: retrieve_req = requests.get(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) assert retrieve_req.ok, 
retrieve_req.text retrieve_results = retrieve_req.json() - print(json.dumps(retrieve_results, indent=4)) + # print(json.dumps(retrieve_results, indent=4)) assert len(retrieve_results["generations"]) == 1 gen = retrieve_results["generations"][0] assert len(gen["gen_metadata"]) == 0 @@ -94,8 +95,182 @@ def test_simple_image_gen(api_key: str, HORDE_URL: str, CIVERSION: str) -> None: assert gen["state"] == "ok" assert retrieve_results["kudos"] > 1 assert retrieve_results["done"] is True + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + + +TEST_MODELS_FLUX = ["Flux.1-Schnell fp8 (Compact)"] + + +def test_flux_image_gen(api_key: str, HORDE_URL: str, CIVERSION: str) -> None: + print("test_flux_image_gen") + headers = {"apikey": api_key, "Client-Agent": f"aihorde_ci_client:{CIVERSION}:(discord)db0#1625"} # ci/cd user + async_dict = { + "prompt": "a horde of cute flux robots in a sprawling server room repairing a massive mainframe", + "nsfw": True, + "censor_nsfw": False, + "r2": True, + "shared": True, + "trusted_workers": True, + "params": { + "width": 1024, + "height": 1024, + "steps": 8, + "cfg_scale": 1, + "sampler_name": "k_euler", + }, + "models": TEST_MODELS_FLUX, + # "extra_slow_workers": True, + } + protocol = "http" + if HORDE_URL in ["dev.stablehorde.net", "stablehorde.net"]: + protocol = "https" + time.sleep(1) + async_req = requests.post(f"{protocol}://{HORDE_URL}/api/v2/generate/async", json=async_dict, headers=headers) + assert async_req.ok, async_req.text + async_results = async_req.json() + req_id = async_results["id"] + # print(async_results) + pop_dict = { + "name": "CICD Fake Dreamer", + "models": TEST_MODELS_FLUX, + "bridge_agent": "AI Horde Worker reGen:9.0.1-citests:https://github.com/Haidra-Org/horde-worker-reGen", + "nsfw": True, + "amount": 10, + "max_pixels": 4194304, + "allow_img2img": True, + "allow_painting": True, + "allow_unsafe_ipaddr": True, + "allow_post_processing": True, + 
"allow_controlnet": True, + "allow_sdxl_controlnet": True, + "allow_lora": True, + "extra_slow_worker": False, + "limit_max_steps": True, + } + + # Test limit_max_steps + pop_req = requests.post(f"{protocol}://{HORDE_URL}/api/v2/generate/pop", json=pop_dict, headers=headers) + try: + # print(pop_req.text) + assert pop_req.ok, pop_req.text + except AssertionError as err: + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + print("Request cancelled") + raise err + pop_results = pop_req.json() + # print(json.dumps(pop_results, indent=4)) + try: + assert pop_results["id"] is None, pop_results + assert pop_results["skipped"].get("step_count") == 1, pop_results + except AssertionError as err: + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + print("Request cancelled") + raise err + + # Test extra_slow_worker + async_dict["params"]["steps"] = 5 + pop_dict["extra_slow_worker"] = True + time.sleep(0.5) + pop_req = requests.post(f"{protocol}://{HORDE_URL}/api/v2/generate/pop", json=pop_dict, headers=headers) + try: + # print(pop_req.text) + assert pop_req.ok, pop_req.text + except AssertionError as err: + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + print("Request cancelled") + raise err + pop_results = pop_req.json() + # print(json.dumps(pop_results, indent=4)) + try: + assert pop_results["id"] is None, pop_results + assert pop_results["skipped"]["performance"] == 1, pop_results + except AssertionError as err: + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + print("Request cancelled") + raise err + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + + # Try popping as an extra slow worker + async_dict["extra_slow_workers"] = True + time.sleep(0.5) + async_req = requests.post(f"{protocol}://{HORDE_URL}/api/v2/generate/async", json=async_dict, 
headers=headers) + assert async_req.ok, async_req.text + async_results = async_req.json() + req_id = async_results["id"] + time.sleep(0.5) + pop_req = requests.post(f"{protocol}://{HORDE_URL}/api/v2/generate/pop", json=pop_dict, headers=headers) + try: + assert pop_req.ok, pop_req.text + except AssertionError as err: + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + print("Request cancelled") + raise err + pop_results = pop_req.json() + job_id = pop_results["id"] + try: + assert job_id is not None, pop_results + except AssertionError as err: + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + print("Request cancelled") + raise err + submit_dict = { + "id": job_id, + "generation": "R2", + "state": "ok", + "seed": 0, + } + submit_req = requests.post(f"{protocol}://{HORDE_URL}/api/v2/generate/submit", json=submit_dict, headers=headers) + assert submit_req.ok, submit_req.text + submit_results = submit_req.json() + assert submit_results["reward"] > 0 + retrieve_req = requests.get(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + assert retrieve_req.ok, retrieve_req.text + retrieve_results = retrieve_req.json() + # print(json.dumps(retrieve_results, indent=4)) + assert len(retrieve_results["generations"]) == 1 + gen = retrieve_results["generations"][0] + assert len(gen["gen_metadata"]) == 0 + assert gen["seed"] == "0" + assert gen["worker_name"] == "CICD Fake Dreamer" + assert gen["model"] in TEST_MODELS_FLUX + assert gen["state"] == "ok" + assert retrieve_results["kudos"] > 1 + assert retrieve_results["done"] is True + requests.delete(f"{protocol}://{HORDE_URL}/api/v2/generate/status/{req_id}", headers=headers) + + +def quick_pop(api_key: str, HORDE_URL: str, CIVERSION: str) -> None: + print("quick_pop") + headers = {"apikey": api_key, "Client-Agent": f"aihorde_ci_client:{CIVERSION}:(discord)db0#1625"} # ci/cd user + protocol = "http" + if 
HORDE_URL in ["dev.stablehorde.net", "stablehorde.net"]: + protocol = "https" + # print(async_results) + pop_dict = { + "name": "CICD Fake Dreamer", + "models": TEST_MODELS_FLUX, + "bridge_agent": "AI Horde Worker reGen:9.1.0-citests:https://github.com/Haidra-Org/horde-worker-reGen", + "nsfw": True, + "amount": 10, + "max_pixels": 4194304, + "allow_img2img": True, + "allow_painting": True, + "allow_unsafe_ipaddr": True, + "allow_post_processing": True, + "allow_controlnet": True, + "allow_sdxl_controlnet": True, + "allow_lora": True, + "extra_slow_worker": False, + "limit_max_steps": True, + } + + # Debug helper: pop once and print raw response + pop_req = requests.post(f"{protocol}://{HORDE_URL}/api/v2/generate/pop", json=pop_dict, headers=headers) + print(pop_req.text) if __name__ == "__main__": # "ci/cd#12285" - test_simple_image_gen("2bc5XkMeLAWiN9O5s7bhfg", "dev.stablehorde.net", "0.1.1") + test_simple_image_gen("2bc5XkMeLAWiN9O5s7bhfg", "dev.stablehorde.net", "0.2.0") + test_flux_image_gen("2bc5XkMeLAWiN9O5s7bhfg", "dev.stablehorde.net", "0.2.0") + # quick_pop("2bc5XkMeLAWiN9O5s7bhfg", "dev.stablehorde.net", "0.2.0")