From 180e74ddb344922806645a8ed0fecb45204aa7e3 Mon Sep 17 00:00:00 2001 From: Carlos Quintana Date: Mon, 14 Oct 2024 14:58:01 +0200 Subject: [PATCH] chore: unify change alias mailboxes logic --- app/alias_mailbox_utils.py | 62 +++++++++++++++++++++++++++ app/api/views/alias.py | 32 +++----------- app/api/views/custom_domain.py | 27 +++++------- tests/test_alias_mailbox_utils.py | 71 +++++++++++++++++++++++++++++++ 4 files changed, 150 insertions(+), 42 deletions(-) create mode 100644 app/alias_mailbox_utils.py create mode 100644 tests/test_alias_mailbox_utils.py diff --git a/app/alias_mailbox_utils.py b/app/alias_mailbox_utils.py new file mode 100644 index 000000000..d32e0f2e3 --- /dev/null +++ b/app/alias_mailbox_utils.py @@ -0,0 +1,62 @@ +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional + +from app.db import Session +from app.models import Alias, AliasMailbox, Mailbox + +_MAX_MAILBOXES_PER_ALIAS = 20 + + +class CannotSetMailboxesForAliasCause(Enum): + Forbidden = "Forbidden" + EmptyMailboxes = "Must choose at least one mailbox" + TooManyMailboxes = "Too many mailboxes" + + +@dataclass +class SetMailboxesForAliasResult: + performed_change: bool + reason: Optional[CannotSetMailboxesForAliasCause] + + +def set_mailboxes_for_alias( + user_id: int, alias: Alias, mailbox_ids: List[int] +) -> SetMailboxesForAliasResult: + if len(mailbox_ids) == 0: + return SetMailboxesForAliasResult( + performed_change=False, + reason=CannotSetMailboxesForAliasCause.EmptyMailboxes, + ) + if len(mailbox_ids) > _MAX_MAILBOXES_PER_ALIAS: + return SetMailboxesForAliasResult( + performed_change=False, + reason=CannotSetMailboxesForAliasCause.TooManyMailboxes, + ) + + mailboxes = ( + Session.query(Mailbox) + .filter( + Mailbox.id.in_(mailbox_ids), + Mailbox.user_id == user_id, + Mailbox.verified == True, # noqa: E712 + ) + .all() + ) + if len(mailboxes) != len(mailbox_ids): + return SetMailboxesForAliasResult( + performed_change=False, reason=CannotSetMailboxesForAliasCause.Forbidden + ) + + # first remove all existing alias-mailboxes links + AliasMailbox.filter_by(alias_id=alias.id).delete() + Session.flush() + + # then add all new mailboxes, being the first the one associated with the alias + for i, mailbox in enumerate(mailboxes): + if i == 0: + alias.mailbox_id = mailboxes[0].id + else: + AliasMailbox.create(alias_id=alias.id, mailbox_id=mailbox.id) + + return SetMailboxesForAliasResult(performed_change=True, reason=None) diff --git a/app/api/views/alias.py b/app/api/views/alias.py index 572ef21b0..2b8cf00e2 100644 --- a/app/api/views/alias.py +++ b/app/api/views/alias.py @@ -5,6 +5,7 @@ from app import alias_utils from app.alias_audit_log_utils import emit_alias_audit_log, AliasAuditLogAction +from app.alias_mailbox_utils import set_mailboxes_for_alias from app.api.base import api_bp, require_api_auth from app.api.serializer import ( AliasInfo, @@ -27,7 +28,7 @@ ) from app.extensions import limiter from app.log import LOG -from app.models import Alias, Contact, Mailbox, AliasMailbox, AliasDeleteReason +from app.models import Alias, Contact, Mailbox, AliasDeleteReason @deprecated @@ -297,30 +298,11 @@ def update_alias(alias_id): if "mailbox_ids" in data: mailbox_ids = [int(m_id) for m_id in data.get("mailbox_ids")] - mailboxes: [Mailbox] = [] - - # check if all mailboxes belong to user - for mailbox_id in mailbox_ids: - mailbox = Mailbox.get(mailbox_id) - if not mailbox or mailbox.user_id != user.id or not mailbox.verified: - return jsonify(error="Forbidden"), 400 - mailboxes.append(mailbox) - - if not mailboxes: - return jsonify(error="Must choose at least one mailbox"), 400 - - # <<< update alias mailboxes >>> - # first remove all existing alias-mailboxes links - AliasMailbox.filter_by(alias_id=alias.id).delete() - Session.flush() - - # then add all new mailboxes - for i, mailbox in enumerate(mailboxes): - if i == 0: - alias.mailbox_id = mailboxes[0].id - else: - AliasMailbox.create(alias_id=alias.id, mailbox_id=mailbox.id) - # <<< END update alias mailboxes >>> + res = set_mailboxes_for_alias( + user_id=user.id, alias=alias, mailbox_ids=mailbox_ids + ) + if not res.performed_change: + return jsonify(error=res.reason.value), 400 mailbox_ids_string = ",".join(map(str, mailbox_ids)) changed_fields.append(f"mailbox_ids ({mailbox_ids_string})") diff --git a/app/api/views/custom_domain.py b/app/api/views/custom_domain.py index b42c957ea..522377cc6 100644 --- a/app/api/views/custom_domain.py +++ b/app/api/views/custom_domain.py @@ -2,8 +2,10 @@ from flask import jsonify from app.api.base import api_bp, require_api_auth +from app.custom_domain_utils import set_custom_domain_mailboxes from app.db import Session -from app.models import CustomDomain, DomainDeletedAlias, Mailbox, DomainMailbox +from app.log import LOG +from app.models import CustomDomain, DomainDeletedAlias def custom_domain_to_dict(custom_domain: CustomDomain): @@ -100,23 +102,14 @@ def update_custom_domain(custom_domain_id): if "mailbox_ids" in data: mailbox_ids = [int(m_id) for m_id in data.get("mailbox_ids")] - if mailbox_ids: - # check if mailbox is not tempered with - mailboxes = [] - for mailbox_id in mailbox_ids: - mailbox = Mailbox.get(mailbox_id) - if not mailbox or mailbox.user_id != user.id or not mailbox.verified: - return jsonify(error="Forbidden"), 400 - mailboxes.append(mailbox) - - # first remove all existing domain-mailboxes links - DomainMailbox.filter_by(domain_id=custom_domain.id).delete() - Session.flush() - - for mailbox in mailboxes: - DomainMailbox.create(domain_id=custom_domain.id, mailbox_id=mailbox.id) - + result = set_custom_domain_mailboxes(user.id, custom_domain, mailbox_ids) + if result.success: changed = True + else: + LOG.info( + f"Prevented from updating mailboxes [custom_domain_id={custom_domain.id}]: {result.reason.value}" + ) + return jsonify(error="Forbidden"), 400 if changed: Session.commit() diff --git a/tests/test_alias_mailbox_utils.py b/tests/test_alias_mailbox_utils.py new file mode 100644 index 000000000..3a61e1283 --- /dev/null +++ b/tests/test_alias_mailbox_utils.py @@ -0,0 +1,71 @@ +from typing import Tuple + +from app.alias_mailbox_utils import ( + set_mailboxes_for_alias, + CannotSetMailboxesForAliasCause, +) +from app.models import Alias, Mailbox, User, AliasMailbox +from tests.utils import create_new_user, random_email + + +def setup() -> Tuple[User, Alias]: + user = create_new_user() + alias = Alias.create( + user_id=user.id, + email=random_email(), + mailbox_id=user.default_mailbox_id, + commit=True, + ) + return user, alias + + +def test_set_mailboxes_for_alias_empty_list(): + user, alias = setup() + res = set_mailboxes_for_alias(user.id, alias, []) + + assert res.performed_change is False + assert res.reason is CannotSetMailboxesForAliasCause.EmptyMailboxes + + +def test_set_mailboxes_for_alias_mailbox_for_other_user(): + user, alias = setup() + another_user = create_new_user() + res = set_mailboxes_for_alias(user.id, alias, [another_user.default_mailbox_id]) + + assert res.performed_change is False + assert res.reason is CannotSetMailboxesForAliasCause.Forbidden + + +def test_set_mailboxes_for_alias_mailbox_not_exists(): + user, alias = setup() + res = set_mailboxes_for_alias(user.id, alias, [9999999]) + + assert res.performed_change is False + assert res.reason is CannotSetMailboxesForAliasCause.Forbidden + + +def test_set_mailboxes_for_alias_mailbox_success(): + user, alias = setup() + mb1 = Mailbox.create( + user_id=user.id, + email=random_email(), + verified=True, + ) + mb2 = Mailbox.create( + user_id=user.id, + email=random_email(), + verified=True, + commit=True, + ) + res = set_mailboxes_for_alias(user.id, alias, [mb1.id, mb2.id]) + + assert res.performed_change is True + assert res.reason is None + + db_alias = Alias.get_by(id=alias.id) + assert db_alias is not None + assert db_alias.mailbox_id == mb1.id + + alias_mailboxes = AliasMailbox.filter_by(alias_id=alias.id).all() + assert len(alias_mailboxes) == 1 + assert alias_mailboxes[0].mailbox_id == mb2.id