Skip to content

Commit

Permalink
Introduce a schema for the rundb.
Browse files Browse the repository at this point in the history
Validation is done after the creation of runs and before writing
finished runs to disk. If a finished run fails validation then
this is logged but the run is otherwise treated normally.

No attempt has been made to make the schema as tight as possible.
For example, most numbers should be positive, but the schema does
not enforce this constraint. The main purpose of the schema is to
document the structure of the rundb. It is also used for validation
in some cases so that, whenever the structure of the rundb changes,
a validation failure forces the schema to be updated to reflect
the change.

For a description of the schema format see the README.md file
of https://github.com/vdbergh/vtjson

The validation code is included in this PR but it can be
installed as an independent package via

pip install vtjson

Validating a run with 500 tasks (which is a lot these days)
takes 12 ms on an AWS t2.micro using Python 3.11.
  • Loading branch information
vdbergh authored and ppigazzini committed Dec 3, 2023
1 parent c7fcc11 commit aa4291f
Show file tree
Hide file tree
Showing 10 changed files with 946 additions and 213 deletions.
5 changes: 3 additions & 2 deletions server/fishtest/actiondb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timezone

from fishtest.util import hex_print, union, validate, worker_name
from fishtest.util import hex_print, worker_name
from fishtest.vtjson import _validate, union
from pymongo import DESCENDING

schema = union(
Expand Down Expand Up @@ -280,7 +281,7 @@ def block_worker(self, username=None, worker=None, message=None):
def insert_action(self, **action):
if "run_id" in action:
action["run_id"] = str(action["run_id"])
ret = validate(schema, action, "action", strict=True)
ret = _validate(schema, action, "action")
if ret == "":
action["time"] = datetime.now(timezone.utc).timestamp()
self.actions.insert_one(action)
Expand Down
21 changes: 11 additions & 10 deletions server/fishtest/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from datetime import datetime, timezone

from fishtest.stats.stat_util import SPRT_elo
from fishtest.util import optional_key, union, validate, worker_name
from fishtest.util import worker_name
from fishtest.vtjson import _validate, lax, union
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPFound,
Expand Down Expand Up @@ -33,10 +34,10 @@
def validate_request(request):
schema = {
"password": str,
optional_key("run_id"): str,
optional_key("task_id"): int,
optional_key("pgn"): str,
optional_key("message"): str,
"run_id?": str,
"task_id?": int,
"pgn?": str,
"message?": str,
"worker_info": {
"uname": str,
"architecture": [str, str],
Expand All @@ -54,13 +55,13 @@ def validate_request(request):
"ARCH": str,
"nps": float,
},
optional_key("spsa"): {
"spsa?": {
"wins": int,
"losses": int,
"draws": int,
"num_games": int,
},
optional_key("stats"): {
"stats?": {
"wins": int,
"losses": int,
"draws": int,
Expand All @@ -69,7 +70,7 @@ def validate_request(request):
"pentanomial": [int, int, int, int, int],
},
}
return validate(schema, request, "request", strict=True)
return _validate(schema, request, "request")


# Avoids exposing sensitive data about the workers to the client and skips some heavy data.
Expand Down Expand Up @@ -137,8 +138,8 @@ def validate_username_password(self, api):
self.handle_error("request is not json encoded")

# Is the request syntactically correct?
schema = {"password": str, "worker_info": {"username": str}}
self.handle_error(validate(schema, self.request_body, "request"))
schema = lax({"password": str, "worker_info": {"username": str}})
self.handle_error(_validate(schema, self.request_body, "request"))

# is the supplied password correct?
token = self.request.userdb.authenticate(
Expand Down
204 changes: 204 additions & 0 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
update_residuals,
worker_name,
)
from fishtest.vtjson import _validate, ip_address, number, regex, union, url
from fishtest.workerdb import WorkerDb
from pymongo import DESCENDING, MongoClient

Expand All @@ -42,6 +43,196 @@

last_rundb = None

# This schema only matches new runs. The old runs are not
# compatible with it. For documentation purposes it would
# also be useful to have a "universal schema" that matches
# all the runs in the db.
# To make this practical we will eventually put all schemas
# in a separate module "schemas.py".

# Named string patterns used by the schemas below.  The descriptions assume
# vtjson's `regex` matches the whole string -- TODO confirm in the vtjson README.

# "nn-" + 12 characters from [a-z0-9] + ".nnue".  The dot is escaped (and the
# pattern is a raw string) so that only a literal "." is accepted before the
# extension; an unescaped "." would match any character.
net_name = regex(r"nn-[a-z0-9]{12}\.nnue", name="net_name")
# Optional "<positive int>/" prefix, a decimal number, optional "+<decimal>".
tc = regex(r"([1-9]\d*/)?\d+(\.\d+)?(\+\d+(\.\d+)?)?", name="tc")
# A string of decimal digits that does not start with "0".
str_int = regex(r"[1-9]\d*", name="str_int")
# 40 lowercase hex digits (used below for resolved_base / resolved_new).
sha = regex(r"[a-f0-9]{40}", name="sha")
# Two uppercase ASCII letters.
country_code = regex(r"[A-Z][A-Z]", name="country_code")
# 24 lowercase hex digits (used below for rescheduled_from).
run_id = regex(r"[a-f0-9]{24}", name="run_id")

# Shape of the "worker_info" dict stored with every task (see the "tasks" and
# "bad_tasks?" entries of the run schema).
worker_info_schema = {
    # Host description
    "uname": str,
    "architecture": [str, str],
    "concurrency": int,
    "max_memory": int,
    "min_threads": int,
    # Worker identity
    "username": str,
    "version": int,
    # Toolchain: the version fields are lists of exactly three ints
    "python_version": [int, int, int],
    "gcc_version": [int, int, int],
    "compiler": union("clang++", "g++"),
    "unique_key": str,
    "modified": bool,
    "ARCH": str,
    "nps": number,
    "near_github_api_limit": bool,
    # Network origin; "?" is accepted in place of a two-letter country code
    "remote_addr": ip_address,
    "country_code": union(country_code, "?"),
}

# Game counters; used both for the run-level "results" and for the per-task
# "stats" entries of the run schema.
results_schema = {
    "wins": int,
    "losses": int,
    "draws": int,
    "crashes": int,
    "time_losses": int,
    # exactly five integer entries
    "pentanomial": [int] * 5,
}

# Schema for a run document.
#
# Notation (see the vtjson README for the authoritative description):
#   * a key ending in "?" is optional;
#   * `[x, ...]` presumably denotes a list of arbitrarily many `x` entries;
#   * literal values (0.05, "normalized", True, ...) presumably have to be
#     matched exactly -- confirm against the vtjson README.
schema = {
    # --- identity and timestamps ---
    "_id?": ObjectId,
    "start_time": datetime,
    "last_updated": datetime,
    "tc_base": number,
    "base_same_as_master": bool,
    "rescheduled_from?": run_id,
    # --- state flags ---
    "approved": bool,
    "approver": str,
    "finished": bool,
    "deleted": bool,
    "failed": bool,
    "is_green": bool,
    "is_yellow": bool,
    # --- resource counters ---
    "workers": int,
    "cores": int,
    # --- aggregated results ---
    "results": results_schema,
    "results_info?": {
        "style": str,
        "info": [str, ...],
    },
    # --- test parameters ---
    "args": {
        "base_tag": str,
        "new_tag": str,
        "base_net": net_name,
        "new_net": net_name,
        "num_games": int,
        "tc": tc,
        "new_tc": tc,
        "book": str,
        "book_depth": str_int,
        "threads": int,
        "resolved_base": sha,
        "resolved_new": sha,
        "msg_base": str,
        "msg_new": str,
        "base_options": str,
        "new_options": str,
        "info": str,
        "base_signature": str_int,
        "new_signature": str_int,
        "username": str,
        "tests_repo": url,
        "auto_purge": bool,
        "throughput": number,
        "itp": number,
        "priority": number,
        "adjudication": bool,
        # Optional SPRT block
        "sprt?": {
            "alpha": 0.05,
            "beta": 0.05,
            "elo0": number,
            "elo1": number,
            "elo_model": "normalized",
            "state": union("", "accepted", "rejected"),
            "llr": number,
            "batch_size": int,
            "lower_bound": -math.log(19),
            "upper_bound": math.log(19),
            "lost_samples?": int,
            "illegal_update?": int,
            "overshoot?": {
                "last_update": int,
                "skipped_updates": int,
                "ref0": number,
                "m0": number,
                "sq0": number,
                "ref1": number,
                "m1": number,
                "sq1": number,
            },
        },
        # Optional SPSA block
        "spsa?": {
            "A": number,
            "alpha": number,
            "gamma": number,
            "raw_params": str,
            "iter": int,
            "num_iter": int,
            "params": [
                {
                    "name": str,
                    "start": number,
                    "min": number,
                    "max": number,
                    "c_end": number,
                    "r_end": number,
                    "c": number,
                    "a_end": number,
                    "a": number,
                    "theta": number,
                },
                ...,
            ],
            "param_history?": [
                [{"theta": number, "R": number, "c": number}, ...],
                ...,
            ],
        },
    },
    # --- tasks ---
    "tasks": [
        {
            "num_games": int,
            "active": bool,
            "last_updated": datetime,
            "start": int,
            "residual?": number,
            "residual_color?": str,
            "bad?": True,
            "stats": results_schema,
            "worker_info": worker_info_schema,
        },
        ...,
    ],
    # --- bad tasks: compared with "tasks", "active" must be False, "bad",
    # "residual" and "residual_color" are mandatory, and "task_id" is added ---
    "bad_tasks?": [
        {
            "num_games": int,
            "active": False,
            "last_updated": datetime,
            "start": int,
            "residual": number,
            "residual_color": str,
            "bad": True,
            "task_id": int,
            "stats": results_schema,
            "worker_info": worker_info_schema,
        },
        ...,
    ],
}

# Keep the module namespace small: the names below were only needed while
# building `schema`, so drop them again.

# helpers imported from fishtest.vtjson
del ip_address, number, regex, union, url
# intermediate schema fragments defined above
del country_code, results_schema, run_id, sha, str_int, tc, worker_info_schema


def get_port():
params = {}
Expand Down Expand Up @@ -241,6 +432,12 @@ def new_run(
if rescheduled_from:
new_run["rescheduled_from"] = rescheduled_from

# Refuse to store a run that does not match the schema: report the
# validation error and raise before the insert_one() call that follows.
valid = _validate(schema, new_run, "run")
if valid != "":
    # Message fixed: it used to read "does not _validate", apparently a
    # leftover of renaming validate() to _validate().
    message = f"The new run object does not validate: {valid}"
    print(message, flush=True)
    raise Exception(message)

return self.runs.insert_one(new_run).inserted_id

def upload_pgn(self, run_id, pgn_zip):
Expand Down Expand Up @@ -613,6 +810,7 @@ def compute_results(self, run):
"""
This is used in purge_run and also to verify the incrementally updated results
when a run is finished."""

results = {"wins": 0, "losses": 0, "draws": 0, "crashes": 0, "time_losses": 0}

has_pentanomial = True
Expand Down Expand Up @@ -1366,6 +1564,12 @@ def stop_run(self, run_id):
run["cores"] = 0
run["workers"] = 0
run["finished"] = True
valid = _validate(schema, run, "run")
if valid != "":
print(f"The run object {run_id} does not validate: {valid}", flush=True)
# We are not confident enough to enable this...
# assert False

self.buffer(run, True)
# Publish the results of the run to the Fishcooking forum
post_in_fishcooking_results(run)
Expand Down
23 changes: 15 additions & 8 deletions server/fishtest/userdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,31 @@
from datetime import datetime, timezone

from bson.objectid import ObjectId
from fishtest.util import optional_key, validate
from fishtest.vtjson import _validate, email, union, url
from pymongo import ASCENDING

schema = {
optional_key("_id"): ObjectId,
"_id?": ObjectId,
"username": str,
"password": str,
"registration_time": datetime,
"blocked": bool,
"email": str,
"groups": list,
"tests_repo": str,
"email": email,
"groups": [str, ...],
"tests_repo": union("", url),
"machine_limit": int,
}

DEFAULT_MACHINE_LIMIT = 16


def validate_user(user):
    """Check `user` against the module-level user `schema`.

    If `_validate` returns a non-empty message, the message is printed and
    an AssertionError carrying that message is raised.  Returns None when
    the message is empty.
    """
    valid = _validate(schema, user, "user")
    if valid != "":
        print(valid, flush=True)
        # Raise explicitly rather than via `assert False`: assert statements
        # are removed when Python runs with -O, which would silently let an
        # invalid user object through.  The exception type is unchanged.
        raise AssertionError(valid)


class UserDb:
def __init__(self, db):
self.db = db
Expand Down Expand Up @@ -89,7 +96,7 @@ def get_user_groups(self, username):
def add_user_group(self, username, group):
user = self.find(username)
user["groups"].append(group)
assert validate(schema, user, "user", strict=True) == ""
validate_user(user)
self.users.replace_one({"_id": user["_id"]}, user)
self.clear_cache()

Expand All @@ -108,7 +115,7 @@ def create_user(self, username, password, email):
"tests_repo": "",
"machine_limit": DEFAULT_MACHINE_LIMIT,
}
assert validate(schema, user, "user", strict=True) == ""
validate_user(user)
self.users.insert_one(user)
self.last_pending_time = 0

Expand All @@ -117,7 +124,7 @@ def create_user(self, username, password, email):
return False

def save_user(self, user):
assert validate(schema, user, "user", strict=True) == ""
validate_user(user)
self.users.replace_one({"_id": user["_id"]}, user)
self.last_pending_time = 0
self.clear_cache()
Expand Down
Loading

0 comments on commit aa4291f

Please sign in to comment.