diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 83c2841..bf42c9c 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -14,14 +14,12 @@ Change Log
 Unreleased
 **********
 
-* First functional version, includes a CMS listener for COURSE_PUBLISHED
-* README updates
-* New configuration settings for connection to ClickHouse
-
-0.1.0 – 2023-04-24
+0.1.0 – 2023-05-11
 **********************************************
 
 Added
 =====
 
-* First release on PyPI.
+* First release on PyPI
+* CMS listener for COURSE_PUBLISHED
+* Management command to bulk push course data to ClickHouse
diff --git a/README.rst b/README.rst
index 7b9da0a..44e0d69 100644
--- a/README.rst
+++ b/README.rst
@@ -16,12 +16,31 @@ OARS consumes the data sent to ClickHouse by this plugin as part of data
 enrichment for reporting, or capturing data that otherwise does not fit in
 xAPI.
 
+Sinks
+*****
+
 Currently the only sink is in the CMS. It listens for the ``COURSE_PUBLISHED``
 signal and serializes a subset of the published course blocks into one table
 and the relationships between blocks into another table. With those we are
 able to recreate the "graph" of the course and get relevant data, such as
 block names, for reporting.
 
+Commands
+********
+
+In addition to being an event listener, this package provides commands for
+exporting the same data in bulk. This allows bootstrapping a new data platform
+or backfilling lost or missing data. Currently the only command is the Django
+command for the ``COURSE_PUBLISHED`` data:
+
+``python manage.py cms dump_courses_to_clickhouse``
+
+This command allows bulk export of all courses, or the export can be limited
+in various ways. Please see the command help for details:
+
+``python manage.py cms dump_courses_to_clickhouse -h``
+
+
 .. _Open edX events: https://github.com/openedx/openedx-events
 .. _Edx Platform: https://github.com/openedx/edx-platform
 .. _ClickHouse: https://clickhouse.com
diff --git a/event_sink_clickhouse/management/__init__.py b/event_sink_clickhouse/management/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/event_sink_clickhouse/management/commands/__init__.py b/event_sink_clickhouse/management/commands/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py
new file mode 100644
index 0000000..0bc05e1
--- /dev/null
+++ b/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py
@@ -0,0 +1,194 @@
+"""
+Management command for exporting the modulestore to ClickHouse.
+
+Example usages (see usage for more options):
+
+    # Dump all courses published since last dump.
+    # Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`:
+    python manage.py cms dump_courses_to_clickhouse
+
+    # Dump all courses published since last dump.
+    # Use custom connection parameters to send to a different ClickHouse instance:
+    python manage.py cms dump_courses_to_clickhouse --url http://localhost:8123 \
+        --username user --password password --database research_db --
+
+    # Specify certain courses instead of dumping all of them.
+    # Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`.
+ python manage.py cms dump_courses_to_clickhouse --courses 'course-v1:A+B+1' 'course-v1:A+B+2' + + # Dump a limited number of courses to prevent stress on production systems + python manage.py cms dump_courses_to_clickhouse --limit 1000 +""" +import logging +from textwrap import dedent + +from django.core.management.base import BaseCommand, CommandError +from edx_django_utils.cache import RequestCache + +from event_sink_clickhouse.sinks.course_published import CoursePublishedSink +from event_sink_clickhouse.tasks import dump_course_to_clickhouse + +log = logging.getLogger(__name__) + + +def dump_target_courses_to_clickhouse( + connection_overrides=None, + course_keys=None, + courses_to_skip=None, + force=None, + limit=None +): + """ + Iterates through a list of courses in a modulestore, serializes them to csv, + then submits tasks to post them to ClickHouse. + + Arguments: + force: serialize the courses even if they've been recently + serialized + + Returns: two lists--one of the courses that had dump jobs queued for them + and one of courses that did not. + """ + sink = CoursePublishedSink(connection_overrides, log) + + submitted_courses = [] + skipped_courses = [] + + index = 0 + for course_key, should_be_dumped, reason in sink.fetch_target_courses(course_keys, courses_to_skip, force): + log.info(f"Iteration {index}: {course_key}") + index += 1 + + if not should_be_dumped: + skipped_courses.append(course_key) + log.info(f"Course {index}: Skipping course {course_key}, reason: '{reason}'") + else: + # RequestCache is a local memory cache used in modulestore for performance reasons. + # Normally it is cleared at the end of every request, but in this command it will + # continue to grow until the command is done. To prevent excessive memory consumption + # we clear it every time we dump a course. + RequestCache.clear_all_namespaces() + + log.info( + f"Course {index}: Submitting {course_key} for dump to ClickHouse, reason '{reason}'." + ) + + dump_course_to_clickhouse.apply_async( + kwargs={ + "course_key_string": str(course_key), + "connection_overrides": connection_overrides, + } + ) + + submitted_courses.append(str(course_key)) + + if limit and len(submitted_courses) == limit: + log.info(f"Limit of {limit} eligible course has been reached, quitting!") + break + + return submitted_courses, skipped_courses + + +class Command(BaseCommand): + """ + Dump course block and relationship data to a ClickHouse instance. 
+ """ + help = dedent(__doc__).strip() + + def add_arguments(self, parser): + parser.add_argument( + '--url', + type=str, + help="the URL of the ClickHouse server", + ) + parser.add_argument( + '--username', + type=str, + help="the username of the ClickHouse user", + ) + parser.add_argument( + '--password', + type=str, + help="the password of the ClickHouse user", + ) + parser.add_argument( + '--database', + type=str, + help="the database in ClickHouse to connect to", + ) + parser.add_argument( + '--timeout_secs', + type=int, + help="timeout for ClickHouse requests, in seconds", + ) + parser.add_argument( + '--courses', + metavar='KEY', + type=str, + nargs='*', + help="keys of courses to serialize; if omitted all courses in system are serialized", + ) + parser.add_argument( + '--courses_to_skip', + metavar='KEY', + type=str, + nargs='*', + help="keys of courses to NOT to serialize", + ) + parser.add_argument( + '--force', + action='store_true', + help="dump all courses regardless of when they were last published", + ) + parser.add_argument( + '--limit', + type=int, + help="maximum number of courses to dump, cannot be used with '--courses' or '--force'", + ) + + def handle(self, *args, **options): + """ + Iterates through each course, serializes them into graphs, and saves + those graphs to clickhouse. + """ + connection_overrides = { + key: options[key] + for key in ["url", "username", "password", "database", "timeout_secs"] + if options[key] + } + + courses = options["courses"] if options["courses"] else [] + courses_to_skip = options["courses_to_skip"] if options["courses_to_skip"] else [] + + if options["limit"] is not None and int(options["limit"]) < 1: + message = "'limit' must be greater than 0!" + log.error(message) + raise CommandError(message) + + if options["limit"] and options["force"]: + message = "The 'limit' option cannot be used with 'force' as running the " \ + "command repeatedly will result in the same courses being dumped every time." + log.error(message) + raise CommandError(message) + + submitted_courses, skipped_courses = dump_target_courses_to_clickhouse( + connection_overrides, + [course_key.strip() for course_key in courses], + [course_key.strip() for course_key in courses_to_skip], + options['force'], + options['limit'] + ) + + log.info( + "%d courses submitted for export to ClickHouse. %d courses skipped.", + len(submitted_courses), + len(skipped_courses), + ) + + if not submitted_courses: + log.info("No courses submitted for export to ClickHouse at all!") + else: + log.info( # pylint: disable=logging-not-lazy + "These courses were submitted for dump to ClickHouse successfully:\n\t" + + "\n\t".join(submitted_courses) + ) diff --git a/event_sink_clickhouse/signals.py b/event_sink_clickhouse/signals.py index f752120..25890a9 100644 --- a/event_sink_clickhouse/signals.py +++ b/event_sink_clickhouse/signals.py @@ -3,11 +3,11 @@ """ -def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument +def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument # pragma: no cover """ Receives COURSE_PUBLISHED signal and queues the dump job. 
""" # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from .tasks import dump_course_to_clickhouse # pylint: disable=import-outside-toplevel + from event_sink_clickhouse.tasks import dump_course_to_clickhouse # pylint: disable=import-outside-toplevel dump_course_to_clickhouse.delay(str(course_key)) diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index 6bcd40a..987c484 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -1,6 +1,7 @@ """ Base classes for event sinks """ +import json from collections import namedtuple import requests @@ -29,19 +30,35 @@ def __init__(self, connection_overrides, log): self.ch_database = connection_overrides.get("database", self.ch_database) self.ch_timeout_secs = connection_overrides.get("timeout_secs", self.ch_timeout_secs) - def _send_clickhouse_request(self, request): + def _send_clickhouse_request(self, request, expected_insert_rows=None): """ Perform the actual HTTP requests to ClickHouse. """ session = requests.Session() prepared_request = request.prepare() + response = None try: response = session.send(prepared_request, timeout=self.ch_timeout_secs) response.raise_for_status() + + if expected_insert_rows: + summary = response.headers["X-ClickHouse-Summary"] + written_rows = json.loads(summary)["written_rows"] + if expected_insert_rows != int(written_rows): + self.log.error( + f"Clickhouse query {prepared_request.url} expected {expected_insert_rows} " + f"rows to be inserted, but only got {written_rows}!" + ) + + return response except requests.exceptions.HTTPError as e: self.log.error(str(e)) self.log.error(e.response.headers) self.log.error(e.response) self.log.error(e.response.text) raise + except (requests.exceptions.InvalidJSONError, KeyError): + # ClickHouse can be configured not to return the metadata / summary we check above for + # performance reasons. It's not critical, so we eat those here. + return response diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py index 5c01afd..6208e6f 100644 --- a/event_sink_clickhouse/sinks/course_published.py +++ b/event_sink_clickhouse/sinks/course_published.py @@ -11,11 +11,14 @@ """ import csv +import datetime import io +import json import uuid import requests from django.utils import timezone +from opaque_keys.edx.keys import CourseKey from event_sink_clickhouse.sinks.base_sink import BaseSink @@ -31,7 +34,7 @@ class CoursePublishedSink(BaseSink): Event sink for the COURSE_PUBLISHED signal """ @staticmethod - def _get_detached_xblock_types(): + def _get_detached_xblock_types(): # pragma: no cover """ Import and return DETACHED_XBLOCK_TYPES. Placed here to avoid model import at startup and to facilitate mocking them in testing. @@ -41,7 +44,7 @@ def _get_detached_xblock_types(): return DETACHED_XBLOCK_TYPES @staticmethod - def _get_modulestore(): + def _get_modulestore(): # pragma: no cover """ Import and return modulestore. Placed here to avoid model import at startup and to facilitate mocking them in testing. @@ -51,7 +54,7 @@ def _get_modulestore(): return modulestore() @staticmethod - def _get_course_overview_model(): + def _get_course_overview_model(): # pragma: no cover """ Import and return CourseOverview. Placed here to avoid model import at startup and to facilitate mocking them in testing. 
@@ -71,29 +74,7 @@ def strip_branch_and_version(location): return location.for_branch(None) @staticmethod - def get_course_last_published(course_key): - """ - Get approximate last publish date for the given course. - - We use the 'modified' column in the CourseOverview table as a quick and easy - (although perhaps inexact) way of determining when a course was last - published. This works because CourseOverview rows are re-written upon - course publish. - - Args: - course_key: a CourseKey - - Returns: The datetime the course was last published at, stringified. - Uses Python's default str(...) implementation for datetimes, which - is sortable and similar to ISO 8601: - https://docs.python.org/3/library/datetime.html#datetime.date.__str__ - """ - CourseOverview = CoursePublishedSink._get_course_overview_model() - approx_last_published = CourseOverview.get_from_id(course_key).modified - return str(approx_last_published) - - @staticmethod - def serialize_item(item, index, detached_xblock_types, dump_id, dump_timestamp): + def serialize_xblock(item, index, detached_xblock_types, dump_id, dump_timestamp): """ Args: item: an XBlock @@ -103,19 +84,37 @@ def serialize_item(item, index, detached_xblock_types, dump_id, dump_timestamp): fields: a *limited* dictionary of an XBlock's field names and values block_type: the name of the XBlock's type (i.e. 'course' or 'problem') + + Schema of the destination table, as defined in tutor-contrib-oars: + org String NOT NULL, + course_key String NOT NULL, + location String NOT NULL, + display_name String NOT NULL, + xblock_data_json String NOT NULL, + order Int32 default 0, + edited_on String NOT NULL, + dump_id UUID NOT NULL, + time_last_dumped String NOT NULL """ course_key = item.scope_ids.usage_id.course_key block_type = item.scope_ids.block_type + # Extra data not needed for the table to function, things can be + # added here without needing to rebuild the whole table. + json_data = { + 'course': course_key.course, + 'run': course_key.run, + 'block_type': block_type, + 'detached': 1 if block_type in detached_xblock_types else 0, + } + + # Core table data, if things change here it's a big deal. serialized_block = { 'org': course_key.org, 'course_key': str(course_key), - 'course': course_key.course, - 'run': course_key.run, 'location': str(item.location), 'display_name': item.display_name_with_default.replace("'", "\'"), - 'block_type': block_type, - 'detached': 1 if block_type in detached_xblock_types else 0, + 'xblock_data_json': json.dumps(json_data), 'order': index, 'edited_on': str(getattr(item, 'edited_on', '')), 'dump_id': dump_id, @@ -124,6 +123,55 @@ def serialize_item(item, index, detached_xblock_types, dump_id, dump_timestamp): return serialized_block + @staticmethod + def serialize_course_overview(overview, dump_id, time_last_dumped): + """ + Return a dict representing a subset of CourseOverview fields. 
+
+        Schema of the downstream table as defined in tutor-contrib-oars:
+            org String NOT NULL,
+            course_key String NOT NULL,
+            display_name String NOT NULL,
+            course_start String NOT NULL,
+            course_end String NOT NULL,
+            enrollment_start String NOT NULL,
+            enrollment_end String NOT NULL,
+            self_paced BOOL NOT NULL,
+            course_data_json String NOT NULL,
+            created String NOT NULL,
+            modified String NOT NULL,
+            dump_id UUID NOT NULL,
+            time_last_dumped String NOT NULL
+        """
+        json_fields = {
+            "advertised_start": str(overview.advertised_start),
+            "announcement": str(overview.announcement),
+            "lowest_passing_grade": str(overview.lowest_passing_grade),
+            "invitation_only": overview.invitation_only,
+            "max_student_enrollments_allowed": overview.max_student_enrollments_allowed,
+            "effort": overview.effort,
+            "enable_proctored_exams": overview.enable_proctored_exams,
+            "entrance_exam_enabled": overview.entrance_exam_enabled,
+            "external_id": overview.external_id,
+            "language": overview.language,
+        }
+
+        return {
+            "org": overview.org,
+            "course_key": str(overview.id),
+            "display_name": overview.display_name,
+            "course_start": overview.start,
+            "course_end": overview.end,
+            "enrollment_start": overview.enrollment_start,
+            "enrollment_end": overview.enrollment_end,
+            "self_paced": overview.self_paced,
+            "course_data_json": json.dumps(json_fields),
+            "created": overview.created,
+            "modified": overview.modified,
+            "dump_id": dump_id,
+            "time_last_dumped": time_last_dumped
+        }
+
     def serialize_course(self, course_id):
         """
         Serializes a course into a CSV of nodes and relationships.
@@ -141,6 +189,10 @@ def serialize_course(self, course_id):
         dump_id = str(uuid.uuid4())
         dump_timestamp = str(timezone.now())
 
+        courseoverview_model = self._get_course_overview_model()
+        course_overview = courseoverview_model.get_from_id(course_id)
+        serialized_course_overview = self.serialize_course_overview(course_overview, dump_id, dump_timestamp)
+
         # Create a location to node mapping as a lookup for writing relationships later
         location_to_node = {}
         items = modulestore.get_items(course_id)
@@ -150,7 +202,7 @@ def serialize_course(self, course_id):
         index = 0
         for item in items:
             index += 1
-            fields = self.serialize_item(item, index, detached_xblock_types, dump_id, dump_timestamp)
+            fields = self.serialize_xblock(item, index, detached_xblock_types, dump_id, dump_timestamp)
             location_to_node[self.strip_branch_and_version(item.location)] = fields
 
         # Create a list of relationships between blocks, using their locations as identifiers
@@ -172,7 +224,33 @@ def serialize_course(self, course_id):
             relationships.append(relationship)
 
         nodes = list(location_to_node.values())
-        return nodes, relationships
+        return serialized_course_overview, nodes, relationships
+
+    def _send_course_overview(self, serialized_overview):
+        """
+        Create the insert query and CSV to send the serialized CourseOverview to ClickHouse.
+
+        We still use a CSV here even though there's only 1 row because it handles
+        type serialization for us and keeps the pattern consistent.
+        """
+        params = CLICKHOUSE_BULK_INSERT_PARAMS.copy()
+
+        # "query" is a special param for the query, it's the best way to get the FORMAT CSV in there.
+ params["query"] = f"INSERT INTO {self.ch_database}.course_overviews FORMAT CSV" + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC) + writer.writerow(serialized_overview.values()) + + request = requests.Request( + 'POST', + self.ch_url, + data=output.getvalue(), + params=params, + auth=self.ch_auth + ) + + self._send_clickhouse_request(request, expected_insert_rows=1) def _send_xblocks(self, serialized_xblocks): """ @@ -197,7 +275,7 @@ def _send_xblocks(self, serialized_xblocks): auth=self.ch_auth ) - self._send_clickhouse_request(request) + self._send_clickhouse_request(request, expected_insert_rows=len(serialized_xblocks)) def _send_relationships(self, relationships): """ @@ -221,13 +299,13 @@ def _send_relationships(self, relationships): auth=self.ch_auth ) - self._send_clickhouse_request(request) + self._send_clickhouse_request(request, expected_insert_rows=len(relationships)) def dump(self, course_key): """ Do the serialization and send to ClickHouse """ - serialized_blocks, relationships = self.serialize_course(course_key) + serialized_courseoverview, serialized_blocks, relationships = self.serialize_course(course_key) self.log.info( "Now dumping %s to ClickHouse: %d serialized_blocks and %d relationships", @@ -239,6 +317,7 @@ def dump(self, course_key): course_string = str(course_key) try: + self._send_course_overview(serialized_courseoverview) self._send_xblocks(serialized_blocks) self._send_relationships(relationships) self.log.info("Completed dumping %s to ClickHouse", course_key) @@ -248,3 +327,143 @@ def dump(self, course_key): course_string ) raise + + @staticmethod + def get_course_last_published(course_key): + """ + Get approximate last publish date for the given course. + + We use the 'modified' column in the CourseOverview table as a quick and easy + (although perhaps inexact) way of determining when a course was last + published. This works because CourseOverview rows are re-written upon + course publish. + + Args: + course_key: a CourseKey + + Returns: The datetime the course was last published at, stringified. + Uses Python's default str(...) implementation for datetimes, which + is sortable and similar to ISO 8601: + https://docs.python.org/3/library/datetime.html#datetime.date.__str__ + """ + CourseOverview = CoursePublishedSink._get_course_overview_model() + approx_last_published = CourseOverview.get_from_id(course_key).modified + if approx_last_published: + return str(approx_last_published) + + return None + + def fetch_target_courses(self, courses=None, skipped_courses=None, force=None): + """ + Yield a set of courses meeting the given criteria. + + If no parameters are given, loads all course_keys from the + modulestore. Filters out course_keys in the `skip` parameter, + if provided. + + Args: + courses: A list of string serializations of course keys. + For example, ["course-v1:org+course+run"]. + skipped_courses: A list of string serializations of course keys to + be ignored. 
+ force: Include all courses except those explicitly skipped via + skipped_courses + """ + modulestore = CoursePublishedSink._get_modulestore() + + if courses: + course_keys = [CourseKey.from_string(course) for course in courses] + else: + course_keys = [ + course.id for course in modulestore.get_course_summaries() + ] + + for course_key in course_keys: + if course_key in skipped_courses: + yield course_key, False, "Course is explicitly skipped" + elif force: + yield course_key, True, "Force is set" + else: + should_be_dumped, reason = self.should_dump_course(course_key) + yield course_key, should_be_dumped, reason + + def get_course_last_dump_time(self, course_key): + """ + Get the most recent dump time for this course from ClickHouse + + Args: + course_key: a CourseKey + + Returns: The datetime that the command was last run, converted into + text, or None, if there's no record of this command last being run. + """ + params = { + "query": f"SELECT max(time_last_dumped) as time_last_dumped " + f"FROM {self.ch_database}.course_blocks " + f"WHERE course_key = '{course_key}'" + } + + request = requests.Request( + 'GET', + self.ch_url, + params=params, + auth=self.ch_auth + ) + + response = self._send_clickhouse_request(request) + response.raise_for_status() + if response.text.strip(): + # ClickHouse returns timestamps in the format: "2023-05-03 15:47:39.331024+00:00" + # Our internal comparisons use the str() of a datetime object, this handles that + # transformation so that downstream comparisons will work. + return str(datetime.datetime.fromisoformat(response.text.strip())) + + # Course has never been dumped, return None + return None + + def should_dump_course(self, course_key): + """ + Only dump the course if it's been changed since the last time it's been + dumped. + + Args: + course_key: a CourseKey object. + + Returns: + - whether this course should be dumped (bool) + - reason why course needs, or does not need, to be dumped (string) + """ + + course_last_dump_time = self.get_course_last_dump_time(course_key) + + # If we don't have a record of the last time this command was run, + # we should serialize the course and dump it + if course_last_dump_time is None: + return True, "Course is not present in ClickHouse" + + course_last_published_date = self.get_course_last_published(course_key) + + # If we've somehow dumped this course but there is no publish date + # skip it + if course_last_dump_time and course_last_published_date is None: + return False, "No last modified date in CourseOverview" + + # Otherwise, dump it if it is newer + course_last_dump_time = datetime.datetime.strptime(course_last_dump_time, "%Y-%m-%d %H:%M:%S.%f+00:00") + course_last_published_date = datetime.datetime.strptime( + course_last_published_date, + "%Y-%m-%d %H:%M:%S.%f+00:00" + ) + needs_dump = course_last_dump_time < course_last_published_date + + if needs_dump: + reason = ( + "Course has been published since last dump time - " + f"last dumped {course_last_dump_time} < last published {str(course_last_published_date)}" + ) + else: + reason = ( + f"Course has NOT been published since last dump time - " + f"last dumped {course_last_dump_time} >= last published {str(course_last_published_date)}" + ) + return needs_dump, reason diff --git a/event_sink_clickhouse/urls.py b/event_sink_clickhouse/urls.py index 76e1338..a9ac6b8 100644 --- a/event_sink_clickhouse/urls.py +++ b/event_sink_clickhouse/urls.py @@ -1,10 +1,8 @@ """ URLs for event_sink_clickhouse. 
""" -from django.urls import re_path # pylint: disable=unused-import -from django.views.generic import TemplateView # pylint: disable=unused-import -urlpatterns = [ +urlpatterns = [ # pragma: no cover # TODO: Fill in URL patterns and views here. # re_path(r'', TemplateView.as_view(template_name="event_sink_clickhouse/base.html")), ] diff --git a/setup.py b/setup.py index 55e68c3..057db61 100755 --- a/setup.py +++ b/setup.py @@ -104,6 +104,7 @@ def is_requirement(line): name='openedx_event_sink_clickhouse', version=VERSION, description="""A sink for Open edX events to send them to ClickHouse""", + long_description_content_type="text/x-rst", long_description=README + '\n\n' + CHANGELOG, author='edX', author_email='oscm@edx.org', diff --git a/test_utils/helpers.py b/test_utils/helpers.py index b1de1fd..fa57d6c 100644 --- a/test_utils/helpers.py +++ b/test_utils/helpers.py @@ -3,9 +3,11 @@ """ import csv +import json import random import string -from datetime import datetime +from collections import namedtuple +from datetime import datetime, timedelta from io import StringIO from unittest.mock import MagicMock, Mock @@ -19,6 +21,32 @@ COURSE = "testcourse" COURSE_RUN = "2023_Fall" +FakeCourse = namedtuple("FakeCourse", ["id"]) +FakeCourseOverview = namedtuple("FakeCourseOverview", [ + # Key fields we keep at the top level + "id", + "org", + "display_name", + "start", + "end", + "enrollment_start", + "enrollment_end", + "self_paced", + "created", + "modified", + # Fields we stuff in JSON + "advertised_start", + "announcement", + "lowest_passing_grade", + "invitation_only", + "max_student_enrollments_allowed", + "effort", + "enable_proctored_exams", + "entrance_exam_enabled", + "external_id", + "language", +]) + class FakeXBlock: """ @@ -41,12 +69,13 @@ def get_children(self): return self.children -def course_str_factory(): +def course_str_factory(course_id=None): """ Return a valid course key string. """ - course_str = f"course-v1:{ORG}+{COURSE}+{COURSE_RUN}" - return course_str + if not course_id: + return f"course-v1:{ORG}+{COURSE}+{COURSE_RUN}" + return f"course-v1:{ORG}+{course_id}+{COURSE_RUN}" def course_key_factory(): @@ -64,13 +93,43 @@ def block_usage_locator_factory(): return BlockUsageLocator(course_key_factory(), block_type="category", block_id=block_id, deprecated=True) +def fake_course_overview_factory(modified=None): + """ + Create a fake CourseOverview object with just the fields we care about. + + Modified is overridable, but can also be None. + """ + return FakeCourseOverview( + course_key_factory(), # id + ORG, # org + "Test Course", # display_name + datetime.now() - timedelta(days=90), # start + datetime.now() + timedelta(days=90), # end + datetime.now() - timedelta(days=90), # enrollment_start + datetime.now() + timedelta(days=90), # enrollment_end + False, # self_paced + datetime.now() - timedelta(days=180), # created + modified, # modified + datetime.now() - timedelta(days=90), # advertised_start + datetime.now() - timedelta(days=90), # announcement + 71.05, # lowest_passing_grade + False, # invitation_only + 1000, # max_student_enrollments_allowed + "Pretty easy", # effort + False, # enable_proctored_exams + True, # entrance_exam_enabled + "abcd1234", # external_id + "Polish" # language + ) + + def mock_course_overview(): """ Create a fake CourseOverview object that supports just the things we care about. 
""" mock_overview = MagicMock() mock_overview.get_from_id = MagicMock() - mock_overview.get_from_id.return_value.modified = datetime.now() + mock_overview.get_from_id.return_value = fake_course_overview_factory(datetime.now()) return mock_overview @@ -86,6 +145,11 @@ def get_clickhouse_http_params(): """ Get the params used in ClickHouse queries. """ + overview_params = { + "input_format_allow_errors_num": 1, + "input_format_allow_errors_ratio": 0.1, + "query": "INSERT INTO cool_data.course_overviews FORMAT CSV" + } blocks_params = { "input_format_allow_errors_num": 1, "input_format_allow_errors_ratio": 0.1, @@ -97,7 +161,7 @@ def get_clickhouse_http_params(): "query": "INSERT INTO cool_data.course_relationships FORMAT CSV" } - return blocks_params, relationships_params + return overview_params, blocks_params, relationships_params def course_factory(): @@ -127,6 +191,58 @@ def course_factory(): return course +def check_overview_csv_matcher(course_overview): + """ + Match the course overview CSV against the test course. + + This is a matcher for the "responses" library. It returns a function + that actually does the matching. + """ + def match(request): + body = request.body + + f = StringIO(body) + reader = csv.reader(f) + + i = 0 + try: + # The CSV should be in the same order as our course, make sure + # everything matches + for row in reader: + assert row[0] == course_overview.org + assert row[1] == str(course_overview.id) + assert row[2] == course_overview.display_name + assert row[3] == str(course_overview.start) + assert row[4] == str(course_overview.end) + assert row[5] == str(course_overview.enrollment_start) + assert row[6] == str(course_overview.enrollment_end) + assert row[7] == str(course_overview.self_paced) + + # Get our JSON string back out from the CSV, confirm that it's + # real JSON, compare values + dumped_json = json.loads(row[8]) + + assert dumped_json["advertised_start"] == str(course_overview.advertised_start) + assert dumped_json["announcement"] == str(course_overview.announcement) + assert dumped_json["lowest_passing_grade"] == str(course_overview.lowest_passing_grade) + assert dumped_json["invitation_only"] == course_overview.invitation_only + assert dumped_json["max_student_enrollments_allowed"] == course_overview.max_student_enrollments_allowed + assert dumped_json["effort"] == course_overview.effort + assert dumped_json["enable_proctored_exams"] == course_overview.enable_proctored_exams + assert dumped_json["entrance_exam_enabled"] == course_overview.entrance_exam_enabled + assert dumped_json["external_id"] == course_overview.external_id + assert dumped_json["language"] == course_overview.language + + assert row[9] == str(course_overview.created) + assert row[10] == str(course_overview.modified) + + i += 1 + except EOFError as e: + return False, f"Mismatch in row {i}: {e}" + return True, "" + return match + + def check_block_csv_matcher(course): """ Match the course structure CSV against the test course. 
@@ -153,11 +269,20 @@ def match(request): block = course[i] assert row[0] == block.location.org assert row[1] == str(block.location.course_key) - assert row[2] == block.location.course - assert row[3] == block.location.run - assert row[4] == str(course[i].location) - assert row[5] == block.display_name_with_default - assert row[6] == str(block.block_type) + assert row[2] == str(course[i].location) + assert row[3] == block.display_name_with_default + + block_json_data = { + 'course': block.location.course, + 'run': block.location.run, + 'block_type': str(block.block_type), + } + csv_json = json.loads(row[4]) + + # Check some json data + assert block_json_data["course"] == csv_json["course"] + assert block_json_data["run"] == csv_json["run"] + assert block_json_data["block_type"] == csv_json["block_type"] i += 1 except AssertionError as e: return False, f"Mismatch in row {i}: {e}" @@ -197,8 +322,6 @@ def match(request): # The CSV should be in the same order as our relationships, make sure # everything matches for row in reader: - print(row) - print(relationships[i]) relation = relationships[i] assert row[0] == relation[0] assert row[1] == relation[1] diff --git a/tests/commands/test_dump_courses_command.py b/tests/commands/test_dump_courses_command.py new file mode 100644 index 0000000..c07b088 --- /dev/null +++ b/tests/commands/test_dump_courses_command.py @@ -0,0 +1,234 @@ +""" +Tests for the dump_courses_to_clickhouse management command. +""" +from collections import namedtuple +from datetime import datetime, timedelta +from unittest.mock import patch + +import django.core.management.base +import pytest +from django.core.management import call_command + +from test_utils.helpers import FakeCourse, course_str_factory, fake_course_overview_factory + + +@pytest.fixture +def mock_common_calls(): + """ + Mock out calls that we test elsewhere and aren't relevant to the command tests. + """ + command_path = "event_sink_clickhouse.management.commands.dump_courses_to_clickhouse" + with patch(command_path+".dump_course_to_clickhouse") as mock_dump_course: + with patch(command_path+".CoursePublishedSink._get_course_overview_model") as mock_get_course_overview_model: + with patch(command_path+".CoursePublishedSink._get_modulestore") as mock_modulestore: + with patch(command_path+".CoursePublishedSink.get_course_last_dump_time") as mock_last_dump_time: + # Set a reasonable default last dump time a year in the past + mock_last_dump_time.return_value = \ + (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00") + yield mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time + + +def dump_command_clickhouse_options(): + """ + Pytest params for all the different ClickHouse options. + + Just making sure every option gets passed through correctly. 
+    """
+    options = [
+        {},
+        {'url': "https://foo/"},
+        {'username': "Foo"},
+        {'password': "F00"},
+        {'database': "foo"},
+        {'timeout_secs': 60},
+        {'url': "https://foo/", 'username': "Foo", 'password': "F00", 'database': "foo", 'timeout_secs': 60},
+    ]
+
+    for option in options:
+        yield option
+
+
+@pytest.mark.parametrize("option_combination", dump_command_clickhouse_options())
+def test_dump_courses_to_clickhouse_db_options(
+    option_combination,
+    mock_common_calls,
+    caplog
+):
+    mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls
+
+    course_id = course_str_factory()
+
+    fake_modulestore_courses = [FakeCourse(course_id)]
+    mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses
+    fake_overview = fake_course_overview_factory(modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00"))
+    mock_get_course_overview_model.return_value.get_from_id.return_value = fake_overview
+
+    call_command(
+        'dump_courses_to_clickhouse',
+        **option_combination
+    )
+
+    # Make sure that our mocks were called as expected
+    assert mock_modulestore.call_count == 1
+    assert mock_dump_course.apply_async.call_count == 1
+    mock_dump_course.apply_async.assert_called_once_with(kwargs=dict(
+        course_key_string=course_id,
+        connection_overrides=option_combination
+    ))
+    assert "Course has been published since last dump time" in caplog.text
+    assert "These courses were submitted for dump to ClickHouse successfully" in caplog.text
+
+
+CommandOptions = namedtuple("TestCommandOptions", ["options", "expected_num_submitted", "expected_logs"])
+
+
+def dump_command_basic_options():
+    """
+    Pytest params for all the different non-ClickHouse command options.
+    """
+    options = [
+        CommandOptions(
+            options={"courses_to_skip": [course_str_factory()]},
+            expected_num_submitted=0,
+            expected_logs=[
+                "0 courses submitted for export to ClickHouse. 1 courses skipped.",
+                "Course is explicitly skipped"
+            ]
+        ),
+        CommandOptions(
+            options={"limit": 1},
+            expected_num_submitted=1,
+            expected_logs=["Limit of 1 eligible course has been reached, quitting!"]
+        ),
+        CommandOptions(
+            options={"courses": [course_str_factory()]},
+            expected_num_submitted=1,
+            expected_logs=[
+                "Course has been published since last dump time",
+                "These courses were submitted for dump to ClickHouse successfully"
+            ]
+        ),
+        CommandOptions(
+            options={"force": True},
+            expected_num_submitted=1,
+            expected_logs=["Force is set"]
+        ),
+    ]
+
+    for option in options:
+        yield option
+
+
+@pytest.mark.parametrize("test_command_option", dump_command_basic_options())
+def test_dump_courses_options(
+    test_command_option,
+    mock_common_calls,
+    caplog
+):
+    mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls
+
+    option_combination, expected_num_submitted, expected_outputs = test_command_option
+    course_id = course_str_factory()
+
+    fake_modulestore_courses = [FakeCourse(course_id), ]
+    mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses
+    mock_get_course_overview_model.return_value.get_from_id.return_value = fake_course_overview_factory(
+        modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
+    )
+
+    call_command(
+        'dump_courses_to_clickhouse',
+        **option_combination
+    )
+
+    # Make sure that our mocks were called as expected
+    assert mock_modulestore.call_count == 1
+    assert mock_dump_course.apply_async.call_count == expected_num_submitted
+    for expected_output in expected_outputs:
+        assert expected_output in caplog.text
+
+
+def dump_command_invalid_options():
+    """
+    Pytest params for invalid combinations of command options.
+    """
+    options = [
+        CommandOptions(
+            options={"force": True, "limit": 100},
+            expected_num_submitted=0,
+            expected_logs=[
+                "The 'limit' option cannot be used with 'force'",
+            ]
+        ),
+        CommandOptions(
+            options={"limit": -1},
+            expected_num_submitted=0,
+            expected_logs=["'limit' must be greater than 0!"]
+        ),
+    ]
+
+    for option in options:
+        yield option
+
+
+@pytest.mark.parametrize("test_command_option", dump_command_invalid_options())
+def test_invalid_dump_command_options(
+    test_command_option,
+    mock_common_calls,
+    caplog
+):
+    mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls
+    option_combination, expected_num_submitted, expected_outputs = test_command_option
+
+    with pytest.raises(django.core.management.base.CommandError):
+        call_command(
+            'dump_courses_to_clickhouse',
+            **option_combination
+        )
+
+    # Just to make sure we're not calling things more than we need to
+    assert mock_modulestore.call_count == 0
+    assert mock_dump_course.apply_async.call_count == 0
+    for expected_output in expected_outputs:
+        assert expected_output in caplog.text
+
+
+def test_multiple_courses_different_times(
+    mock_common_calls,
+    caplog
+):
+    mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls
+
+    test_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
+
+    course_id_1 = course_str_factory("course_1")
+    course_id_2 = course_str_factory("course_2")
+    course_id_3 = course_str_factory("course_3")
+
+    fake_modulestore_courses = [FakeCourse(course_id_1), FakeCourse(course_id_2), FakeCourse(course_id_3)]
+    mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses
+
+    fake_overview = fake_course_overview_factory(modified=test_timestamp)
+    mock_get_course_overview_model.return_value.get_from_id.return_value = fake_overview
+
+    # Each time last_dump_time is called it will get a different date so we can test
+    # them all together
+    mock_last_dump_time.side_effect = [
+        # One year ago
+        (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00"),
+        # A magic date that matches the last published date of our test course
+        test_timestamp,
+        # Course not dumped to ClickHouse yet
+        None,
+    ]
+
+    call_command(
+        'dump_courses_to_clickhouse'
+    )
+
+    assert mock_modulestore.call_count == 1
+    assert mock_last_dump_time.call_count == 3
+    assert "Course has been published since last dump time" in caplog.text
+    assert "Course has NOT been published since last dump time" in caplog.text
+    assert "Course is not present in ClickHouse" in caplog.text
+    assert "2 courses submitted for export to ClickHouse. 1 courses skipped." in caplog.text
diff --git a/tests/test_course_published.py b/tests/test_course_published.py
index 037be6a..8b0f4b4 100644
--- a/tests/test_course_published.py
+++ b/tests/test_course_published.py
@@ -15,9 +15,11 @@
 from event_sink_clickhouse.tasks import dump_course_to_clickhouse
 from test_utils.helpers import (
     check_block_csv_matcher,
+    check_overview_csv_matcher,
     check_relationship_csv_matcher,
     course_factory,
     course_str_factory,
+    fake_course_overview_factory,
     get_clickhouse_http_params,
     mock_course_overview,
     mock_detached_xblock_types,
@@ -25,27 +27,35 @@
 
 
 @responses.activate(registry=OrderedRegistry)  # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
+@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_course_overview_model")
 @patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_detached_xblock_types")
 @patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_modulestore")
-def test_course_publish_success(mock_modulestore, mock_detached, caplog):
+def test_course_publish_success(mock_modulestore, mock_detached, mock_overview):
     """
     Test of a successful end-to-end run.
     """
-    # Necessary to get logs from the task
-    caplog.set_level(logging.INFO, logger="edx.celery.task")
-
     # Create a fake course structure with a few fake XBlocks
     course = course_factory()
+    course_overview = fake_course_overview_factory(modified=datetime.now())
     mock_modulestore.return_value.get_items.return_value = course
 
     # Fake the "detached types" list since we can't import it here
     mock_detached.return_value = mock_detached_xblock_types()
 
+    mock_overview.return_value.get_from_id.return_value = course_overview
+
     # Use the responses library to catch the POSTs to ClickHouse
     # and match them against the expected values, including CSV
     # content
-    blocks_params, relationships_params = get_clickhouse_http_params()
+    course_overview_params, blocks_params, relationships_params = get_clickhouse_http_params()
 
+    responses.post(
+        "https://foo.bar/",
+        match=[
+            matchers.query_param_matcher(course_overview_params),
+            check_overview_csv_matcher(course_overview)
+        ],
+    )
     responses.post(
         "https://foo.bar/",
         match=[
@@ -70,17 +80,21 @@
 
 
 @responses.activate(registry=OrderedRegistry)  # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
+@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_course_overview_model")
 @patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_detached_xblock_types")
 @patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_modulestore")
-def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, caplog):
+# pytest:disable=unused-argument
+def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, mock_overview, caplog):
     """
     Test the case where a ClickHouse POST fails.
""" - caplog.set_level(logging.INFO, logger="edx.celery.task") course = course_factory() mock_modulestore.return_value.get_items.return_value = course mock_detached.return_value = mock_detached_xblock_types() + course_overview = fake_course_overview_factory(modified=datetime.now()) + mock_overview.return_value.get_from_id.return_value = course_overview + # This will raise an exception when we try to post to ClickHouse responses.post( "https://foo.bar/", @@ -105,7 +119,7 @@ def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, caplog @patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_course_overview_model") def test_get_course_last_published(mock_overview): """ - This function isn't in use yet, but we'll need it for the management command + Make sure we get a valid date back from this in the expected format. """ # Create a fake course overview, which will return a datetime object course = mock_course_overview() @@ -118,3 +132,77 @@ def test_get_course_last_published(mock_overview): last_published_date = CoursePublishedSink.get_course_last_published(course_key) dt = datetime.strptime(last_published_date, "%Y-%m-%d %H:%M:%S.%f") assert dt + + +@responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter +@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_course_overview_model") +def test_no_last_published_date(mock_overview): + """ + Test that we get a None value back for courses that don't have a modified date. + + In some cases there is not modified date on a course. In coursegraph we + skipped these if they are already in the database, so we're continuing this trend here. + """ + # Fake a course with no modified date + course = mock_course_overview() + mock_overview.return_value = course + mock_overview.return_value.get_from_id.return_value = fake_course_overview_factory(modified=None) + + # Request our course last published date + course_key = course_str_factory() + + # should_dump_course will reach out to ClickHouse for the last dump date + # we'll fake the response here to have any date, such that we'll exercise + # all the "no modified date" code. + responses.get( + "https://foo.bar/", + body="2023-05-03 15:47:39.331024+00:00" + ) + + # Confirm that the string date we get back is a valid date + sink = CoursePublishedSink(connection_overrides={}, log=logging.getLogger()) + should_dump_course, reason = sink.should_dump_course(course_key) + + assert should_dump_course is False + assert reason == "No last modified date in CourseOverview" + + +@responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter +def test_course_not_present_in_clickhouse(): + """ + Test that a course gets dumped if it's never been dumped before + """ + # Request our course last published date + course_key = course_str_factory() + + responses.get( + "https://foo.bar/", + body="" + ) + + # Confirm that the string date we get back is a valid date + sink = CoursePublishedSink(connection_overrides={}, log=logging.getLogger()) + last_published_date = sink.get_course_last_dump_time(course_key) + assert last_published_date is None + + +@responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter +def test_get_last_dump_time(): + """ + Test that we return the expected thing from last dump time. 
+ """ + # Request our course last published date + course_key = course_str_factory() + + # Mock out the response we expect to get from ClickHouse, just a random + # datetime in the correct format. + responses.get( + "https://foo.bar/", + body="2023-05-03 15:47:39.331024+00:00" + ) + + # Confirm that the string date we get back is a valid date + sink = CoursePublishedSink(connection_overrides={}, log=logging.getLogger()) + last_published_date = sink.get_course_last_dump_time(course_key) + dt = datetime.strptime(last_published_date, "%Y-%m-%d %H:%M:%S.%f+00:00") + assert dt diff --git a/tox.ini b/tox.ini index 9b0e194..10048eb 100644 --- a/tox.ini +++ b/tox.ini @@ -31,30 +31,30 @@ match-dir = (?!migrations) [pytest] DJANGO_SETTINGS_MODULE = test_settings -addopts = --cov event_sink_clickhouse --cov-report term-missing --cov-report xml +addopts = --cov event_sink_clickhouse --cov-report term-missing --cov-report xml --log-level=INFO norecursedirs = .* docs requirements site-packages [testenv] -deps = +deps = django32: Django>=3.2,<4.0 django40: Django>=4.0,<4.1 -r{toxinidir}/requirements/test.txt -commands = +commands = python manage.py check pytest {posargs} [testenv:docs] -setenv = +setenv = DJANGO_SETTINGS_MODULE = test_settings PYTHONPATH = {toxinidir} # Adding the option here instead of as a default in the docs Makefile because that Makefile is generated by shpinx. SPHINXOPTS = -W -whitelist_externals = +whitelist_externals = make rm -deps = +deps = -r{toxinidir}/requirements/doc.txt -commands = +commands = doc8 --ignore-path docs/_build README.rst docs rm -f docs/event_sink_clickhouse.rst rm -f docs/modules.rst @@ -64,13 +64,13 @@ commands = twine check dist/* [testenv:quality] -whitelist_externals = +whitelist_externals = make rm touch -deps = +deps = -r{toxinidir}/requirements/quality.txt -commands = +commands = touch tests/__init__.py pylint event_sink_clickhouse tests test_utils manage.py setup.py rm tests/__init__.py @@ -80,10 +80,10 @@ commands = make selfcheck [testenv:pii_check] -setenv = +setenv = DJANGO_SETTINGS_MODULE = test_settings -deps = +deps = -r{toxinidir}/requirements/test.txt -commands = +commands = code_annotations django_find_annotations --config_file .pii_annotations.yml --lint --report --coverage