Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create management command to identify and fix surveys migrated from v1 #1634

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 207 additions & 0 deletions iogt_content_migration/management/commands/fix_surveys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import json

from django.core.management.base import BaseCommand
from questionnaires.models import Survey, SurveyFormField
from wagtail.core.models import PageRevision


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"--fix",
action="store_true",
help="Fix problems identified by the report",
)

def handle(self, *args, **options):
if problems_report := report():
for entry in problems_report:
self.stdout.write(str(entry))
else:
self.stdout.write("No problems found")

if options.get("fix"):
self.stdout.write("Fix application started")
fix(problems_report)
self.stdout.write("Fix application completed")


class ReportEntry:
def __init__(self, survey):
self.survey = survey
self.problems = report_on_revision(survey)

@property
def id(self):
return self.survey.id

@property
def has_problems(self):
return len(self.problems) > 0

def __str__(self):
status = "live " if self.survey.live else "draft"
return f'{status}, {self.survey.id}, "{self.survey.title}", {self.problems}'


class SurveyRevision:
def __init__(self, revision):
self.revision = revision
self.content = json.loads(revision.content_json)

@property
def pk(self):
return self.content.get("pk")

@property
def fields(self):
return [
SurveyRevisionField(field)
for field in self.content.get("survey_form_fields", [])
]


class SurveyRevisionField:
def __init__(self, field):
self._raw = field

@property
def pk(self):
return self._raw.get("pk")

@property
def label(self):
return self._raw.get("label")


def report():
return [
entry
for survey in Survey.objects.all()
if (entry := ReportEntry(survey)).has_problems
]


def report_on_revision(survey):
try:
return identify_problems(survey, get_latest_revision(survey))
except PageRevision.DoesNotExist:
return {}


def get_latest_revision(page):
return SurveyRevision(PageRevision.objects.filter(page=page).latest("created_at"))


def identify_problems(survey, revision):
return {
problem
for p in [
field_ids_mismatch,
id_mismatch,
labels_mismatch,
no_questions,
no_revision_questions,
]
if (problem := p(survey, revision))
}


def no_questions(survey, revision):
return "no_qs" if len(survey.get_form_fields()) < 1 else None


def no_revision_questions(survey, revision):
return "no_rev_qs" if len(revision.fields) < 1 else None


def id_mismatch(survey, revision):
return "id" if revision.pk != survey.id else None


def field_ids_mismatch(survey, revision):
survey_field_ids = {field.id for field in survey.get_form_fields()}
revision_field_ids = {field.pk for field in revision.fields if field.pk}

return "field_ids" if survey_field_ids != revision_field_ids else None


def labels_mismatch(survey, revision):
survey_field_labels = {field.label for field in survey.get_form_fields()}
revision_field_labels = {field.label for field in revision.fields if field.label}

return "labels" if survey_field_labels != revision_field_labels else None


def fix(problems_report):
for entry in problems_report:
if {"id", "field_ids"}.issubset(entry.problems):
print(f"Revision update required, survey={entry.survey}")
entry.survey.save_revision(log_action=True)
elif "no_qs" in entry.problems:
print(
f"Restore fields from previous revision required, survey={entry.survey}"
)
for field in find_first_restorable_revision(entry.survey).fields:
create_field(entry.survey, field._raw).save()
latest_revision = entry.survey.save_revision(log_action=True)
latest_revision.publish()
else:
print(f"No action taken, survey={entry.survey}")


def find_first_restorable_revision(page):
return next(
sr
for revision in PageRevision.objects.filter(page=page).order_by("-created_at")
if is_restorable_v1(((sr := SurveyRevision(revision))), page)
)


def is_restorable_v1(revision, page):
"""Identifies a v1 PageRevision that can be used to restore a v2 Survey.
PageRevisions from v1 reference primary keys that do not match the v2 database
because revisions were copied verbatim from v1. It is possible, though unlikely
that the primary keys might be the same across v1 and v2. The alternative would be
to read the surveys directly from the v1 database. This method was chosen for the
sake of convenience.
"""
return revision.pk != page.id and len(revision.fields) > 0


def create_field(survey, data):
return SurveyFormField(
admin_label=data.get("admin_label"),
choices="|".join(
choice.strip() for choice in data.get("choices", "").split(",")
),
default_value=data.get("default_value"),
field_type=(
"positivenumber"
if (ftype := data.get("field_type")) == "positive_number"
else ftype
),
help_text=data.get("help_text"),
label=data.get("label"),
page=survey,
page_break=data.get("page_break"),
required=data.get("required"),
skip_logic=[
create_answer_option(item)
for item in json.loads(data.get("skip_logic", "[]"))
],
sort_order=data.get("sort_order"),
)


def create_answer_option(item):
value = item.get("value", {})

return (
"skip_logic",
{
"choice": value.get("choice"),
"skip_logic": value.get("skip_logic"),
"question": value.get("question"),
},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"admin_label": "Learn anything?",
"choices": "All of this information is new to me, Most of this information is new to me, A little bit of this information is new to me, None of of this information is new to me",
"default_value": "",
"field_type": "radio",
"help_text": "Please choose one answer only",
"label": "Have you learnt anything new from this All In information?",
"page": 188,
"page_break": false,
"pk": 38,
"required": true,
"skip_logic": "[{\"type\": \"skip_logic\", \"value\": {\"choice\": \"Yes\", \"skip_logic\": \"next\", \"survey\": null, \"question\": null}, \"id\": \"70666059-b8d3-493c-8259-5c58208978d9\"}, {\"type\": \"skip_logic\", \"value\": {\"choice\": \"No\", \"skip_logic\": \"next\", \"survey\": null, \"question\": null}, \"id\": \"5a719c49-1b22-40c9-ae2e-5c6440c1d0f9\"}]",
"sort_order": 0
}
34 changes: 34 additions & 0 deletions iogt_content_migration/tests/test_fix_surveys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json
from pathlib import Path

from django.test import TestCase
from iogt_content_migration.management.commands.fix_surveys import create_field
from questionnaires.factories import SurveyFactory


class TestFixSurveys(TestCase):
def test_create_form_field(self):
survey = SurveyFactory()

with open(open_resource("v1_page_revision_survey_form_field.json")) as fp:
field = create_field(survey, json.load(fp))

self.assertEqual(len(field.skip_logic), 2)

option = field.skip_logic[0].value
self.assertEqual(option["choice"], "Yes")
self.assertEqual(option["skip_logic"], "next")
self.assertIsNone(option["question"])

self.assertEqual(
field.choices,
"All of this information is new to me|"
"Most of this information is new to me|"
"A little bit of this information is new to me|"
"None of of this information is new to me",
)
self.assertEqual(field.page.id, survey.id)


def open_resource(filename):
return Path(__file__).parent / "resources" / filename
Loading