Skip to content

Commit

Permalink
[WIP] Broker performance optimisation (#105)
Browse files Browse the repository at this point in the history
* optimise qos on database write

* optimisation for duplicate tasks + debug messages

* improve add dummy requests script

* fix qos rules check

* add performance monitoring

* more performance metrics

* try to use a class attribute for the queue

* always decrease queued requests in qos rules

* introduce time window for get accepted requests

* fix a typeerror with None

* try to fix async internal and db queue

* fix count

* fix count accepted requests

* improve performance log

* fix performance message

* add performance logging

* add config variable for requeueing on killedworker

* add unit tests

* qa
  • Loading branch information
francesconazzaro authored May 13, 2024
1 parent 80e6da4 commit 0af2389
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 112 deletions.
130 changes: 94 additions & 36 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class SystemRequest(BaseModel):
secondary="system_request_qos_rule",
back_populates="system_requests",
uselist=True,
lazy="subquery",
)

@property
Expand Down Expand Up @@ -213,14 +214,29 @@ def get_running_requests(

def get_accepted_requests(
    session: sa.orm.Session,
    last_created_at: datetime.datetime | None = None,
):
    """Return the requests whose status is "accepted", ordered by creation time.

    When ``last_created_at`` is given (truthy), only requests with
    ``created_at >= last_created_at`` are returned.
    """
    # Collect the filters first so the statement is assembled in one place;
    # the optional time-window filter keeps its position ahead of the status
    # filter.
    conditions = []
    if last_created_at:
        conditions.append(SystemRequest.created_at >= last_created_at)
    conditions.append(SystemRequest.status == "accepted")
    query = (
        sa.select(SystemRequest)
        .where(*conditions)
        .order_by(SystemRequest.created_at)
    )
    return session.scalars(query).all()


def count_accepted_requests_before(
session: sa.orm.Session,
last_created_at: datetime.datetime,
) -> int:
"""Count the accepted requests created at or before last_created_at."""
statement = (
sa.select(SystemRequest)
session.query(SystemRequest)
.where(SystemRequest.status == "accepted")
.order_by(SystemRequest.created_at)
.where(SystemRequest.created_at <= last_created_at)
)
return session.scalars(statement).all()
return statement.count()


def count_finished_requests_per_user_in_session(
Expand Down Expand Up @@ -437,6 +453,12 @@ def get_qos_rule(uid: str, session: sa.orm.Session):
return session.scalars(statement).one()


def get_qos_rules(session: sa.orm.Session):
    """Return every QoS rule selected from the database, keyed by its uid."""
    rules_by_uid = {}
    for qos_rule in session.scalars(sa.select(QoSRule)).all():
        rules_by_uid[qos_rule.uid] = qos_rule
    return rules_by_uid


def add_qos_rule(
rule,
session: sa.orm.Session,
Expand All @@ -460,56 +482,87 @@ def add_qos_rule(
return qos_rule


def increment_qos_rule_running(rules: list, session: sa.orm.Session):
def increment_qos_rule_running(
rules: list, session: sa.orm.Session, rules_in_db: dict[str, QoSRule] = {}, **kwargs
):
"""Increment the running counter of each QoS rule in rules.

Each rule is identified by str(rule.__hash__()). A rule found in
rules_in_db is used directly; otherwise it is fetched with get_qos_rule
and, when that raises NoResultFound, created with add_qos_rule.

Returns the created_rules dict, which maps uid to QoSRule.
"""
created_rules: dict = {}
for rule in rules:
try:
qos_rule = get_qos_rule(str(rule.__hash__()), session)
except sqlalchemy.orm.exc.NoResultFound:
qos_rule = add_qos_rule(rule=rule, session=session)
if (rule_uid := str(rule.__hash__())) in rules_in_db:
qos_rule = rules_in_db[rule_uid]
else:
try:
qos_rule = get_qos_rule(rule_uid, session)
except sqlalchemy.orm.exc.NoResultFound:
qos_rule = add_qos_rule(rule=rule, session=session)
created_rules[qos_rule.uid] = qos_rule
qos_rule.running += 1
session.commit()
return created_rules


def decrement_qos_rule_running(rules: list, session: sa.orm.Session):
def decrement_qos_rule_running(
rules: list, session: sa.orm.Session, rules_in_db: dict[str, QoSRule] = {}, **kwargs
):
"""Decrement the running counter of each QoS rule in rules.

Each rule is identified by str(rule.__hash__()). A rule found in
rules_in_db is used directly; otherwise it is fetched with get_qos_rule.
Rules for which get_qos_rule raises NoResultFound are skipped.
The counter is never lowered below 0.
"""
for rule in rules:
try:
qos_rule = get_qos_rule(str(rule.__hash__()), session)
qos_rule.running = max(0, qos_rule.running - 1)
except sqlalchemy.orm.exc.NoResultFound:
# this happens when a request is finished after a broker restart.
# the rule is not in the database anymore because it has been reset.
continue
session.commit()
if (rule_uid := str(rule.__hash__())) in rules_in_db:
qos_rule = rules_in_db[rule_uid]
else:
try:
qos_rule = get_qos_rule(rule_uid, session)
except sqlalchemy.orm.exc.NoResultFound:
# this happens when a request is finished after a broker restart.
# the rule is not in the database anymore because it has been reset.
continue
qos_rule.running = max(0, qos_rule.running - 1)


def delete_request_qos_status(request_uid: str, rules: list, session: sa.orm.Session):
def delete_request_qos_status(
request_uid: str,
rules: list,
session: sa.orm.Session,
rules_in_db: dict[str, QoSRule] = {},
**kwargs,
):
"""Detach the given QoS rules from a request and update their counters.

The request is loaded with get_request(request_uid, session). Each rule is
identified by str(rule.__hash__()); it is taken from rules_in_db when
present there, otherwise fetched with get_qos_rule or, when that raises
NoResultFound, created with add_qos_rule.

A rule present in request.qos_rules is removed from it. The rule's queued
counter is decreased (never below 0) and its running counter is increased.

Returns the created_rules dict, which maps uid to QoSRule.
"""
created_rules: dict = {}
request = get_request(request_uid, session)
for rule in rules:
try:
qos_rule = get_qos_rule(str(rule.__hash__()), session)
except sqlalchemy.orm.exc.NoResultFound:
qos_rule = add_qos_rule(rule=rule, session=session)
if (rule_uid := str(rule.__hash__())) in rules_in_db:
qos_rule = rules_in_db[rule_uid]
else:
try:
qos_rule = get_qos_rule(rule_uid, session)
except sqlalchemy.orm.exc.NoResultFound:
qos_rule = add_qos_rule(rule=rule, session=session)
created_rules[qos_rule.uid] = qos_rule
if qos_rule in request.qos_rules:
request.qos_rules.remove(qos_rule)
qos_rule.queued = max(0, qos_rule.queued - 1)
qos_rule.queued = max(0, qos_rule.queued - 1)
qos_rule.running += 1
session.commit()
return created_rules


def add_request_qos_status(request_uid: str, rules: list, session: sa.orm.Session):
request = get_request(request_uid, session)
def add_request_qos_status(
request: SystemRequest,
rules: list,
session: sa.orm.Session,
rules_in_db: dict[str, QoSRule] = {},
**kwargs,
):
# Attach the given QoS rules to the request and count them as queued.
# Each rule is identified by str(rule.__hash__()): it is taken from
# rules_in_db when present there, otherwise created with add_qos_rule.
# A rule not yet in request.qos_rules is appended to it and its queued
# counter is increased by one.
# Returns the created_rules dict (uid -> QoSRule); returns {} when request
# is None.
created_rules: dict = {}
if request is None:
return {}
for rule in rules:
try:
qos_rule = get_qos_rule(str(rule.__hash__()), session)
except sqlalchemy.orm.exc.NoResultFound:
if (rule_uid := str(rule.__hash__())) in rules_in_db:
qos_rule = rules_in_db[rule_uid]
else:
qos_rule = add_qos_rule(rule=rule, session=session)
created_rules[qos_rule.uid] = qos_rule
if qos_rule not in request.qos_rules:
qos_rule.queued += 1
request.qos_rules.append(qos_rule)
session.commit()
return created_rules


def get_qos_status_from_request(
Expand Down Expand Up @@ -665,6 +718,15 @@ def add_event(
session.commit()


def dictify_request(request: SystemRequest) -> dict[str, Any]:
    """Convert a request into a plain dictionary.

    The result holds one entry per mapped column attribute of the request
    (keyed by the attribute's key), plus a ``"qos_rules"`` entry listing the
    uid of every QoS rule attached to the request.
    """
    as_dict: dict[str, Any] = {}
    for attribute in sa.inspect(request).mapper.column_attrs:
        as_dict[attribute.key] = getattr(request, attribute.key)
    as_dict["qos_rules"] = [qos_rule.uid for qos_rule in request.qos_rules]
    return as_dict


def create_request(
session: sa.orm.Session,
user_uid: str,
Expand Down Expand Up @@ -709,11 +771,7 @@ def create_request(
session.add(request)
session.commit()
logger.info("accepted job", **logger_kwargs(request=request))
ret_value = {
column.key: getattr(request, column.key)
for column in sa.inspect(request).mapper.column_attrs
}
return ret_value
return dictify_request(request)


def get_request(
Expand Down
Loading

0 comments on commit 0af2389

Please sign in to comment.