diff --git a/temba/contacts/forms.py b/temba/contacts/forms.py new file mode 100644 index 00000000000..adc6a9e4f2d --- /dev/null +++ b/temba/contacts/forms.py @@ -0,0 +1,177 @@ +from collections import OrderedDict + +from django import forms +from django.db.models.functions import Upper +from django.utils.translation import gettext_lazy as _ + +from temba import mailroom +from temba.utils import languages +from temba.utils.fields import InputWidget, SelectMultipleWidget, SelectWidget, TembaMultipleChoiceField + +from .models import URN, Contact, ContactGroup, ContactURN +from .search import parse_query + + +class UpdateContactForm(forms.ModelForm): + language = forms.ChoiceField(required=False, label=_("Language"), choices=(), widget=SelectWidget()) + groups = TembaMultipleChoiceField( + queryset=ContactGroup.objects.none(), + required=False, + label=_("Groups"), + widget=SelectMultipleWidget(attrs={"placeholder": _("Select groups for this contact"), "searchable": True}), + ) + + def __init__(self, org, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.org = org + + lang_choices = [("", "No Preference")] + + # if they had a language that has since been removed, make sure we show it + if self.instance.language and self.instance.language not in org.flow_languages: + lang_name = languages.get_name(self.instance.language) + lang_choices += [(self.instance.language, _(f"{lang_name} (Missing)"))] + + lang_choices += list(languages.choices(codes=org.flow_languages)) + + self.fields["language"].initial = self.instance.language + self.fields["language"].choices = lang_choices + + self.fields["groups"].initial = self.instance.get_groups(manual_only=True) + self.fields["groups"].queryset = ContactGroup.get_groups(org, manual_only=True).order_by(Upper("name")) + + # add all URN scheme fields if org is not anon + if not self.org.is_anon: + urns = self.instance.get_urns() + if not urns: + urns = [ContactURN(scheme="tel")] + + urn_fields = [] + last_urn = None + + for idx, urn in enumerate(urns): + first_urn = last_urn is None or urn.scheme != last_urn.scheme + + urn_choice = None + for choice in URN.SCHEME_CHOICES: + if choice[0] == urn.scheme: + urn_choice = choice + + scheme = urn.scheme + label = urn.scheme + + if urn_choice: + label = urn_choice[1] + + help_text = _(f"{label} for this contact") + if first_urn: + help_text = _(f"{label} for this contact") + f" (@urns.{scheme})" + + # get all the urns for this scheme + ctrl = forms.CharField( + required=False, label=label, initial=urn.path, help_text=help_text, widget=InputWidget() + ) + urn_fields.append((f"urn__{scheme}__{idx}", ctrl)) + + last_urn = urn + + self.fields.update(OrderedDict(urn_fields)) + + def clean(self): + country = self.org.default_country_code + + def validate_urn(key, scheme, path): + try: + normalized = URN.normalize(URN.from_parts(scheme, path), country) + existing_urn = ContactURN.lookup(self.org, normalized, normalize=False) + + if existing_urn and existing_urn.contact and existing_urn.contact != self.instance: + self._errors[key] = self.error_class([_("Used by another contact")]) + return False + # validate but not with country as users are allowed to enter numbers before adding a channel + elif not URN.validate(normalized): + if scheme == URN.TEL_SCHEME: # pragma: needs cover + self._errors[key] = self.error_class( + [_("Invalid number. Ensure number includes country code, e.g. +1-541-754-3010")] + ) + else: + self._errors[key] = self.error_class([_("Invalid format")]) + return False + return True + except ValueError: + self._errors[key] = self.error_class([_("Invalid input")]) + return False + + # validate URN fields + for field_key, value in self.data.items(): + if field_key.startswith("urn__") and value: + scheme = field_key.split("__")[1] + validate_urn(field_key, scheme, value) + + # validate new URN if provided + if self.data.get("new_path", None): + if validate_urn("new_path", self.data["new_scheme"], self.data["new_path"]): + self.cleaned_data["new_scheme"] = self.data["new_scheme"] + self.cleaned_data["new_path"] = self.data["new_path"] + + return self.cleaned_data + + class Meta: + model = Contact + labels = {"name": _("Name"), "status": _("Status")} + fields = ("name", "status", "language", "groups") + + +class ContactGroupForm(forms.ModelForm): + preselected_contacts = forms.CharField(required=False, widget=forms.HiddenInput) + group_query = forms.CharField(required=False, widget=forms.HiddenInput) + + def __init__(self, org, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.org = org + + def clean_name(self): + name = self.cleaned_data["name"] + + # make sure the name isn't already taken + existing = self.org.groups.filter(is_active=True, name__iexact=name).first() + if existing and self.instance != existing: + raise forms.ValidationError(_("Already used by another group.")) + + count, limit = ContactGroup.get_org_limit_progress(self.org) + if limit is not None and count >= limit: + raise forms.ValidationError( + _( + "This workspace has reached its limit of %(limit)d groups. " + "You must delete existing ones before you can create new ones." + ), + params={"limit": limit}, + ) + + return name + + def clean_query(self): + try: + parsed = parse_query(self.org, self.cleaned_data["query"]) + if not parsed.metadata.allow_as_group: + raise forms.ValidationError(_('You cannot create a smart group based on "id" or "group".')) + + if ( + self.instance + and self.instance.status != ContactGroup.STATUS_READY + and parsed.query != self.instance.query + ): + raise forms.ValidationError(_("You cannot update the query of a group that is evaluating.")) + + return parsed.query + + except mailroom.QueryValidationException as e: + raise forms.ValidationError(str(e)) + + class Meta: + model = ContactGroup + fields = ("name", "query") + labels = {"name": _("Name"), "query": _("Query")} + help_texts = {"query": _("Only contacts matching this query will belong to this group.")} diff --git a/temba/contacts/views.py b/temba/contacts/views.py index 32a3ac60243..d5ec63438ed 100644 --- a/temba/contacts/views.py +++ b/temba/contacts/views.py @@ -44,30 +44,15 @@ OrgPermsMixin, ) from temba.tickets.models import Ticket, Topic -from temba.utils import json, languages, on_transaction_commit +from temba.utils import json, on_transaction_commit from temba.utils.dates import datetime_to_timestamp, timestamp_to_datetime -from temba.utils.fields import ( - CheckboxWidget, - InputWidget, - SelectMultipleWidget, - SelectWidget, - TembaChoiceField, - TembaMultipleChoiceField, -) +from temba.utils.fields import CheckboxWidget, InputWidget, SelectWidget, TembaChoiceField from temba.utils.models import patch_queryset_count from temba.utils.models.es import IDSliceQuerySet from temba.utils.views import BulkActionMixin, ComponentFormMixin, ContentMenuMixin, NonAtomicMixin, SpaMixin -from .models import ( - URN, - Contact, - ContactExport, - ContactField, - ContactGroup, - ContactGroupCount, - ContactImport, - ContactURN, -) +from .forms import ContactGroupForm, UpdateContactForm +from .models import URN, Contact, ContactExport, ContactField, ContactGroup, ContactGroupCount, ContactImport from .search import parse_query, search_contacts from .search.omnibox import omnibox_query, omnibox_results_to_dict @@ -89,60 +74,6 @@ } -class ContactGroupForm(forms.ModelForm): - preselected_contacts = forms.CharField(required=False, widget=forms.HiddenInput) - group_query = forms.CharField(required=False, widget=forms.HiddenInput) - - def __init__(self, org, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.org = org - - def clean_name(self): - name = self.cleaned_data["name"] - - # make sure the name isn't already taken - existing = self.org.groups.filter(is_active=True, name__iexact=name).first() - if existing and self.instance != existing: - raise forms.ValidationError(_("Already used by another group.")) - - count, limit = ContactGroup.get_org_limit_progress(self.org) - if limit is not None and count >= limit: - raise forms.ValidationError( - _( - "This workspace has reached its limit of %(limit)d groups. " - "You must delete existing ones before you can create new ones." - ), - params={"limit": limit}, - ) - - return name - - def clean_query(self): - try: - parsed = parse_query(self.org, self.cleaned_data["query"]) - if not parsed.metadata.allow_as_group: - raise forms.ValidationError(_('You cannot create a smart group based on "id" or "group".')) - - if ( - self.instance - and self.instance.status != ContactGroup.STATUS_READY - and parsed.query != self.instance.query - ): - raise forms.ValidationError(_("You cannot update the query of a group that is evaluating.")) - - return parsed.query - - except mailroom.QueryValidationException as e: - raise forms.ValidationError(str(e)) - - class Meta: - model = ContactGroup - fields = ("name", "query") - labels = {"name": _("Name"), "query": _("Query")} - help_texts = {"query": _("Only contacts matching this query will belong to this group.")} - - class ContactListView(SpaMixin, OrgPermsMixin, BulkActionMixin, SmartListView): """ Base class for contact list views with contact folders and groups listed by the side @@ -258,137 +189,6 @@ def get_context_data(self, **kwargs): return context -class UpdateContactForm(forms.ModelForm): - def __init__(self, org, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.org = org - - # add all URN scheme fields if org is not anon - extra_fields = [] - if not self.org.is_anon: - urns = self.instance.get_urns() - - idx = 0 - - last_urn = None - - if not urns: - urn = ContactURN() - urn.scheme = "tel" - urns = [urn] - - for urn in urns: - first_urn = last_urn is None or urn.scheme != last_urn.scheme - - urn_choice = None - for choice in URN.SCHEME_CHOICES: - if choice[0] == urn.scheme: - urn_choice = choice - - scheme = urn.scheme - label = urn.scheme - - if urn_choice: - label = urn_choice[1] - - help_text = _(f"{label} for this contact") - if first_urn: - help_text = _(f"{label} for this contact") + f" (@urns.{scheme})" - - # get all the urns for this scheme - ctrl = forms.CharField( - required=False, label=label, initial=urn.path, help_text=help_text, widget=InputWidget() - ) - extra_fields.append(("urn__%s__%d" % (scheme, idx), ctrl)) - idx += 1 - - last_urn = urn - - self.fields = OrderedDict(list(self.fields.items()) + extra_fields) - - def clean(self): - country = self.org.default_country_code - - def validate_urn(key, scheme, path): - try: - normalized = URN.normalize(URN.from_parts(scheme, path), country) - existing_urn = ContactURN.lookup(self.org, normalized, normalize=False) - - if existing_urn and existing_urn.contact and existing_urn.contact != self.instance: - self._errors[key] = self.error_class([_("Used by another contact")]) - return False - # validate but not with country as users are allowed to enter numbers before adding a channel - elif not URN.validate(normalized): - if scheme == URN.TEL_SCHEME: # pragma: needs cover - self._errors[key] = self.error_class( - [_("Invalid number. Ensure number includes country code, e.g. +1-541-754-3010")] - ) - else: - self._errors[key] = self.error_class([_("Invalid format")]) - return False - return True - except ValueError: - self._errors[key] = self.error_class([_("Invalid input")]) - return False - - # validate URN fields - for field_key, value in self.data.items(): - if field_key.startswith("urn__") and value: - scheme = field_key.split("__")[1] - validate_urn(field_key, scheme, value) - - # validate new URN if provided - if self.data.get("new_path", None): - if validate_urn("new_path", self.data["new_scheme"], self.data["new_path"]): - self.cleaned_data["new_scheme"] = self.data["new_scheme"] - self.cleaned_data["new_path"] = self.data["new_path"] - - return self.cleaned_data - - class Meta: - model = Contact - fields = ("name",) - widgets = {"name": InputWidget(attrs={"widget_only": False})} - - -class UpdateUpdateContactForm(UpdateContactForm): - groups = TembaMultipleChoiceField( - queryset=ContactGroup.objects.none(), - required=False, - label=_("Groups"), - widget=SelectMultipleWidget(attrs={"placeholder": _("Select groups for this contact"), "searchable": True}), - ) - - def __init__(self, org, *args, **kwargs): - super().__init__(org, *args, **kwargs) - - choices = [("", "No Preference")] - - flow_langs = self.instance.org.flow_languages - - # if they had a preference that has since been removed, make sure we show it - if self.instance.language and self.instance.language not in flow_langs: - lang_name = languages.get_name(self.instance.language) - choices += [(self.instance.language, _(f"{lang_name} (Missing)"))] - - choices += list(languages.choices(codes=flow_langs)) - - self.fields["language"] = forms.ChoiceField( - required=False, label=_("Language"), initial=self.instance.language, choices=choices, widget=SelectWidget() - ) - - self.fields["groups"].initial = self.instance.get_groups(manual_only=True) - self.fields["groups"].queryset = ContactGroup.get_groups(self.org, manual_only=True).order_by(Upper("name")) - - class Meta: - model = Contact - fields = ("name", "status", "language", "groups") - widgets = { - "name": InputWidget(), - } - - class ContactCRUDL(SmartCRUDL): model = Contact actions = ( @@ -922,7 +722,7 @@ def form_valid(self, form): return self.render_modal_response(form) class Update(SpaMixin, ComponentFormMixin, NonAtomicMixin, ModalMixin, OrgObjPermsMixin, SmartUpdateView): - form_class = UpdateUpdateContactForm + form_class = UpdateContactForm success_url = "hide" submit_button_name = _("Save Changes")