diff --git a/event_sink_clickhouse/__init__.py b/event_sink_clickhouse/__init__.py index a656735..c713846 100644 --- a/event_sink_clickhouse/__init__.py +++ b/event_sink_clickhouse/__init__.py @@ -2,4 +2,4 @@ A sink for Open edX events to send them to ClickHouse. """ -__version__ = "1.0.0" +__version__ = "1.1.0" diff --git a/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py deleted file mode 100644 index dd87a84..0000000 --- a/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py +++ /dev/null @@ -1,205 +0,0 @@ -""" -Management command for exporting the modulestore ClickHouse. - -Example usages (see usage for more options): - - # Dump all courses published since last dump. - # Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`: - python manage.py cms dump_courses_to_clickhouse - - # Dump all courses published since last dump. - # Use custom connection parameters to send to a different ClickHouse instance: - python manage.py cms dump_courses_to_clickhouse --host localhost \ - --user user --password password --database research_db -- - - # Specify certain courses instead of dumping all of them. - # Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`. - python manage.py cms dump_courses_to_clickhouse --courses 'course-v1:A+B+1' 'course-v1:A+B+2' - - # Dump a limited number of courses to prevent stress on production systems - python manage.py cms dump_courses_to_clickhouse --limit 1000 -""" -import logging -from textwrap import dedent - -from django.core.management.base import BaseCommand, CommandError -from edx_django_utils.cache import RequestCache - -from event_sink_clickhouse.sinks.course_published import CourseOverviewSink -from event_sink_clickhouse.tasks import dump_course_to_clickhouse - -log = logging.getLogger(__name__) - - -def dump_target_courses_to_clickhouse( - connection_overrides=None, - course_keys=None, - courses_to_skip=None, - force=None, - limit=None, -): - """ - Iterates through a list of courses in a modulestore, serializes them to csv, - then submits tasks to post them to ClickHouse. - - Arguments: - force: serialize the courses even if they've been recently - serialized - - Returns: two lists--one of the courses that had dump jobs queued for them - and one of courses that did not. - """ - sink = CourseOverviewSink(connection_overrides, log) - - submitted_courses = [] - skipped_courses = [] - - index = 0 - for course_key, should_be_dumped, reason in sink.fetch_target_items( - course_keys, courses_to_skip, force - ): - log.info(f"Iteration {index}: {course_key}") - index += 1 - - if not should_be_dumped: - skipped_courses.append(course_key) - log.info( - f"Course {index}: Skipping course {course_key}, reason: '{reason}'" - ) - else: - # RequestCache is a local memory cache used in modulestore for performance reasons. - # Normally it is cleared at the end of every request, but in this command it will - # continue to grow until the command is done. To prevent excessive memory consumption - # we clear it every time we dump a course. - RequestCache.clear_all_namespaces() - - log.info( - f"Course {index}: Submitting {course_key} for dump to ClickHouse, reason '{reason}'." 
- ) - - dump_course_to_clickhouse.apply_async( - kwargs={ - "course_key_string": str(course_key), - "connection_overrides": connection_overrides, - } - ) - - submitted_courses.append(str(course_key)) - - if limit and len(submitted_courses) == limit: - log.info( - f"Limit of {limit} eligible course has been reached, quitting!" - ) - break - - return submitted_courses, skipped_courses - - -class Command(BaseCommand): - """ - Dump course block data to a ClickHouse instance. - """ - - help = dedent(__doc__).strip() - - def add_arguments(self, parser): - parser.add_argument( - "--url", - type=str, - help="the URL of the ClickHouse server", - ) - parser.add_argument( - "--username", - type=str, - help="the username of the ClickHouse user", - ) - parser.add_argument( - "--password", - type=str, - help="the password of the ClickHouse user", - ) - parser.add_argument( - "--database", - type=str, - help="the database in ClickHouse to connect to", - ) - parser.add_argument( - "--timeout_secs", - type=int, - help="timeout for ClickHouse requests, in seconds", - ) - parser.add_argument( - "--courses", - metavar="KEY", - type=str, - nargs="*", - help="keys of courses to serialize; if omitted all courses in system are serialized", - ) - parser.add_argument( - "--courses_to_skip", - metavar="KEY", - type=str, - nargs="*", - help="keys of courses to NOT to serialize", - ) - parser.add_argument( - "--force", - action="store_true", - help="dump all courses regardless of when they were last published", - ) - parser.add_argument( - "--limit", - type=int, - help="maximum number of courses to dump, cannot be used with '--courses' or '--force'", - ) - - def handle(self, *args, **options): - """ - Iterates through each course, serializes them into graphs, and saves - those graphs to clickhouse. - """ - connection_overrides = { - key: options[key] - for key in ["url", "username", "password", "database", "timeout_secs"] - if options[key] - } - - courses = options["courses"] if options["courses"] else [] - courses_to_skip = ( - options["courses_to_skip"] if options["courses_to_skip"] else [] - ) - - if options["limit"] is not None and int(options["limit"]) < 1: - message = "'limit' must be greater than 0!" - log.error(message) - raise CommandError(message) - - if options["limit"] and options["force"]: - message = ( - "The 'limit' option cannot be used with 'force' as running the " - "command repeatedly will result in the same courses being dumped every time." - ) - log.error(message) - raise CommandError(message) - - submitted_courses, skipped_courses = dump_target_courses_to_clickhouse( - connection_overrides, - [course_key.strip() for course_key in courses], - [course_key.strip() for course_key in courses_to_skip], - options["force"], - options["limit"], - ) - - log.info( - "%d courses submitted for export to ClickHouse. 
%d courses skipped.", - len(submitted_courses), - len(skipped_courses), - ) - - if not submitted_courses: - log.info("No courses submitted for export to ClickHouse at all!") - else: - log.info( # pylint: disable=logging-not-lazy - "These courses were submitted for dump to ClickHouse successfully:\n\t" - + "\n\t".join(submitted_courses) - ) diff --git a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py index 25605c7..a501c58 100644 --- a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py +++ b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py @@ -14,24 +14,27 @@ # Dump a limited number of objects to prevent stress on production systems python manage.py cms dump_objects_to_clickhouse --limit 1000 """ + import logging +import time from textwrap import dedent from django.core.management.base import BaseCommand, CommandError from event_sink_clickhouse.sinks.base_sink import ModelBaseSink -from event_sink_clickhouse.tasks import dump_data_to_clickhouse log = logging.getLogger(__name__) def dump_target_objects_to_clickhouse( - connection_overrides=None, sink=None, + start_pk=None, object_ids=None, objects_to_skip=None, - force=None, + force=False, limit=None, + batch_size=1000, + sleep_time=10, ): """ Iterates through a list of objects in the ORN, serializes them to csv, @@ -45,44 +48,37 @@ def dump_target_objects_to_clickhouse( and one of objects that did not. """ - submitted_objects = [] + count = 0 skipped_objects = [] + objects_to_submit = [] - index = 0 - for object_id, should_be_dumped, reason in sink.fetch_target_items( - object_ids, objects_to_skip, force + for obj, should_be_dumped, reason in sink.fetch_target_items( + start_pk, object_ids, objects_to_skip, force, batch_size ): - log.info(f"Iteration {index}: {object_id}") - index += 1 - if not should_be_dumped: - skipped_objects.append(object_id) - log.info( - f"{sink.model} {index}: Skipping object {object_id}, reason: '{reason}'" - ) + skipped_objects.append(obj.pk) + log.info(f"{sink.model}: Skipping object {obj.pk}, reason: '{reason}'") else: - log.info( - f"{sink.model} {index}: Submitting {object_id} for dump to ClickHouse, reason '{reason}'." - ) - - dump_data_to_clickhouse.apply_async( - kwargs={ - "sink_module": sink.__module__, - "sink_name": sink.__class__.__name__, - "object_id": str(object_id), - "connection_overrides": connection_overrides, - } - ) - - submitted_objects.append(str(object_id)) - - if limit and len(submitted_objects) == limit: + objects_to_submit.append(obj) + if len(objects_to_submit) % batch_size == 0: + count += len(objects_to_submit) + sink.dump(objects_to_submit, many=True) + objects_to_submit = [] + log.info(f"Last ID: {obj.pk}") + time.sleep(sleep_time) + + if limit and count == limit: log.info( f"Limit of {limit} eligible objects has been reached, quitting!" 
) break - return submitted_objects, skipped_objects + if objects_to_submit: + sink.dump(objects_to_submit, many=True) + count += len(objects_to_submit) + log.info(f"Last ID: {objects_to_submit[-1].pk}") + + log.info(f"Dumped {count} objects to ClickHouse") class Command(BaseCommand): @@ -123,6 +119,12 @@ def add_arguments(self, parser): type=str, help="the type of object to dump", ) + parser.add_argument( + "--start_pk", + type=int, + help="the primary key to start at", + default=None, + ) parser.add_argument( "--ids", metavar="KEY", @@ -147,6 +149,18 @@ def add_arguments(self, parser): type=int, help="maximum number of objects to dump, cannot be used with '--ids' or '--force'", ) + parser.add_argument( + "--batch_size", + type=int, + default=10000, + help="number of objects to dump in a single batch", + ) + parser.add_argument( + "--sleep_time", + type=int, + default=1, + help="number of seconds to sleep between batches", + ) def handle(self, *args, **options): """ @@ -179,29 +193,15 @@ def handle(self, *args, **options): log.error(message) raise CommandError(message) - for cls in ModelBaseSink.__subclasses__(): # pragma: no cover - if cls.model == options["object"]: - sink = cls(connection_overrides, log) - submitted_objects, skipped_objects = dump_target_objects_to_clickhouse( - connection_overrides, - sink, - [object_id.strip() for object_id in ids], - [object_id.strip() for object_id in ids_to_skip], - options["force"], - options["limit"], - ) - - log.info( - "%d objects submitted for export to ClickHouse. %d objects skipped.", - len(submitted_objects), - len(skipped_objects), - ) - - if not submitted_objects: - log.info("No objects submitted for export to ClickHouse at all!") - else: - log.info( # pylint: disable=logging-not-lazy - "These objects were submitted for dump to ClickHouse successfully:\n\t" - + "\n\t".join(submitted_objects) - ) - break + Sink = ModelBaseSink.get_sink_by_model_name(options["object"]) + sink = Sink(connection_overrides, log) + dump_target_objects_to_clickhouse( + sink, + options["start_pk"], + [object_id.strip() for object_id in ids], + [object_id.strip() for object_id in ids_to_skip], + options["force"], + options["limit"], + options["batch_size"], + options["sleep_time"], + ) diff --git a/event_sink_clickhouse/settings/common.py b/event_sink_clickhouse/settings/common.py index 73dae80..2ff6818 100644 --- a/event_sink_clickhouse/settings/common.py +++ b/event_sink_clickhouse/settings/common.py @@ -13,7 +13,7 @@ def plugin_settings(settings): # to avoid pulling in more dependencies to the platform than necessary. "url": "http://clickhouse:8123", "username": "ch_cms", - "password": "TYreGozgtDG3vkoWPUHVVM6q", + "password": "password", "database": "event_sink", "timeout_secs": 5, } diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index 4a68745..6288ae9 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -8,6 +8,7 @@ import requests from django.conf import settings +from django.core.paginator import Paginator from edx_toggles.toggles import WaffleFlag from event_sink_clickhouse.utils import get_model @@ -124,6 +125,10 @@ class ModelBaseSink(BaseSink): list: A list of nested sink instances that can be used to further process or route the event data. Nested sinks allow chaining multiple sinks together for more complex event processing pipelines. 
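# ---------------------------------------------------------------------------
# Editor's note: the rewritten dump_target_objects_to_clickhouse() above trades
# the old one-Celery-task-per-object dispatch for synchronous, batched inserts,
# sleeping between flushes to throttle load on the database and ClickHouse.
# A condensed sketch of that control flow (not the verbatim implementation),
# assuming only a sink exposing fetch_target_items() and dump(..., many=True)
# as in this diff:

import time


def dump_in_batches(sink, batch_size=1000, sleep_time=10, limit=None):
    """Accumulate eligible objects and flush them to ClickHouse in batches."""
    count, batch = 0, []
    for obj, should_dump, _reason in sink.fetch_target_items(batch_size=batch_size):
        if not should_dump:
            continue
        batch.append(obj)
        if len(batch) == batch_size:  # a full batch: flush, then throttle
            sink.dump(batch, many=True)
            count += len(batch)
            batch = []
            time.sleep(sleep_time)
        if limit and count >= limit:  # stop once enough objects were dumped
            break
    if batch:  # flush the final partial batch
        sink.dump(batch, many=True)
        count += len(batch)
    return count

# Two details of the diff worth noting: `len(objects_to_submit) % batch_size == 0`
# is equivalent to `== batch_size` because the list is emptied after every flush,
# and the `count == limit` check only fires when the running count lands exactly
# on `limit`, i.e. when `limit` is a multiple of `batch_size`. handle() now
# resolves the sink class via ModelBaseSink.get_sink_by_model_name(), a linear
# scan of __subclasses__(), so a programmatic invocation mirrors the tests:
# call_command("dump_data_to_clickhouse", object="user_profile",
# batch_size=5000, sleep_time=2) -- the model name here is illustrative.
# ---------------------------------------------------------------------------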
""" + pk_format = int + """ + function: A function to format the primary key of the model + """ def __init__(self, connection_overrides, log): super().__init__(connection_overrides, log) @@ -151,11 +156,15 @@ def get_model(self): """ return get_model(self.model) - def get_queryset(self): + def get_queryset(self, start_pk=None): """ Return the queryset to be used for the insert """ - return self.get_model().objects.all() + if start_pk: + start_pk = self.pk_format(start_pk) + return self.get_model().objects.filter(pk__gt=start_pk).order_by("pk") + else: + return self.get_model().objects.all().order_by("pk") def dump(self, item_id, many=False, initial=None): """ @@ -272,35 +281,31 @@ def send_item(self, serialized_item, many=False): self._send_clickhouse_request(request) - def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False): + def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump=False, batch_size=None): """ Fetch the items that should be dumped to ClickHouse """ + queryset = self.get_queryset(start_pk) if ids: - item_keys = [self.convert_id(item_id) for item_id in ids] - else: - item_keys = [item.id for item in self.get_queryset()] - - skip_ids = ( - [str(item_id) for item_id in skip_ids] if skip_ids else [] - ) - - for item_key in item_keys: - if str(item_key) in skip_ids: - yield item_key, False, f"{self.name} is explicitly skipped" - elif force_dump: - yield item_key, True, "Force is set" - else: - should_be_dumped, reason = self.should_dump_item(item_key) - yield item_key, should_be_dumped, reason - - def convert_id(self, item_id): - """ - Convert the id to the correct type for the model - """ - return item_id - - def should_dump_item(self, unique_key): # pylint: disable=unused-argument + ids = [self.pk_format(id) for id in ids] + queryset = queryset.filter(pk__in=ids) + + if skip_ids: + skip_ids = [self.pk_format(id) for id in skip_ids] + queryset = queryset.exclude(pk__in=skip_ids) + + paginator = Paginator(queryset, batch_size) + for i in range(1, paginator.num_pages+1): + page = paginator.page(i) + items = page.object_list + for item in items: + if force_dump: + yield item, True, "Force is set" + else: + should_be_dumped, reason = self.should_dump_item(item) + yield item, should_be_dumped, reason + + def should_dump_item(self, item): # pylint: disable=unused-argument """ Return True if the item should be dumped to ClickHouse, False otherwise """ @@ -351,3 +356,14 @@ def is_enabled(cls): ) return enabled or waffle_flag.is_enabled() + + @classmethod + def get_sink_by_model_name(cls, model): + """ + Return the sink instance for the given model + """ + for sink in cls.__subclasses__(): + if sink.model == model: + return sink + + return None diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py index 6fd0569..1e457df 100644 --- a/event_sink_clickhouse/sinks/course_published.py +++ b/event_sink_clickhouse/sinks/course_published.py @@ -149,8 +149,9 @@ class CourseOverviewSink(ModelBaseSink): # pylint: disable=abstract-method name = "Course Overview" serializer_class = CourseOverviewSerializer nested_sinks = [XBlockSink] + pk_format = str - def should_dump_item(self, unique_key): + def should_dump_item(self, item): """ Only dump the course if it's been changed since the last time it's been dumped. 
@@ -161,14 +162,14 @@ def should_dump_item(self, unique_key): - reason why course needs, or does not need, to be dumped (string) """ - course_last_dump_time = self.get_last_dumped_timestamp(unique_key) + course_last_dump_time = self.get_last_dumped_timestamp(item) # If we don't have a record of the last time this command was run, # we should serialize the course and dump it if course_last_dump_time is None: return True, "Course is not present in ClickHouse" - course_last_published_date = self.get_course_last_published(unique_key) + course_last_published_date = self.get_course_last_published(item) # If we've somehow dumped this course but there is no publish date # skip it @@ -196,7 +197,7 @@ def should_dump_item(self, unique_key): ) return needs_dump, reason - def get_course_last_published(self, course_key): + def get_course_last_published(self, course_overview): """ Get approximate last publish date for the given course. We use the 'modified' column in the CourseOverview table as a quick and easy @@ -210,16 +211,8 @@ def get_course_last_published(self, course_key): is sortable and similar to ISO 8601: https://docs.python.org/3/library/datetime.html#datetime.date.__str__ """ - CourseOverview = self.get_model() - approx_last_published = CourseOverview.get_from_id(course_key).modified + approx_last_published = course_overview.modified if approx_last_published: return str(approx_last_published) return None - - def convert_id(self, item_id): - return CourseKey.from_string(item_id) - - def get_queryset(self): - modulestore = get_modulestore() - return modulestore.get_course_summaries() diff --git a/event_sink_clickhouse/sinks/external_id_sink.py b/event_sink_clickhouse/sinks/external_id_sink.py index 3866c34..6a03bf7 100644 --- a/event_sink_clickhouse/sinks/external_id_sink.py +++ b/event_sink_clickhouse/sinks/external_id_sink.py @@ -14,3 +14,6 @@ class ExternalIdSink(ModelBaseSink): # pylint: disable=abstract-method timestamp_field = "time_last_dumped" name = "External ID" serializer_class = UserExternalIDSerializer + + def get_queryset(self, start_pk=None): + return super().get_queryset(start_pk).select_related("user", "external_id_type") diff --git a/event_sink_clickhouse/sinks/user_profile_sink.py b/event_sink_clickhouse/sinks/user_profile_sink.py index ff652f6..e8bf8f2 100644 --- a/event_sink_clickhouse/sinks/user_profile_sink.py +++ b/event_sink_clickhouse/sinks/user_profile_sink.py @@ -14,3 +14,6 @@ class UserProfileSink(ModelBaseSink): # pylint: disable=abstract-method timestamp_field = "time_last_dumped" name = "User Profile" serializer_class = UserProfileSerializer + + def get_queryset(self, start_pk=None): + return super().get_queryset(start_pk).select_related("user") diff --git a/requirements/dev.txt b/requirements/dev.txt index 27a40a4..88ac050 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -117,16 +117,20 @@ django==3.2.24 # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/quality.txt # django-crum + # django-mock-queries # django-waffle # djangorestframework # edx-django-utils # edx-i18n-tools # edx-toggles + # model-bakery django-crum==0.7.9 # via # -r requirements/quality.txt # edx-django-utils # edx-toggles +django-mock-queries==2.2.0 + # via -r requirements/quality.txt django-rest-framework==0.1.0 # via -r requirements/quality.txt django-waffle==4.1.0 @@ -137,6 +141,7 @@ django-waffle==4.1.0 djangorestframework==3.14.0 # via # -r requirements/quality.txt + # django-mock-queries # 
django-rest-framework edx-django-utils==5.10.1 # via @@ -194,6 +199,10 @@ mccabe==0.7.0 # via # -r requirements/quality.txt # pylint +model-bakery==1.17.0 + # via + # -r requirements/quality.txt + # django-mock-queries mypy-extensions==1.0.0 # via black newrelic==9.6.0 diff --git a/requirements/doc.txt b/requirements/doc.txt index b165cc3..77c72f9 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -88,15 +88,19 @@ django==3.2.24 # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/test.txt # django-crum + # django-mock-queries # django-waffle # djangorestframework # edx-django-utils # edx-toggles + # model-bakery django-crum==0.7.9 # via # -r requirements/test.txt # edx-django-utils # edx-toggles +django-mock-queries==2.2.0 + # via -r requirements/test.txt django-rest-framework==0.1.0 # via -r requirements/test.txt django-waffle==4.1.0 @@ -107,6 +111,7 @@ django-waffle==4.1.0 djangorestframework==3.14.0 # via # -r requirements/test.txt + # django-mock-queries # django-rest-framework doc8==1.1.1 # via -r requirements/doc.in @@ -172,6 +177,10 @@ markupsafe==2.1.5 # jinja2 mdurl==0.1.2 # via markdown-it-py +model-bakery==1.17.0 + # via + # -r requirements/test.txt + # django-mock-queries more-itertools==10.2.0 # via jaraco-classes newrelic==9.6.0 diff --git a/requirements/quality.txt b/requirements/quality.txt index 5e925f3..b63cea9 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -84,15 +84,19 @@ django==3.2.24 # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/test.txt # django-crum + # django-mock-queries # django-waffle # djangorestframework # edx-django-utils # edx-toggles + # model-bakery django-crum==0.7.9 # via # -r requirements/test.txt # edx-django-utils # edx-toggles +django-mock-queries==2.2.0 + # via -r requirements/test.txt django-rest-framework==0.1.0 # via -r requirements/test.txt django-waffle==4.1.0 @@ -103,6 +107,7 @@ django-waffle==4.1.0 djangorestframework==3.14.0 # via # -r requirements/test.txt + # django-mock-queries # django-rest-framework edx-django-utils==5.10.1 # via @@ -144,6 +149,10 @@ markupsafe==2.1.5 # jinja2 mccabe==0.7.0 # via pylint +model-bakery==1.17.0 + # via + # -r requirements/test.txt + # django-mock-queries newrelic==9.6.0 # via # -r requirements/test.txt diff --git a/requirements/test.in b/requirements/test.in index 7f7c862..1bc4cb8 100644 --- a/requirements/test.in +++ b/requirements/test.in @@ -8,3 +8,4 @@ pytest-django # pytest extension for better Django support code-annotations # provides commands used by the pii_check make target. 
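# ---------------------------------------------------------------------------
# Editor's note: stepping back from the requirements churn, the get_queryset()
# overrides added to ExternalIdSink and UserProfileSink earlier in this diff
# chain select_related() onto the base queryset. The batched dump serializes
# thousands of rows per flush, so a lazy foreign-key lookup per row would be a
# classic N+1 query pattern; joining the related rows into the initial SELECT
# avoids it. A small illustration with a hypothetical profile model (assumes a
# configured Django app; UserProfile is a stand-in name):

# N+1 pattern: each profile.user access triggers its own query.
for profile in UserProfile.objects.order_by("pk"):
    username = profile.user.username  # one extra SELECT per row

# With select_related, the user row is joined into the initial query,
# so the whole batch costs a single SELECT.
for profile in UserProfile.objects.select_related("user").order_by("pk"):
    username = profile.user.username  # already loaded, no extra query
# ---------------------------------------------------------------------------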
responses # mocks for the requests library ddt +django-mock-queries diff --git a/requirements/test.txt b/requirements/test.txt index 312b4d5..4510d3a 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -72,15 +72,19 @@ ddt==1.7.1 # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/base.txt # django-crum + # django-mock-queries # django-waffle # djangorestframework # edx-django-utils # edx-toggles + # model-bakery django-crum==0.7.9 # via # -r requirements/base.txt # edx-django-utils # edx-toggles +django-mock-queries==2.2.0 + # via -r requirements/test.in django-rest-framework==0.1.0 # via -r requirements/base.txt django-waffle==4.1.0 @@ -91,6 +95,7 @@ django-waffle==4.1.0 djangorestframework==3.14.0 # via # -r requirements/base.txt + # django-mock-queries # django-rest-framework edx-django-utils==5.10.1 # via @@ -120,6 +125,8 @@ markupsafe==2.1.5 # via # -r requirements/base.txt # jinja2 +model-bakery==1.17.0 + # via django-mock-queries newrelic==9.6.0 # via # -r requirements/base.txt diff --git a/test_utils/helpers.py b/test_utils/helpers.py index f748fa5..682985c 100644 --- a/test_utils/helpers.py +++ b/test_utils/helpers.py @@ -164,8 +164,7 @@ def mock_course_overview(): Create a fake CourseOverview object that supports just the things we care about. """ mock_overview = MagicMock() - mock_overview.get_from_id = MagicMock() - mock_overview.get_from_id.return_value = fake_course_overview_factory(datetime.now()) + mock_overview.return_value = fake_course_overview_factory(datetime.now()) return mock_overview diff --git a/tests/commands/test_dump_courses_command.py b/tests/commands/test_dump_courses_command.py deleted file mode 100644 index 56946ba..0000000 --- a/tests/commands/test_dump_courses_command.py +++ /dev/null @@ -1,236 +0,0 @@ -""" -Tests for the dump_courses_to_clickhouse management command. -""" -from collections import namedtuple -from datetime import datetime, timedelta -from unittest.mock import patch - -import django.core.management.base -import pytest -from django.core.management import call_command - -from test_utils.helpers import FakeCourse, course_str_factory, fake_course_overview_factory - - -@pytest.fixture -def mock_common_calls(): - """ - Mock out calls that we test elsewhere and aren't relevant to the command tests. - """ - command_path = "event_sink_clickhouse.management.commands.dump_courses_to_clickhouse" - with patch(command_path+".dump_course_to_clickhouse") as mock_dump_course: - with patch(command_path+".CourseOverviewSink.get_model") as mock_get_course_overview_model: - with patch("event_sink_clickhouse.sinks.course_published.get_modulestore") as mock_modulestore: - with patch(command_path+".CourseOverviewSink.get_last_dumped_timestamp") as mock_last_dump_time: - # Set a reasonable default last dump time a year in the past - mock_last_dump_time.return_value = \ - (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00") - yield mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time - - -def dump_command_clickhouse_options(): - """ - Pytest params for all the different ClickHouse options. - - Just making sure every option gets passed through correctly. 
- """ - options = [ - {}, - {'url': "https://foo/"}, - {'username': "Foo"}, - {'password': "F00"}, - {'database': "foo"}, - {'timeout_secs': 60}, - {'url': "https://foo/", 'username': "Foo", 'password': "F00", 'database': "foo", 'timeout_secs': 60}, - ] - - for option in options: - yield option - - -@pytest.mark.parametrize("option_combination", dump_command_clickhouse_options()) -def test_dump_courses_to_clickhouse_db_options( - option_combination, - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - - course_id = course_str_factory() - - fake_modulestore_courses = [FakeCourse(course_id)] - mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses - fake_overview = fake_course_overview_factory(modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")) - mock_get_course_overview_model.return_value.get_from_id.return_value = fake_overview - - call_command( - 'dump_courses_to_clickhouse', - **option_combination - ) - - # Make sure that our mocks were called as expected - assert mock_modulestore.call_count == 1 - assert mock_dump_course.apply_async.call_count == 1 - mock_dump_course.apply_async.assert_called_once_with(kwargs=dict( - course_key_string=course_id, - connection_overrides=option_combination - )) - assert "Course has been published since last dump time" in caplog.text - assert "These courses were submitted for dump to ClickHouse successfully" in caplog.text - - -CommandOptions = namedtuple("TestCommandOptions", ["options", "expected_num_submitted", "expected_logs"]) - - -def dump_command_basic_options(): - """ - Pytest params for all the different non-ClickHouse command options. - """ - options = [ - CommandOptions( - options={"courses_to_skip": [course_str_factory()]}, - expected_num_submitted=0, - expected_logs=[ - "0 courses submitted for export to ClickHouse. 
1 courses skipped.", - "Course Overview is explicitly skipped" - ] - ), - CommandOptions( - options={"limit": 1}, - expected_num_submitted=1, - expected_logs=["Limit of 1 eligible course has been reached, quitting!"] - ), - CommandOptions( - options={"courses": [course_str_factory()]}, - expected_num_submitted=1, - expected_logs=[ - "Course has been published since last dump time", - "These courses were submitted for dump to ClickHouse successfully" - ] - ), - CommandOptions( - options={"force": True}, - expected_num_submitted=1, - expected_logs=["Force is set"] - ), - ] - - for option in options: - yield option - - -@pytest.mark.parametrize("test_command_option", dump_command_basic_options()) -def test_dump_courses_options( - test_command_option, - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - - option_combination, expected_num_submitted, expected_outputs = test_command_option - course_id = course_str_factory() - - fake_modulestore_courses = [FakeCourse(course_id), ] - mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses - mock_get_course_overview_model.return_value.get_from_id.return_value = fake_course_overview_factory( - modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00") - ) - - call_command( - 'dump_courses_to_clickhouse', - **option_combination - ) - - # Make sure that our mocks were called as expected - if "courses" not in option_combination: - # Modulestore will only be called here if we're not passing in a list of courses - assert mock_modulestore.call_count == 1 - assert mock_dump_course.apply_async.call_count == expected_num_submitted - for expected_output in expected_outputs: - assert expected_output in caplog.text - - -def dump_command_invalid_options(): - """ - Pytest params for all the different non-ClickHouse command options. 
- """ - options = [ - CommandOptions( - options={"force": True, "limit": 100}, - expected_num_submitted=0, - expected_logs=[ - "The 'limit' option cannot be used with 'force'", - ] - ), - CommandOptions( - options={"limit": -1}, - expected_num_submitted=0, - expected_logs=["'limit' must be greater than 0!"] - ), - ] - - for option in options: - yield option - - -@pytest.mark.parametrize("test_command_option", dump_command_invalid_options()) -def test_invalid_dump_command_options( - test_command_option, - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - option_combination, expected_num_submitted, expected_outputs = test_command_option - - with pytest.raises(django.core.management.base.CommandError): - call_command( - 'dump_courses_to_clickhouse', - **option_combination - ) - - # Just to make sure we're not calling things more than we need to - assert mock_modulestore.call_count == 0 - assert mock_dump_course.apply_async.call_count == 0 - for expected_output in expected_outputs: - assert expected_output in caplog.text - - -def test_multiple_courses_different_times( - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - - test_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00") - - course_id_1 = course_str_factory("course_1") - course_id_2 = course_str_factory("course_2") - course_id_3 = course_str_factory("course_3") - - fake_modulestore_courses = [FakeCourse(course_id_1), FakeCourse(course_id_2), FakeCourse(course_id_3)] - mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses - - fake_overview = fake_course_overview_factory(modified=test_timestamp) - mock_get_course_overview_model.return_value.get_from_id.return_value = fake_overview - - # Each time last_dump_time is called it will get a different date so we can test - # them all together - mock_last_dump_time.side_effect = [ - # One year ago - (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00"), - # A magic date that matches the last published date of our test course - test_timestamp, - # Course not dumped to ClickHouse yet - None, - ] - - call_command( - 'dump_courses_to_clickhouse' - ) - - assert mock_modulestore.call_count == 1 - assert mock_last_dump_time.call_count == 3 - assert "Course has been published since last dump time" in caplog.text - assert "Course has NOT been published since last dump time" in caplog.text - assert "Course is not present in ClickHouse" in caplog.text - assert "2 courses submitted for export to ClickHouse. 1 courses skipped." 
in caplog.text diff --git a/tests/commands/test_dump_data_to_clickhouse.py b/tests/commands/test_dump_data_to_clickhouse.py index db124a6..9784bce 100644 --- a/tests/commands/test_dump_data_to_clickhouse.py +++ b/tests/commands/test_dump_data_to_clickhouse.py @@ -3,12 +3,12 @@ """ from collections import namedtuple -from datetime import datetime, timedelta -from unittest.mock import patch +from datetime import datetime import django.core.management.base import pytest from django.core.management import call_command +from django_mock_queries.query import MockModel, MockSet from event_sink_clickhouse.sinks.base_sink import ModelBaseSink @@ -31,6 +31,10 @@ def __init__(self, id): self.id = id self.created = datetime.now() + @property + def pk(self): + return self.id + return DummyModel @@ -44,11 +48,15 @@ class DummySerializer: Dummy serializer for testing. """ - def __init__(self, model): + def __init__(self, model, many=False, initial=None): self.model = model + self.many = many + self.initial = initial @property def data(self): + if self.many: + return [{"id": item, "created": datetime.now()} for item in self.model] return {"id": self.model.id, "created": self.model.created} return DummySerializer @@ -65,18 +73,28 @@ class DummySink(ModelBaseSink): serializer_class = dummy_serializer_factory() timestamp_field = "created" clickhouse_table_name = "dummy_table" + factory = dummy_model_factory() + + def get_queryset(self, start_pk=None): + qs = MockSet( + MockModel(mock_name="john", email="john@test.invalid", pk=1), + MockModel(mock_name="jeff", email="jeff@test.invalid", pk=2), + MockModel(mock_name="bill", email="bill@test.invalid", pk=3), + MockModel(mock_name="joe", email="joe@test.invalid", pk=4), + MockModel(mock_name="jim", email="jim@test.invalid", pk=5), + ) + if start_pk: + qs = qs.filter(pk__gt=start_pk) + return qs - def get_queryset(self): - return [dummy_model_factory()(id) for id in range(1, 5)] + def should_dump_item(self, unique_key): + return unique_key.pk != 1, "No reason" - def convert_id(self, item_id): - return int(item_id) + def send_item_and_log(self, item_id, serialized_item, many): + pass - def should_dump_item(self, unique_key): - if unique_key % 2 == 0: - return True, "Even number" - else: - return False, "Odd number" + def get_object(self, item_id): + return self.factory(item_id) def dump_command_basic_options(): @@ -85,28 +103,76 @@ def dump_command_basic_options(): """ options = [ CommandOptions( - options={"object": "dummy", "ids_to_skip": ["1", "2", "3", "4"]}, - expected_num_submitted=0, + options={"object": "dummy", "batch_size": 1, "sleep_time": 0}, + expected_num_submitted=4, expected_logs=[ - "submitted for export to ClickHouse", + "Dumped 4 objects to ClickHouse", ], ), CommandOptions( - options={"object": "dummy", "limit": 1}, + options={"object": "dummy", "limit": 1, "batch_size": 1, "sleep_time": 0}, expected_num_submitted=1, expected_logs=["Limit of 1 eligible objects has been reached, quitting!"], ), CommandOptions( - options={"object": "dummy", "ids": ["1", "2", "3", "4"]}, + options={"object": "dummy", "batch_size": 2, "sleep_time": 0}, expected_num_submitted=2, expected_logs=[ - "These objects were submitted for dump to ClickHouse successfully", + "Now dumping 2 Dummy to ClickHouse", + ], + ), + CommandOptions( + options={ + "object": "dummy", + "batch_size": 1, + "sleep_time": 0, + "ids": ["1", "2", "3"], + }, + expected_num_submitted=3, + expected_logs=[ + "Now dumping 1 Dummy to ClickHouse", + "Dumped 2 objects to ClickHouse", + "Last ID: 3" ], 
), CommandOptions( - options={"object": "dummy", "force": True}, + options={ + "object": "dummy", + "batch_size": 1, + "sleep_time": 0, + "start_pk": 1, + }, expected_num_submitted=4, - expected_logs=["Force is set"], + expected_logs=[ + "Now dumping 1 Dummy to ClickHouse", + "Dumped 4 objects to ClickHouse", + ], + ), + CommandOptions( + options={ + "object": "dummy", + "batch_size": 1, + "sleep_time": 0, + "force": True, + }, + expected_num_submitted=4, + expected_logs=[ + "Now dumping 1 Dummy to ClickHouse", + "Dumped 5 objects to ClickHouse", + ], + ), + CommandOptions( + options={ + "object": "dummy", + "batch_size": 2, + "sleep_time": 0, + "ids_to_skip": ["3", "4", "5"], + }, + expected_num_submitted=4, + expected_logs=[ + "Now dumping 1 Dummy to ClickHouse", + "Dumped 1 objects to ClickHouse", + ], ), ] @@ -115,17 +181,13 @@ def dump_command_basic_options(): @pytest.mark.parametrize("test_command_option", dump_command_basic_options()) -@patch( - "event_sink_clickhouse.management.commands.dump_data_to_clickhouse.dump_data_to_clickhouse" -) -def test_dump_courses_options(mock_dump_data, test_command_option, caplog): +def test_dump_courses_options(test_command_option, caplog): option_combination, expected_num_submitted, expected_outputs = test_command_option assert DummySink.model in [cls.model for cls in ModelBaseSink.__subclasses__()] call_command("dump_data_to_clickhouse", **option_combination) - assert mock_dump_data.apply_async.call_count == expected_num_submitted for expected_output in expected_outputs: assert expected_output in caplog.text @@ -162,10 +224,7 @@ def dump_basic_invalid_options(): @pytest.mark.parametrize("test_command_option", dump_basic_invalid_options()) -@patch( - "event_sink_clickhouse.management.commands.dump_data_to_clickhouse.dump_data_to_clickhouse" -) -def test_dump_courses_options_invalid(mock_dump_data, test_command_option, caplog): +def test_dump_courses_options_invalid(test_command_option, caplog): option_combination, expected_num_submitted, expected_outputs = test_command_option assert DummySink.model in [cls.model for cls in ModelBaseSink.__subclasses__()] diff --git a/tests/test_base_sink.py b/tests/test_base_sink.py index a215225..8be5345 100644 --- a/tests/test_base_sink.py +++ b/tests/test_base_sink.py @@ -25,6 +25,39 @@ class ChildSink(ModelBaseSink): # pylint: disable=abstract-method serializer_class = Mock() +@override_settings( + EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG={ + "url": "http://clickhouse:8123", + "username": "ch_cms", + "password": "password", + "database": "event_sink", + "timeout_secs": 5, + }, + EVENT_SINK_CLICKHOUSE_MODEL_CONFIG={}, +) +class TestBaseSink(TestCase): + """ + Tests for the BaseSink. + """ + + def test_connection_overrides(self): + """ + Test that connection_overrides() returns the correct data. + """ + child_sink = ChildSink(connection_overrides={ + "url": "http://dummy:8123", + "username": "dummy_username", + "password": "dummy_password", + "database": "dummy_database", + "timeout_secs": 0, + }, log=logging.getLogger()) + + self.assertEqual(child_sink.ch_url, "http://dummy:8123") + self.assertEqual(child_sink.ch_auth, ("dummy_username", "dummy_password")) + self.assertEqual(child_sink.ch_database, "dummy_database") + self.assertEqual(child_sink.ch_timeout_secs, 0) + + @override_settings( EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG={ # URL to a running ClickHouse server's HTTP interface. 
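# ---------------------------------------------------------------------------
# Editor's note: the rewritten DummySink above leans on the newly added
# django-mock-queries dependency (hence the requirements churn): MockSet
# emulates queryset semantics such as filter(pk__gt=...) entirely in memory,
# letting the command tests drive the real pagination and batching code paths
# without a database. A tiny sketch of the idea:

from django_mock_queries.query import MockModel, MockSet

users = MockSet(
    MockModel(mock_name="john", pk=1),
    MockModel(mock_name="jeff", pk=2),
    MockModel(mock_name="bill", pk=3),
)

# Field lookups behave like the ORM's, with no database involved.
assert {user.pk for user in users.filter(pk__gt=1)} == {2, 3}
# ---------------------------------------------------------------------------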
ex: https://foo.openedx.org:8443/ or @@ -32,7 +65,7 @@ class ChildSink(ModelBaseSink): # pylint: disable=abstract-method # to avoid pulling in more dependencies to the platform than necessary. "url": "http://clickhouse:8123", "username": "ch_cms", - "password": "TYreGozgtDG3vkoWPUHVVM6q", + "password": "password", "database": "event_sink", "timeout_secs": 5, }, @@ -194,6 +227,16 @@ def test_get_queryset(self): self.child_sink.get_queryset() self.child_sink.get_model.return_value.objects.all.assert_called_once() + def test_get_queryset_by_start_pk(self): + """ + Test that get_queryset() returns a query set. + """ + self.child_sink.get_model = Mock() + self.child_sink.get_queryset(start_pk=1) + self.child_sink.get_model.return_value.objects.filter.assert_called_once_with( + pk__gt=1 + ) + def test_nested_sink_dump_related(self): """ Test that dump_related() calls the correct methods. @@ -209,12 +252,6 @@ def test_get_serializer(self): serializer = self.child_sink.get_serializer() self.assertEqual(serializer, self.child_sink.serializer_class) - def test_convert_id(self): - """ - Test that convert_id() returns the correct data. - """ - self.assertEqual(self.child_sink.convert_id(1), 1) - def test_should_dump_item(self): """ Test that should_dump_item() returns the correct data. @@ -243,3 +280,13 @@ def test_is_enabled(self): Test that is_enable() returns the correct data. """ self.assertEqual(self.child_sink.is_enabled(), True) + + def test_get_sink_by_model_name(self): + """ + Test that get_sink_by_model_name() returns the correct data. + """ + no_sink = ModelBaseSink.get_sink_by_model_name("non_existent_model") + child_sink = ModelBaseSink.get_sink_by_model_name("child_model") + + self.assertIsNone(no_sink) + self.assertEqual(child_sink, ChildSink) diff --git a/tests/test_course_published.py b/tests/test_course_published.py index 4428a40..53567b6 100644 --- a/tests/test_course_published.py +++ b/tests/test_course_published.py @@ -23,7 +23,6 @@ fake_course_overview_factory, fake_serialize_fake_course_overview, get_clickhouse_http_params, - mock_course_overview, mock_detached_xblock_types, ) @@ -114,40 +113,28 @@ def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, mock_o assert f"Error trying to dump Course Overview {course} to ClickHouse!" in caplog.text -@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.get_model") -def test_get_course_last_published(mock_overview): +def test_get_course_last_published(): """ Make sure we get a valid date back from this in the expected format. """ # Create a fake course overview, which will return a datetime object - course = mock_course_overview() - mock_overview.return_value = course - - # Request our course last published date - course_key = course_str_factory() + course_overview = fake_course_overview_factory(modified=datetime.now()) # Confirm that the string date we get back is a valid date - last_published_date = CourseOverviewSink(None, None).get_course_last_published(course_key) + last_published_date = CourseOverviewSink(None, None).get_course_last_published(course_overview) dt = datetime.strptime(last_published_date, "%Y-%m-%d %H:%M:%S.%f") assert dt @responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter -@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.get_model") -def test_no_last_published_date(mock_overview): +def test_no_last_published_date(): """ Test that we get a None value back for courses that don't have a modified date. 
In some cases there is no modified date on a course. In coursegraph we skipped
    these if they are already in the database, so we're continuing this trend here.
    """
-    # Fake a course with no modified date
-    course = mock_course_overview()
-    mock_overview.return_value = course
-    mock_overview.return_value.get_from_id.return_value = fake_course_overview_factory(modified=None)
-
-    # Request our course last published date
-    course_key = course_str_factory()
+    course_overview = fake_course_overview_factory(modified=None)
 
     # should_dump_course will reach out to ClickHouse for the last dump date
     # we'll fake the response here to have any date, such that we'll exercise
@@ -159,12 +146,72 @@
 
     # Confirm that the string date we get back is a valid date
     sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger())
-    should_dump_course, reason = sink.should_dump_item(course_key)
+    should_dump_course, reason = sink.should_dump_item(course_overview)
 
     assert should_dump_course is False
     assert reason == "No last modified date in CourseOverview"
 
 
+@responses.activate(registry=OrderedRegistry)  # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
+def test_should_dump_item():
+    """
+    Test that we get the expected results from should_dump_item.
+    """
+    course_overview = fake_course_overview_factory(modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00"))
+
+    # should_dump_item will reach out to ClickHouse for the last dump date;
+    # we fake an older response date here so that the "course has been
+    # published since last dump time" path is exercised.
+    responses.get(
+        "https://foo.bar/",
+        body="2023-05-03 15:47:39.331024+00:00"
+    )
+
+    sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger())
+    should_dump_course, reason = sink.should_dump_item(course_overview)
+
+    assert should_dump_course is True
+    assert "Course has been published since last dump time - " in reason
+
+
+@responses.activate(registry=OrderedRegistry)  # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
+def test_should_dump_item_not_in_clickhouse():
+    """
+    Test that a course gets dumped if it's never been dumped before.
+    """
+    course_overview = fake_course_overview_factory(modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00"))
+    responses.get(
+        "https://foo.bar/",
+        body=""
+    )
+
+    sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger())
+    should_dump_course, reason = sink.should_dump_item(course_overview)
+
+    assert should_dump_course is True
+    assert "Course is not present in ClickHouse" == reason
+
+
+@responses.activate(registry=OrderedRegistry)  # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
+def test_should_dump_item_no_needs_dump():
+    """
+    Test that a course is NOT dumped if it has not been published since the last dump.
+    """
+    modified = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
+    course_overview = fake_course_overview_factory(modified=modified)
+    responses.get(
+        "https://foo.bar/",
+        body=modified
+    )
+
+    sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger())
+    should_dump_course, reason = sink.should_dump_item(course_overview)
+
+    assert should_dump_course is False
+    assert "Course has NOT been published since last dump time - " in reason
+
+
 @responses.activate(registry=OrderedRegistry)  # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
 def test_course_not_present_in_clickhouse():
     """
diff --git a/tests/test_external_id_sink.py b/tests/test_external_id_sink.py
new file mode 100644
index 0000000..f0aead0
--- /dev/null
+++ b/tests/test_external_id_sink.py
@@ -0,0 +1,20 @@
+"""
+Test the external_id_sink module.
+"""
+
+from unittest.mock import patch
+
+from event_sink_clickhouse.sinks.external_id_sink import ExternalIdSink
+
+
+@patch("event_sink_clickhouse.sinks.external_id_sink.ModelBaseSink.get_queryset")
+def test_get_queryset(mock_get_queryset):
+    """
+    Test the get_queryset method.
+    """
+    sink = ExternalIdSink(None, None)
+
+    sink.get_queryset()
+
+    mock_get_queryset.assert_called_once_with(None)
+    mock_get_queryset.return_value.select_related.assert_called_once_with("user", "external_id_type")
diff --git a/tests/test_user_profile_sink.py b/tests/test_user_profile_sink.py
new file mode 100644
index 0000000..abd99d0
--- /dev/null
+++ b/tests/test_user_profile_sink.py
@@ -0,0 +1,20 @@
+"""
+Test the user_profile_sink module.
+"""
+
+from unittest.mock import patch
+
+from event_sink_clickhouse.sinks.user_profile_sink import UserProfileSink
+
+
+@patch("event_sink_clickhouse.sinks.external_id_sink.ModelBaseSink.get_queryset")
+def test_get_queryset(mock_get_queryset):
+    """
+    Test the get_queryset method.
+    """
+    sink = UserProfileSink(None, None)
+
+    sink.get_queryset()
+
+    mock_get_queryset.assert_called_once_with(None)
+    mock_get_queryset.return_value.select_related.assert_called_once_with("user")
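# ---------------------------------------------------------------------------
# Editor's note: one subtlety in the two new test modules above --
# test_user_profile_sink.py patches ModelBaseSink.get_queryset through the
# external_id_sink module path even though it exercises UserProfileSink. That
# works because @patch replaces the attribute on the class object itself, and
# every module imports the very same ModelBaseSink class; the dotted path only
# determines how the class is looked up, not which object gets patched. A
# self-contained sketch of the same effect:

from unittest.mock import patch


class Base:
    def get_queryset(self, start_pk=None):
        return "real queryset"


class Child(Base):
    def get_queryset(self, start_pk=None):
        # Mirrors the sinks above: delegate to the base implementation.
        return super().get_queryset(start_pk)


# Patching the method on the base class intercepts the super() call,
# no matter which module reference was used to reach Base.
with patch.object(Base, "get_queryset", return_value="mocked") as mock_qs:
    assert Child().get_queryset() == "mocked"
mock_qs.assert_called_once()
# ---------------------------------------------------------------------------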