From c36b970cf8951d9c0ccfd1bd55c07207cdc8a1eb Mon Sep 17 00:00:00 2001
From: Cristhian Garcia
Date: Tue, 6 Feb 2024 10:52:55 -0500
Subject: [PATCH 1/9] feat: backfill data to clickhouse in batches and in the same thread

---
 .../commands/dump_data_to_clickhouse.py | 30 +++++++++++--------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
index 25605c7..3ee52a1 100644
--- a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
+++ b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
@@ -26,12 +26,12 @@
 def dump_target_objects_to_clickhouse(
-    connection_overrides=None,
     sink=None,
-    object_ids=None,
-    objects_to_skip=None,
-    force=None,
+    object_ids=[],
+    objects_to_skip=[],
+    force=False,
     limit=None,
+    batch_size=1000,
 ):
     """
     Iterates through a list of objects in the ORM, serializes them to csv,
@@ -49,6 +49,7 @@ def dump_target_objects_to_clickhouse(
     skipped_objects = []
 
     index = 0
+    objects_to_submit = []
     for object_id, should_be_dumped, reason in sink.fetch_target_items(
         object_ids, objects_to_skip, force
     ):
@@ -64,15 +65,12 @@
             log.info(
                 f"{sink.model} {index}: Submitting {object_id} for dump to ClickHouse, reason '{reason}'."
             )
+            objects_to_submit.append(object_id)
 
-            dump_data_to_clickhouse.apply_async(
-                kwargs={
-                    "sink_module": sink.__module__,
-                    "sink_name": sink.__class__.__name__,
-                    "object_id": str(object_id),
-                    "connection_overrides": connection_overrides,
-                }
-            )
+            if len(objects_to_submit) % batch_size == 0:
+                sink.dump(objects_to_submit, many=True)
+                submitted_objects.extend(objects_to_submit)
+                objects_to_submit = []
 
             submitted_objects.append(str(object_id))
 
@@ -147,6 +145,12 @@ def add_arguments(self, parser):
             type=int,
             help="maximum number of objects to dump, cannot be used with '--ids' or '--force'",
         )
+        parser.add_argument(
+            "--batch_size",
+            type=int,
+            default=1000,
+            help="number of objects to dump in a single batch",
+        )
 
     def handle(self, *args, **options):
         """
@@ -183,12 +187,12 @@ def handle(self, *args, **options):
             if cls.model == options["object"]:
                 sink = cls(connection_overrides, log)
                 submitted_objects, skipped_objects = dump_target_objects_to_clickhouse(
-                    connection_overrides,
                     sink,
                     [object_id.strip() for object_id in ids],
                     [object_id.strip() for object_id in ids_to_skip],
                     options["force"],
                     options["limit"],
+                    options["batch_size"],
                 )
 
                 log.info(

From e0c16b2109e4eb34ada193ff83fe419fc81f6f15 Mon Sep 17 00:00:00 2001
From: Cristhian Garcia
Date: Tue, 6 Feb 2024 11:14:01 -0500
Subject: [PATCH 2/9] chore: add sleep time argument for backfill

---
 .../commands/dump_data_to_clickhouse.py | 62 +++++++++++--------
 event_sink_clickhouse/utils.py          | 11 ++++
 2 files changed, 48 insertions(+), 25 deletions(-)

diff --git a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
index 3ee52a1..8d6bd3d 100644
--- a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
+++ b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
@@ -15,12 +15,13 @@
     python manage.py cms dump_objects_to_clickhouse --limit 1000
 """
 import logging
+import time
 from textwrap import dedent
 
 from django.core.management.base import BaseCommand, CommandError
 
 from event_sink_clickhouse.sinks.base_sink import ModelBaseSink
-from event_sink_clickhouse.tasks import dump_data_to_clickhouse
+from event_sink_clickhouse.utils import get_sink_by_model
 
 log = logging.getLogger(__name__)
 
@@ -32,6 +33,7 @@ def dump_target_objects_to_clickhouse(
     force=False,
     limit=None,
     batch_size=1000,
+    sleep_time=10,
 ):
     """
     Iterates through a list of objects in the ORM, serializes them to csv,
@@ -71,6 +73,7 @@
                 sink.dump(objects_to_submit, many=True)
                 submitted_objects.extend(objects_to_submit)
                 objects_to_submit = []
+                time.sleep(sleep_time)
 
             submitted_objects.append(str(object_id))
 
@@ -80,6 +83,9 @@
             )
             break
 
+    if objects_to_submit:
+        sink.dump(objects_to_submit, many=True)
+
     return submitted_objects, skipped_objects
 
 
@@ -151,6 +157,12 @@ def add_arguments(self, parser):
             default=1000,
             help="number of objects to dump in a single batch",
         )
+        parser.add_argument(
+            "--sleep_time",
+            type=int,
+            default=10,
+            help="number of seconds to sleep between batches",
+        )
 
     def handle(self, *args, **options):
         """
@@ -183,29 +195,29 @@ def handle(self, *args, **options):
             log.error(message)
             raise CommandError(message)
 
-        for cls in ModelBaseSink.__subclasses__():  # pragma: no cover
-            if cls.model == options["object"]:
-                sink = cls(connection_overrides, log)
-                submitted_objects, skipped_objects = dump_target_objects_to_clickhouse(
-                    sink,
-                    [object_id.strip() for object_id in ids],
-                    [object_id.strip() for object_id in ids_to_skip],
-                    options["force"],
-                    options["limit"],
-                    options["batch_size"],
-                )
-                log.info(
-                    "%d objects submitted for export to ClickHouse. %d objects skipped.",
-                    len(submitted_objects),
-                    len(skipped_objects),
-                )
+        Sink = get_sink_by_model(options["object"])
+        sink = Sink(connection_overrides, log)
+        submitted_objects, skipped_objects = dump_target_objects_to_clickhouse(
+            sink,
+            [object_id.strip() for object_id in ids],
+            [object_id.strip() for object_id in ids_to_skip],
+            options["force"],
+            options["limit"],
+            options["batch_size"],
+        )
+
+        log.info(
+            "%d objects submitted for export to ClickHouse. 
%d objects skipped.", + len(submitted_objects), + len(skipped_objects), + ) + + if not submitted_objects: + log.info("No objects submitted for export to ClickHouse at all!") + else: + log.info( # pylint: disable=logging-not-lazy + "These objects were submitted for dump to ClickHouse successfully:\n\t" + + "\n\t".join(submitted_objects) + ) - if not submitted_objects: - log.info("No objects submitted for export to ClickHouse at all!") - else: - log.info( # pylint: disable=logging-not-lazy - "These objects were submitted for dump to ClickHouse successfully:\n\t" - + "\n\t".join(submitted_objects) - ) - break diff --git a/event_sink_clickhouse/utils.py b/event_sink_clickhouse/utils.py index 28745c3..6bc10a0 100644 --- a/event_sink_clickhouse/utils.py +++ b/event_sink_clickhouse/utils.py @@ -57,3 +57,14 @@ def get_detached_xblock_types(): # pragma: no cover from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES return DETACHED_XBLOCK_TYPES + + +def get_sink_by_model(model): + """Get a sink by model.""" + from event_sink_clickhouse.sinks.base_sink import ModelBaseSink + + for sink in ModelBaseSink.__subclasses__(): + if sink.model == model: + return sink + + return None From b540bcd2c8da8793816199600f50655c8e753d82 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Tue, 6 Feb 2024 14:31:39 -0500 Subject: [PATCH 3/9] chore: use paginator for querysets fix: query course overviews with django model fix: remove submitted_objects objects chore: quality fixes chore: remove old dump courses to clickhouse command fix: fix initial page, tests, and quality chore: handle pr comments test: improve coverage fix: paginator last page is included test: fixing test --- .../commands/dump_courses_to_clickhouse.py | 205 --------------- .../commands/dump_data_to_clickhouse.py | 74 +++--- event_sink_clickhouse/settings/common.py | 2 +- event_sink_clickhouse/sinks/base_sink.py | 44 +++- .../sinks/course_published.py | 4 - .../sinks/external_id_sink.py | 3 + .../sinks/user_profile_sink.py | 3 + event_sink_clickhouse/utils.py | 11 - tests/commands/test_dump_courses_command.py | 236 ------------------ .../commands/test_dump_data_to_clickhouse.py | 62 +++-- tests/test_base_sink.py | 45 +++- 11 files changed, 141 insertions(+), 548 deletions(-) delete mode 100644 event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py delete mode 100644 tests/commands/test_dump_courses_command.py diff --git a/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py deleted file mode 100644 index dd87a84..0000000 --- a/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py +++ /dev/null @@ -1,205 +0,0 @@ -""" -Management command for exporting the modulestore ClickHouse. - -Example usages (see usage for more options): - - # Dump all courses published since last dump. - # Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`: - python manage.py cms dump_courses_to_clickhouse - - # Dump all courses published since last dump. - # Use custom connection parameters to send to a different ClickHouse instance: - python manage.py cms dump_courses_to_clickhouse --host localhost \ - --user user --password password --database research_db -- - - # Specify certain courses instead of dumping all of them. - # Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`. 
- python manage.py cms dump_courses_to_clickhouse --courses 'course-v1:A+B+1' 'course-v1:A+B+2' - - # Dump a limited number of courses to prevent stress on production systems - python manage.py cms dump_courses_to_clickhouse --limit 1000 -""" -import logging -from textwrap import dedent - -from django.core.management.base import BaseCommand, CommandError -from edx_django_utils.cache import RequestCache - -from event_sink_clickhouse.sinks.course_published import CourseOverviewSink -from event_sink_clickhouse.tasks import dump_course_to_clickhouse - -log = logging.getLogger(__name__) - - -def dump_target_courses_to_clickhouse( - connection_overrides=None, - course_keys=None, - courses_to_skip=None, - force=None, - limit=None, -): - """ - Iterates through a list of courses in a modulestore, serializes them to csv, - then submits tasks to post them to ClickHouse. - - Arguments: - force: serialize the courses even if they've been recently - serialized - - Returns: two lists--one of the courses that had dump jobs queued for them - and one of courses that did not. - """ - sink = CourseOverviewSink(connection_overrides, log) - - submitted_courses = [] - skipped_courses = [] - - index = 0 - for course_key, should_be_dumped, reason in sink.fetch_target_items( - course_keys, courses_to_skip, force - ): - log.info(f"Iteration {index}: {course_key}") - index += 1 - - if not should_be_dumped: - skipped_courses.append(course_key) - log.info( - f"Course {index}: Skipping course {course_key}, reason: '{reason}'" - ) - else: - # RequestCache is a local memory cache used in modulestore for performance reasons. - # Normally it is cleared at the end of every request, but in this command it will - # continue to grow until the command is done. To prevent excessive memory consumption - # we clear it every time we dump a course. - RequestCache.clear_all_namespaces() - - log.info( - f"Course {index}: Submitting {course_key} for dump to ClickHouse, reason '{reason}'." - ) - - dump_course_to_clickhouse.apply_async( - kwargs={ - "course_key_string": str(course_key), - "connection_overrides": connection_overrides, - } - ) - - submitted_courses.append(str(course_key)) - - if limit and len(submitted_courses) == limit: - log.info( - f"Limit of {limit} eligible course has been reached, quitting!" - ) - break - - return submitted_courses, skipped_courses - - -class Command(BaseCommand): - """ - Dump course block data to a ClickHouse instance. 
- """ - - help = dedent(__doc__).strip() - - def add_arguments(self, parser): - parser.add_argument( - "--url", - type=str, - help="the URL of the ClickHouse server", - ) - parser.add_argument( - "--username", - type=str, - help="the username of the ClickHouse user", - ) - parser.add_argument( - "--password", - type=str, - help="the password of the ClickHouse user", - ) - parser.add_argument( - "--database", - type=str, - help="the database in ClickHouse to connect to", - ) - parser.add_argument( - "--timeout_secs", - type=int, - help="timeout for ClickHouse requests, in seconds", - ) - parser.add_argument( - "--courses", - metavar="KEY", - type=str, - nargs="*", - help="keys of courses to serialize; if omitted all courses in system are serialized", - ) - parser.add_argument( - "--courses_to_skip", - metavar="KEY", - type=str, - nargs="*", - help="keys of courses to NOT to serialize", - ) - parser.add_argument( - "--force", - action="store_true", - help="dump all courses regardless of when they were last published", - ) - parser.add_argument( - "--limit", - type=int, - help="maximum number of courses to dump, cannot be used with '--courses' or '--force'", - ) - - def handle(self, *args, **options): - """ - Iterates through each course, serializes them into graphs, and saves - those graphs to clickhouse. - """ - connection_overrides = { - key: options[key] - for key in ["url", "username", "password", "database", "timeout_secs"] - if options[key] - } - - courses = options["courses"] if options["courses"] else [] - courses_to_skip = ( - options["courses_to_skip"] if options["courses_to_skip"] else [] - ) - - if options["limit"] is not None and int(options["limit"]) < 1: - message = "'limit' must be greater than 0!" - log.error(message) - raise CommandError(message) - - if options["limit"] and options["force"]: - message = ( - "The 'limit' option cannot be used with 'force' as running the " - "command repeatedly will result in the same courses being dumped every time." - ) - log.error(message) - raise CommandError(message) - - submitted_courses, skipped_courses = dump_target_courses_to_clickhouse( - connection_overrides, - [course_key.strip() for course_key in courses], - [course_key.strip() for course_key in courses_to_skip], - options["force"], - options["limit"], - ) - - log.info( - "%d courses submitted for export to ClickHouse. 
%d courses skipped.", - len(submitted_courses), - len(skipped_courses), - ) - - if not submitted_courses: - log.info("No courses submitted for export to ClickHouse at all!") - else: - log.info( # pylint: disable=logging-not-lazy - "These courses were submitted for dump to ClickHouse successfully:\n\t" - + "\n\t".join(submitted_courses) - ) diff --git a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py index 8d6bd3d..a501c58 100644 --- a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py +++ b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py @@ -14,6 +14,7 @@ # Dump a limited number of objects to prevent stress on production systems python manage.py cms dump_objects_to_clickhouse --limit 1000 """ + import logging import time from textwrap import dedent @@ -21,15 +22,15 @@ from django.core.management.base import BaseCommand, CommandError from event_sink_clickhouse.sinks.base_sink import ModelBaseSink -from event_sink_clickhouse.utils import get_sink_by_model log = logging.getLogger(__name__) def dump_target_objects_to_clickhouse( sink=None, - object_ids=[], - objects_to_skip=[], + start_pk=None, + object_ids=None, + objects_to_skip=None, force=False, limit=None, batch_size=1000, @@ -47,37 +48,26 @@ def dump_target_objects_to_clickhouse( and one of objects that did not. """ - submitted_objects = [] + count = 0 skipped_objects = [] - - index = 0 objects_to_submit = [] - for object_id, should_be_dumped, reason in sink.fetch_target_items( - object_ids, objects_to_skip, force - ): - log.info(f"Iteration {index}: {object_id}") - index += 1 + for obj, should_be_dumped, reason in sink.fetch_target_items( + start_pk, object_ids, objects_to_skip, force, batch_size + ): if not should_be_dumped: - skipped_objects.append(object_id) - log.info( - f"{sink.model} {index}: Skipping object {object_id}, reason: '{reason}'" - ) + skipped_objects.append(obj.pk) + log.info(f"{sink.model}: Skipping object {obj.pk}, reason: '{reason}'") else: - log.info( - f"{sink.model} {index}: Submitting {object_id} for dump to ClickHouse, reason '{reason}'." - ) - objects_to_submit.append(object_id) - + objects_to_submit.append(obj) if len(objects_to_submit) % batch_size == 0: + count += len(objects_to_submit) sink.dump(objects_to_submit, many=True) - submitted_objects.extend(objects_to_submit) objects_to_submit = [] + log.info(f"Last ID: {obj.pk}") time.sleep(sleep_time) - submitted_objects.append(str(object_id)) - - if limit and len(submitted_objects) == limit: + if limit and count == limit: log.info( f"Limit of {limit} eligible objects has been reached, quitting!" 
) @@ -85,8 +75,10 @@ def dump_target_objects_to_clickhouse( if objects_to_submit: sink.dump(objects_to_submit, many=True) + count += len(objects_to_submit) + log.info(f"Last ID: {objects_to_submit[-1].pk}") - return submitted_objects, skipped_objects + log.info(f"Dumped {count} objects to ClickHouse") class Command(BaseCommand): @@ -127,6 +119,12 @@ def add_arguments(self, parser): type=str, help="the type of object to dump", ) + parser.add_argument( + "--start_pk", + type=int, + help="the primary key to start at", + default=None, + ) parser.add_argument( "--ids", metavar="KEY", @@ -154,13 +152,13 @@ def add_arguments(self, parser): parser.add_argument( "--batch_size", type=int, - default=1000, + default=10000, help="number of objects to dump in a single batch", ) parser.add_argument( "--sleep_time", type=int, - default=10, + default=1, help="number of seconds to sleep between batches", ) @@ -195,29 +193,15 @@ def handle(self, *args, **options): log.error(message) raise CommandError(message) - - Sink = get_sink_by_model(options["object"]) + Sink = ModelBaseSink.get_sink_by_model_name(options["object"]) sink = Sink(connection_overrides, log) - submitted_objects, skipped_objects = dump_target_objects_to_clickhouse( + dump_target_objects_to_clickhouse( sink, + options["start_pk"], [object_id.strip() for object_id in ids], [object_id.strip() for object_id in ids_to_skip], options["force"], options["limit"], options["batch_size"], + options["sleep_time"], ) - - log.info( - "%d objects submitted for export to ClickHouse. %d objects skipped.", - len(submitted_objects), - len(skipped_objects), - ) - - if not submitted_objects: - log.info("No objects submitted for export to ClickHouse at all!") - else: - log.info( # pylint: disable=logging-not-lazy - "These objects were submitted for dump to ClickHouse successfully:\n\t" - + "\n\t".join(submitted_objects) - ) - diff --git a/event_sink_clickhouse/settings/common.py b/event_sink_clickhouse/settings/common.py index 73dae80..2ff6818 100644 --- a/event_sink_clickhouse/settings/common.py +++ b/event_sink_clickhouse/settings/common.py @@ -13,7 +13,7 @@ def plugin_settings(settings): # to avoid pulling in more dependencies to the platform than necessary. 
"url": "http://clickhouse:8123", "username": "ch_cms", - "password": "TYreGozgtDG3vkoWPUHVVM6q", + "password": "password", "database": "event_sink", "timeout_secs": 5, } diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index 4a68745..ebe22cd 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -8,6 +8,7 @@ import requests from django.conf import settings +from django.core.paginator import Paginator from edx_toggles.toggles import WaffleFlag from event_sink_clickhouse.utils import get_model @@ -151,11 +152,14 @@ def get_model(self): """ return get_model(self.model) - def get_queryset(self): + def get_queryset(self, start_pk=None): """ Return the queryset to be used for the insert """ - return self.get_model().objects.all() + if start_pk: + return self.get_model().objects.filter(pk__gt=start_pk).order_by("pk") + else: + return self.get_model().objects.all().order_by("pk") def dump(self, item_id, many=False, initial=None): """ @@ -272,27 +276,30 @@ def send_item(self, serialized_item, many=False): self._send_clickhouse_request(request) - def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False): + def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump=False, batch_size=None): """ Fetch the items that should be dumped to ClickHouse """ if ids: item_keys = [self.convert_id(item_id) for item_id in ids] else: - item_keys = [item.id for item in self.get_queryset()] + item_keys = self.get_queryset(start_pk) skip_ids = ( [str(item_id) for item_id in skip_ids] if skip_ids else [] ) - - for item_key in item_keys: - if str(item_key) in skip_ids: - yield item_key, False, f"{self.name} is explicitly skipped" - elif force_dump: - yield item_key, True, "Force is set" - else: - should_be_dumped, reason = self.should_dump_item(item_key) - yield item_key, should_be_dumped, reason + paginator = Paginator(item_keys, batch_size) + for i in range(1, paginator.num_pages+1): + page = paginator.page(i) + items = page.object_list + for item_key in items: + if str(item_key) in skip_ids: + yield item_key, False, f"{self.name} is explicitly skipped" + elif force_dump: + yield item_key, True, "Force is set" + else: + should_be_dumped, reason = self.should_dump_item(item_key) + yield item_key, should_be_dumped, reason def convert_id(self, item_id): """ @@ -351,3 +358,14 @@ def is_enabled(cls): ) return enabled or waffle_flag.is_enabled() + + @classmethod + def get_sink_by_model_name(cls, model): + """ + Return the sink instance for the given model + """ + for sink in cls.__subclasses__(): + if sink.model == model: + return sink + + return None diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py index 6fd0569..d99e003 100644 --- a/event_sink_clickhouse/sinks/course_published.py +++ b/event_sink_clickhouse/sinks/course_published.py @@ -219,7 +219,3 @@ def get_course_last_published(self, course_key): def convert_id(self, item_id): return CourseKey.from_string(item_id) - - def get_queryset(self): - modulestore = get_modulestore() - return modulestore.get_course_summaries() diff --git a/event_sink_clickhouse/sinks/external_id_sink.py b/event_sink_clickhouse/sinks/external_id_sink.py index 3866c34..6a03bf7 100644 --- a/event_sink_clickhouse/sinks/external_id_sink.py +++ b/event_sink_clickhouse/sinks/external_id_sink.py @@ -14,3 +14,6 @@ class ExternalIdSink(ModelBaseSink): # pylint: disable=abstract-method timestamp_field = 
"time_last_dumped" name = "External ID" serializer_class = UserExternalIDSerializer + + def get_queryset(self, start_pk=None): + return super().get_queryset(start_pk).select_related("user", "external_id_type") diff --git a/event_sink_clickhouse/sinks/user_profile_sink.py b/event_sink_clickhouse/sinks/user_profile_sink.py index ff652f6..e8bf8f2 100644 --- a/event_sink_clickhouse/sinks/user_profile_sink.py +++ b/event_sink_clickhouse/sinks/user_profile_sink.py @@ -14,3 +14,6 @@ class UserProfileSink(ModelBaseSink): # pylint: disable=abstract-method timestamp_field = "time_last_dumped" name = "User Profile" serializer_class = UserProfileSerializer + + def get_queryset(self, start_pk=None): + return super().get_queryset(start_pk).select_related("user") diff --git a/event_sink_clickhouse/utils.py b/event_sink_clickhouse/utils.py index 6bc10a0..28745c3 100644 --- a/event_sink_clickhouse/utils.py +++ b/event_sink_clickhouse/utils.py @@ -57,14 +57,3 @@ def get_detached_xblock_types(): # pragma: no cover from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES return DETACHED_XBLOCK_TYPES - - -def get_sink_by_model(model): - """Get a sink by model.""" - from event_sink_clickhouse.sinks.base_sink import ModelBaseSink - - for sink in ModelBaseSink.__subclasses__(): - if sink.model == model: - return sink - - return None diff --git a/tests/commands/test_dump_courses_command.py b/tests/commands/test_dump_courses_command.py deleted file mode 100644 index 56946ba..0000000 --- a/tests/commands/test_dump_courses_command.py +++ /dev/null @@ -1,236 +0,0 @@ -""" -Tests for the dump_courses_to_clickhouse management command. -""" -from collections import namedtuple -from datetime import datetime, timedelta -from unittest.mock import patch - -import django.core.management.base -import pytest -from django.core.management import call_command - -from test_utils.helpers import FakeCourse, course_str_factory, fake_course_overview_factory - - -@pytest.fixture -def mock_common_calls(): - """ - Mock out calls that we test elsewhere and aren't relevant to the command tests. - """ - command_path = "event_sink_clickhouse.management.commands.dump_courses_to_clickhouse" - with patch(command_path+".dump_course_to_clickhouse") as mock_dump_course: - with patch(command_path+".CourseOverviewSink.get_model") as mock_get_course_overview_model: - with patch("event_sink_clickhouse.sinks.course_published.get_modulestore") as mock_modulestore: - with patch(command_path+".CourseOverviewSink.get_last_dumped_timestamp") as mock_last_dump_time: - # Set a reasonable default last dump time a year in the past - mock_last_dump_time.return_value = \ - (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00") - yield mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time - - -def dump_command_clickhouse_options(): - """ - Pytest params for all the different ClickHouse options. - - Just making sure every option gets passed through correctly. 
- """ - options = [ - {}, - {'url': "https://foo/"}, - {'username': "Foo"}, - {'password': "F00"}, - {'database': "foo"}, - {'timeout_secs': 60}, - {'url': "https://foo/", 'username': "Foo", 'password': "F00", 'database': "foo", 'timeout_secs': 60}, - ] - - for option in options: - yield option - - -@pytest.mark.parametrize("option_combination", dump_command_clickhouse_options()) -def test_dump_courses_to_clickhouse_db_options( - option_combination, - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - - course_id = course_str_factory() - - fake_modulestore_courses = [FakeCourse(course_id)] - mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses - fake_overview = fake_course_overview_factory(modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")) - mock_get_course_overview_model.return_value.get_from_id.return_value = fake_overview - - call_command( - 'dump_courses_to_clickhouse', - **option_combination - ) - - # Make sure that our mocks were called as expected - assert mock_modulestore.call_count == 1 - assert mock_dump_course.apply_async.call_count == 1 - mock_dump_course.apply_async.assert_called_once_with(kwargs=dict( - course_key_string=course_id, - connection_overrides=option_combination - )) - assert "Course has been published since last dump time" in caplog.text - assert "These courses were submitted for dump to ClickHouse successfully" in caplog.text - - -CommandOptions = namedtuple("TestCommandOptions", ["options", "expected_num_submitted", "expected_logs"]) - - -def dump_command_basic_options(): - """ - Pytest params for all the different non-ClickHouse command options. - """ - options = [ - CommandOptions( - options={"courses_to_skip": [course_str_factory()]}, - expected_num_submitted=0, - expected_logs=[ - "0 courses submitted for export to ClickHouse. 
1 courses skipped.", - "Course Overview is explicitly skipped" - ] - ), - CommandOptions( - options={"limit": 1}, - expected_num_submitted=1, - expected_logs=["Limit of 1 eligible course has been reached, quitting!"] - ), - CommandOptions( - options={"courses": [course_str_factory()]}, - expected_num_submitted=1, - expected_logs=[ - "Course has been published since last dump time", - "These courses were submitted for dump to ClickHouse successfully" - ] - ), - CommandOptions( - options={"force": True}, - expected_num_submitted=1, - expected_logs=["Force is set"] - ), - ] - - for option in options: - yield option - - -@pytest.mark.parametrize("test_command_option", dump_command_basic_options()) -def test_dump_courses_options( - test_command_option, - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - - option_combination, expected_num_submitted, expected_outputs = test_command_option - course_id = course_str_factory() - - fake_modulestore_courses = [FakeCourse(course_id), ] - mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses - mock_get_course_overview_model.return_value.get_from_id.return_value = fake_course_overview_factory( - modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00") - ) - - call_command( - 'dump_courses_to_clickhouse', - **option_combination - ) - - # Make sure that our mocks were called as expected - if "courses" not in option_combination: - # Modulestore will only be called here if we're not passing in a list of courses - assert mock_modulestore.call_count == 1 - assert mock_dump_course.apply_async.call_count == expected_num_submitted - for expected_output in expected_outputs: - assert expected_output in caplog.text - - -def dump_command_invalid_options(): - """ - Pytest params for all the different non-ClickHouse command options. 
- """ - options = [ - CommandOptions( - options={"force": True, "limit": 100}, - expected_num_submitted=0, - expected_logs=[ - "The 'limit' option cannot be used with 'force'", - ] - ), - CommandOptions( - options={"limit": -1}, - expected_num_submitted=0, - expected_logs=["'limit' must be greater than 0!"] - ), - ] - - for option in options: - yield option - - -@pytest.mark.parametrize("test_command_option", dump_command_invalid_options()) -def test_invalid_dump_command_options( - test_command_option, - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - option_combination, expected_num_submitted, expected_outputs = test_command_option - - with pytest.raises(django.core.management.base.CommandError): - call_command( - 'dump_courses_to_clickhouse', - **option_combination - ) - - # Just to make sure we're not calling things more than we need to - assert mock_modulestore.call_count == 0 - assert mock_dump_course.apply_async.call_count == 0 - for expected_output in expected_outputs: - assert expected_output in caplog.text - - -def test_multiple_courses_different_times( - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - - test_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00") - - course_id_1 = course_str_factory("course_1") - course_id_2 = course_str_factory("course_2") - course_id_3 = course_str_factory("course_3") - - fake_modulestore_courses = [FakeCourse(course_id_1), FakeCourse(course_id_2), FakeCourse(course_id_3)] - mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses - - fake_overview = fake_course_overview_factory(modified=test_timestamp) - mock_get_course_overview_model.return_value.get_from_id.return_value = fake_overview - - # Each time last_dump_time is called it will get a different date so we can test - # them all together - mock_last_dump_time.side_effect = [ - # One year ago - (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00"), - # A magic date that matches the last published date of our test course - test_timestamp, - # Course not dumped to ClickHouse yet - None, - ] - - call_command( - 'dump_courses_to_clickhouse' - ) - - assert mock_modulestore.call_count == 1 - assert mock_last_dump_time.call_count == 3 - assert "Course has been published since last dump time" in caplog.text - assert "Course has NOT been published since last dump time" in caplog.text - assert "Course is not present in ClickHouse" in caplog.text - assert "2 courses submitted for export to ClickHouse. 1 courses skipped." in caplog.text diff --git a/tests/commands/test_dump_data_to_clickhouse.py b/tests/commands/test_dump_data_to_clickhouse.py index db124a6..b7af184 100644 --- a/tests/commands/test_dump_data_to_clickhouse.py +++ b/tests/commands/test_dump_data_to_clickhouse.py @@ -31,6 +31,10 @@ def __init__(self, id): self.id = id self.created = datetime.now() + @property + def pk(self): + return self.id + return DummyModel @@ -44,11 +48,15 @@ class DummySerializer: Dummy serializer for testing. 
""" - def __init__(self, model): + def __init__(self, model, many=False, initial=None): self.model = model + self.many = many + self.initial = initial @property def data(self): + if self.many: + return [{"id": item, "created": datetime.now()} for item in self.model] return {"id": self.model.id, "created": self.model.created} return DummySerializer @@ -65,18 +73,19 @@ class DummySink(ModelBaseSink): serializer_class = dummy_serializer_factory() timestamp_field = "created" clickhouse_table_name = "dummy_table" + factory = dummy_model_factory() - def get_queryset(self): - return [dummy_model_factory()(id) for id in range(1, 5)] - - def convert_id(self, item_id): - return int(item_id) + def get_queryset(self, start_pk=None): + return [self.factory(id) for id in range(1, 5)] def should_dump_item(self, unique_key): - if unique_key % 2 == 0: - return True, "Even number" - else: - return False, "Odd number" + return True, "No reason" + + def send_item_and_log(self, item_id, serialized_item, many): + pass + + def get_object(self, item_id): + return self.factory(item_id) def dump_command_basic_options(): @@ -85,28 +94,24 @@ def dump_command_basic_options(): """ options = [ CommandOptions( - options={"object": "dummy", "ids_to_skip": ["1", "2", "3", "4"]}, - expected_num_submitted=0, - expected_logs=[ - "submitted for export to ClickHouse", - ], + options={"object": "dummy", "batch_size": 1, "sleep_time": 0}, + expected_num_submitted=4, + expected_logs=["Dumped 4 objects to ClickHouse",], ), CommandOptions( - options={"object": "dummy", "limit": 1}, + options={"object": "dummy", "limit": 1, "batch_size": 1, "sleep_time": 0}, expected_num_submitted=1, expected_logs=["Limit of 1 eligible objects has been reached, quitting!"], ), CommandOptions( - options={"object": "dummy", "ids": ["1", "2", "3", "4"]}, + options={"object": "dummy", "batch_size": 2, "sleep_time": 0}, expected_num_submitted=2, - expected_logs=[ - "These objects were submitted for dump to ClickHouse successfully", - ], + expected_logs=["Now dumping 2 Dummy to ClickHouse",], ), CommandOptions( - options={"object": "dummy", "force": True}, - expected_num_submitted=4, - expected_logs=["Force is set"], + options={"object": "dummy", "batch_size": 1, "sleep_time": 0}, + expected_num_submitted=3, + expected_logs=["Now dumping 1 Dummy to ClickHouse", "Dumped 4 objects to ClickHouse"], ), ] @@ -115,17 +120,13 @@ def dump_command_basic_options(): @pytest.mark.parametrize("test_command_option", dump_command_basic_options()) -@patch( - "event_sink_clickhouse.management.commands.dump_data_to_clickhouse.dump_data_to_clickhouse" -) -def test_dump_courses_options(mock_dump_data, test_command_option, caplog): +def test_dump_courses_options(test_command_option, caplog): option_combination, expected_num_submitted, expected_outputs = test_command_option assert DummySink.model in [cls.model for cls in ModelBaseSink.__subclasses__()] call_command("dump_data_to_clickhouse", **option_combination) - assert mock_dump_data.apply_async.call_count == expected_num_submitted for expected_output in expected_outputs: assert expected_output in caplog.text @@ -162,10 +163,7 @@ def dump_basic_invalid_options(): @pytest.mark.parametrize("test_command_option", dump_basic_invalid_options()) -@patch( - "event_sink_clickhouse.management.commands.dump_data_to_clickhouse.dump_data_to_clickhouse" -) -def test_dump_courses_options_invalid(mock_dump_data, test_command_option, caplog): +def test_dump_courses_options_invalid(test_command_option, caplog): option_combination, 
expected_num_submitted, expected_outputs = test_command_option assert DummySink.model in [cls.model for cls in ModelBaseSink.__subclasses__()] diff --git a/tests/test_base_sink.py b/tests/test_base_sink.py index a215225..9c39158 100644 --- a/tests/test_base_sink.py +++ b/tests/test_base_sink.py @@ -25,6 +25,39 @@ class ChildSink(ModelBaseSink): # pylint: disable=abstract-method serializer_class = Mock() +@override_settings( + EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG={ + "url": "http://clickhouse:8123", + "username": "ch_cms", + "password": "password", + "database": "event_sink", + "timeout_secs": 5, + }, + EVENT_SINK_CLICKHOUSE_MODEL_CONFIG={}, +) +class TestBaseSink(TestCase): + """ + Tests for the BaseSink. + """ + + def test_connection_overrides(self): + """ + Test that connection_overrides() returns the correct data. + """ + child_sink = ChildSink(connection_overrides={ + "url": "http://dummy:8123", + "username": "dummy_username", + "password": "dummy_password", + "database": "dummy_database", + "timeout_secs": 0, + }, log=logging.getLogger()) + + self.assertEqual(child_sink.ch_url, "http://dummy:8123") + self.assertEqual(child_sink.ch_auth, ("dummy_username", "dummy_password")) + self.assertEqual(child_sink.ch_database, "dummy_database") + self.assertEqual(child_sink.ch_timeout_secs, 0) + + @override_settings( EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG={ # URL to a running ClickHouse server's HTTP interface. ex: https://foo.openedx.org:8443/ or @@ -32,7 +65,7 @@ class ChildSink(ModelBaseSink): # pylint: disable=abstract-method # to avoid pulling in more dependencies to the platform than necessary. "url": "http://clickhouse:8123", "username": "ch_cms", - "password": "TYreGozgtDG3vkoWPUHVVM6q", + "password": "password", "database": "event_sink", "timeout_secs": 5, }, @@ -243,3 +276,13 @@ def test_is_enabled(self): Test that is_enable() returns the correct data. """ self.assertEqual(self.child_sink.is_enabled(), True) + + def test_get_sink_by_model_name(self): + """ + Test that get_sink_by_model_name() returns the correct data. + """ + no_sink = ModelBaseSink.get_sink_by_model_name("non_existent_model") + child_sink = ModelBaseSink.get_sink_by_model_name("child_model") + + self.assertIsNone(no_sink) + self.assertEqual(child_sink, ChildSink) From a658df604f22e0d4e2f8faf726a54fd35b0b427c Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Fri, 9 Feb 2024 16:59:50 -0500 Subject: [PATCH 4/9] fix: use queryset object to filter data --- event_sink_clickhouse/sinks/base_sink.py | 19 ++++++------ .../sinks/course_published.py | 4 +-- requirements/test.in | 1 + .../commands/test_dump_data_to_clickhouse.py | 30 +++++++++++++++---- tests/test_base_sink.py | 6 ---- 5 files changed, 35 insertions(+), 25 deletions(-) diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index ebe22cd..8ed9f6f 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -125,6 +125,10 @@ class ModelBaseSink(BaseSink): list: A list of nested sink instances that can be used to further process or route the event data. Nested sinks allow chaining multiple sinks together for more complex event processing pipelines. 
""" + pk_format = int + """ + function: A function to format the primary key of the model + """ def __init__(self, connection_overrides, log): super().__init__(connection_overrides, log) @@ -157,6 +161,7 @@ def get_queryset(self, start_pk=None): Return the queryset to be used for the insert """ if start_pk: + start_pk = self.pk_format(start_pk) return self.get_model().objects.filter(pk__gt=start_pk).order_by("pk") else: return self.get_model().objects.all().order_by("pk") @@ -280,15 +285,15 @@ def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump= """ Fetch the items that should be dumped to ClickHouse """ + queryset = self.get_queryset(start_pk) if ids: - item_keys = [self.convert_id(item_id) for item_id in ids] - else: - item_keys = self.get_queryset(start_pk) + ids = map(self.pk_format, ids) + queryset = queryset.filter(pk__in=ids) skip_ids = ( [str(item_id) for item_id in skip_ids] if skip_ids else [] ) - paginator = Paginator(item_keys, batch_size) + paginator = Paginator(queryset, batch_size) for i in range(1, paginator.num_pages+1): page = paginator.page(i) items = page.object_list @@ -301,12 +306,6 @@ def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump= should_be_dumped, reason = self.should_dump_item(item_key) yield item_key, should_be_dumped, reason - def convert_id(self, item_id): - """ - Convert the id to the correct type for the model - """ - return item_id - def should_dump_item(self, unique_key): # pylint: disable=unused-argument """ Return True if the item should be dumped to ClickHouse, False otherwise diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py index d99e003..a488370 100644 --- a/event_sink_clickhouse/sinks/course_published.py +++ b/event_sink_clickhouse/sinks/course_published.py @@ -149,6 +149,7 @@ class CourseOverviewSink(ModelBaseSink): # pylint: disable=abstract-method name = "Course Overview" serializer_class = CourseOverviewSerializer nested_sinks = [XBlockSink] + pk_format = str def should_dump_item(self, unique_key): """ @@ -216,6 +217,3 @@ def get_course_last_published(self, course_key): return str(approx_last_published) return None - - def convert_id(self, item_id): - return CourseKey.from_string(item_id) diff --git a/requirements/test.in b/requirements/test.in index 7f7c862..1bc4cb8 100644 --- a/requirements/test.in +++ b/requirements/test.in @@ -8,3 +8,4 @@ pytest-django # pytest extension for better Django support code-annotations # provides commands used by the pii_check make target. 
responses # mocks for the requests library ddt +django-mock-queries diff --git a/tests/commands/test_dump_data_to_clickhouse.py b/tests/commands/test_dump_data_to_clickhouse.py index b7af184..2b6913f 100644 --- a/tests/commands/test_dump_data_to_clickhouse.py +++ b/tests/commands/test_dump_data_to_clickhouse.py @@ -3,8 +3,8 @@ """ from collections import namedtuple -from datetime import datetime, timedelta -from unittest.mock import patch +from datetime import datetime +from unittest.mock import Mock import django.core.management.base import pytest @@ -16,6 +16,10 @@ "TestCommandOptions", ["options", "expected_num_submitted", "expected_logs"] ) +from django_mock_queries.query import MockSet, MockModel + + + def dummy_model_factory(): """ @@ -76,7 +80,16 @@ class DummySink(ModelBaseSink): factory = dummy_model_factory() def get_queryset(self, start_pk=None): - return [self.factory(id) for id in range(1, 5)] + qs = MockSet( + MockModel(mock_name='john', email='john@edx.com', pk=1), + MockModel(mock_name='jeff', email='jeff@edx.com', pk=2), + MockModel(mock_name='bill', email='bill@edx.com', pk=3), + MockModel(mock_name='joe', email='joe@edx.com', pk=4), + MockModel(mock_name='jim', email='jim@edx.com', pk=5), + ) + if start_pk: + qs = qs.filter(pk__gt=start_pk) + return qs def should_dump_item(self, unique_key): return True, "No reason" @@ -95,8 +108,8 @@ def dump_command_basic_options(): options = [ CommandOptions( options={"object": "dummy", "batch_size": 1, "sleep_time": 0}, - expected_num_submitted=4, - expected_logs=["Dumped 4 objects to ClickHouse",], + expected_num_submitted=5, + expected_logs=["Dumped 5 objects to ClickHouse",], ), CommandOptions( options={"object": "dummy", "limit": 1, "batch_size": 1, "sleep_time": 0}, @@ -109,8 +122,13 @@ def dump_command_basic_options(): expected_logs=["Now dumping 2 Dummy to ClickHouse",], ), CommandOptions( - options={"object": "dummy", "batch_size": 1, "sleep_time": 0}, + options={"object": "dummy", "batch_size": 1, "sleep_time": 0, "ids": ["1", "2", "3"]}, expected_num_submitted=3, + expected_logs=["Now dumping 1 Dummy to ClickHouse", "Dumped 3 objects to ClickHouse"], + ), + CommandOptions( + options={"object": "dummy", "batch_size": 1, "sleep_time": 0, "start_pk": 1}, + expected_num_submitted=4, expected_logs=["Now dumping 1 Dummy to ClickHouse", "Dumped 4 objects to ClickHouse"], ), ] diff --git a/tests/test_base_sink.py b/tests/test_base_sink.py index 9c39158..46ad934 100644 --- a/tests/test_base_sink.py +++ b/tests/test_base_sink.py @@ -242,12 +242,6 @@ def test_get_serializer(self): serializer = self.child_sink.get_serializer() self.assertEqual(serializer, self.child_sink.serializer_class) - def test_convert_id(self): - """ - Test that convert_id() returns the correct data. - """ - self.assertEqual(self.child_sink.convert_id(1), 1) - def test_should_dump_item(self): """ Test that should_dump_item() returns the correct data. 
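
Note on the queryset filtering that patches 3 and 4 settle on: `--start_pk` and `--ids` both
narrow the queryset itself (`pk__gt` / `pk__in`) after normalizing the raw CLI strings with
the sink's `pk_format`. A minimal sketch of that behaviour, using the same
`django_mock_queries` helpers the tests above introduce; the data set and the `pk_format`
value here are illustrative stand-ins, not the real sink's:

    from django_mock_queries.query import MockModel, MockSet

    # In-memory stand-in for an ordered queryset; MockSet supports the
    # filter/exclude lookups the sink relies on, without a database.
    items = MockSet(
        MockModel(mock_name="a", pk=1),
        MockModel(mock_name="b", pk=2),
        MockModel(mock_name="c", pk=3),
    )

    pk_format = int  # mirrors ModelBaseSink.pk_format: normalize CLI strings

    ids = [pk_format(raw) for raw in ["2", "3"]]                 # --ids 2 3
    assert {o.pk for o in items.filter(pk__in=ids)} == {2, 3}
    assert {o.pk for o in items.filter(pk__gt=1)} == {2, 3}      # --start_pk 1
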
From b8f33d32a7e40e47483effb465637e9f92fd9ef1 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Fri, 9 Feb 2024 17:17:17 -0500 Subject: [PATCH 5/9] chore: upgrade requirements --- requirements/dev.txt | 9 +++++++++ requirements/doc.txt | 9 +++++++++ requirements/quality.txt | 9 +++++++++ requirements/test.txt | 7 +++++++ 4 files changed, 34 insertions(+) diff --git a/requirements/dev.txt b/requirements/dev.txt index 27a40a4..88ac050 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -117,16 +117,20 @@ django==3.2.24 # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/quality.txt # django-crum + # django-mock-queries # django-waffle # djangorestframework # edx-django-utils # edx-i18n-tools # edx-toggles + # model-bakery django-crum==0.7.9 # via # -r requirements/quality.txt # edx-django-utils # edx-toggles +django-mock-queries==2.2.0 + # via -r requirements/quality.txt django-rest-framework==0.1.0 # via -r requirements/quality.txt django-waffle==4.1.0 @@ -137,6 +141,7 @@ django-waffle==4.1.0 djangorestframework==3.14.0 # via # -r requirements/quality.txt + # django-mock-queries # django-rest-framework edx-django-utils==5.10.1 # via @@ -194,6 +199,10 @@ mccabe==0.7.0 # via # -r requirements/quality.txt # pylint +model-bakery==1.17.0 + # via + # -r requirements/quality.txt + # django-mock-queries mypy-extensions==1.0.0 # via black newrelic==9.6.0 diff --git a/requirements/doc.txt b/requirements/doc.txt index b165cc3..77c72f9 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -88,15 +88,19 @@ django==3.2.24 # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/test.txt # django-crum + # django-mock-queries # django-waffle # djangorestframework # edx-django-utils # edx-toggles + # model-bakery django-crum==0.7.9 # via # -r requirements/test.txt # edx-django-utils # edx-toggles +django-mock-queries==2.2.0 + # via -r requirements/test.txt django-rest-framework==0.1.0 # via -r requirements/test.txt django-waffle==4.1.0 @@ -107,6 +111,7 @@ django-waffle==4.1.0 djangorestframework==3.14.0 # via # -r requirements/test.txt + # django-mock-queries # django-rest-framework doc8==1.1.1 # via -r requirements/doc.in @@ -172,6 +177,10 @@ markupsafe==2.1.5 # jinja2 mdurl==0.1.2 # via markdown-it-py +model-bakery==1.17.0 + # via + # -r requirements/test.txt + # django-mock-queries more-itertools==10.2.0 # via jaraco-classes newrelic==9.6.0 diff --git a/requirements/quality.txt b/requirements/quality.txt index 5e925f3..b63cea9 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -84,15 +84,19 @@ django==3.2.24 # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/test.txt # django-crum + # django-mock-queries # django-waffle # djangorestframework # edx-django-utils # edx-toggles + # model-bakery django-crum==0.7.9 # via # -r requirements/test.txt # edx-django-utils # edx-toggles +django-mock-queries==2.2.0 + # via -r requirements/test.txt django-rest-framework==0.1.0 # via -r requirements/test.txt django-waffle==4.1.0 @@ -103,6 +107,7 @@ django-waffle==4.1.0 djangorestframework==3.14.0 # via # -r requirements/test.txt + # django-mock-queries # django-rest-framework edx-django-utils==5.10.1 # via @@ -144,6 +149,10 @@ markupsafe==2.1.5 # jinja2 mccabe==0.7.0 # via pylint +model-bakery==1.17.0 + # via + # -r requirements/test.txt + # django-mock-queries newrelic==9.6.0 # via # -r 
requirements/test.txt diff --git a/requirements/test.txt b/requirements/test.txt index 312b4d5..4510d3a 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -72,15 +72,19 @@ ddt==1.7.1 # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/base.txt # django-crum + # django-mock-queries # django-waffle # djangorestframework # edx-django-utils # edx-toggles + # model-bakery django-crum==0.7.9 # via # -r requirements/base.txt # edx-django-utils # edx-toggles +django-mock-queries==2.2.0 + # via -r requirements/test.in django-rest-framework==0.1.0 # via -r requirements/base.txt django-waffle==4.1.0 @@ -91,6 +95,7 @@ django-waffle==4.1.0 djangorestframework==3.14.0 # via # -r requirements/base.txt + # django-mock-queries # django-rest-framework edx-django-utils==5.10.1 # via @@ -120,6 +125,8 @@ markupsafe==2.1.5 # via # -r requirements/base.txt # jinja2 +model-bakery==1.17.0 + # via django-mock-queries newrelic==9.6.0 # via # -r requirements/base.txt From bcf5477ca3007c15839145cea6348e269b4b732a Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Mon, 12 Feb 2024 10:02:48 -0500 Subject: [PATCH 6/9] chore: quality fixes test: add test for get_queryset test: add test for get_queryset --- .../commands/test_dump_data_to_clickhouse.py | 78 ++++++++++++++----- tests/test_base_sink.py | 10 +++ 2 files changed, 70 insertions(+), 18 deletions(-) diff --git a/tests/commands/test_dump_data_to_clickhouse.py b/tests/commands/test_dump_data_to_clickhouse.py index 2b6913f..41e3c74 100644 --- a/tests/commands/test_dump_data_to_clickhouse.py +++ b/tests/commands/test_dump_data_to_clickhouse.py @@ -4,11 +4,11 @@ from collections import namedtuple from datetime import datetime -from unittest.mock import Mock import django.core.management.base import pytest from django.core.management import call_command +from django_mock_queries.query import MockModel, MockSet from event_sink_clickhouse.sinks.base_sink import ModelBaseSink @@ -16,10 +16,6 @@ "TestCommandOptions", ["options", "expected_num_submitted", "expected_logs"] ) -from django_mock_queries.query import MockSet, MockModel - - - def dummy_model_factory(): """ @@ -81,18 +77,18 @@ class DummySink(ModelBaseSink): def get_queryset(self, start_pk=None): qs = MockSet( - MockModel(mock_name='john', email='john@edx.com', pk=1), - MockModel(mock_name='jeff', email='jeff@edx.com', pk=2), - MockModel(mock_name='bill', email='bill@edx.com', pk=3), - MockModel(mock_name='joe', email='joe@edx.com', pk=4), - MockModel(mock_name='jim', email='jim@edx.com', pk=5), + MockModel(mock_name="john", email="john@edx.com", pk=1), + MockModel(mock_name="jeff", email="jeff@edx.com", pk=2), + MockModel(mock_name="bill", email="bill@edx.com", pk=3), + MockModel(mock_name="joe", email="joe@edx.com", pk=4), + MockModel(mock_name="jim", email="jim@edx.com", pk=5), ) if start_pk: qs = qs.filter(pk__gt=start_pk) return qs def should_dump_item(self, unique_key): - return True, "No reason" + return unique_key.pk!=1, "No reason" def send_item_and_log(self, item_id, serialized_item, many): pass @@ -108,8 +104,10 @@ def dump_command_basic_options(): options = [ CommandOptions( options={"object": "dummy", "batch_size": 1, "sleep_time": 0}, - expected_num_submitted=5, - expected_logs=["Dumped 5 objects to ClickHouse",], + expected_num_submitted=4, + expected_logs=[ + "Dumped 4 objects to ClickHouse", + ], ), CommandOptions( options={"object": "dummy", "limit": 1, "batch_size": 1, "sleep_time": 0}, @@ -119,17 +117,61 @@ def 
dump_command_basic_options(): CommandOptions( options={"object": "dummy", "batch_size": 2, "sleep_time": 0}, expected_num_submitted=2, - expected_logs=["Now dumping 2 Dummy to ClickHouse",], + expected_logs=[ + "Now dumping 2 Dummy to ClickHouse", + ], ), CommandOptions( - options={"object": "dummy", "batch_size": 1, "sleep_time": 0, "ids": ["1", "2", "3"]}, + options={ + "object": "dummy", + "batch_size": 1, + "sleep_time": 0, + "ids": ["1", "2", "3"], + }, expected_num_submitted=3, - expected_logs=["Now dumping 1 Dummy to ClickHouse", "Dumped 3 objects to ClickHouse"], + expected_logs=[ + "Now dumping 1 Dummy to ClickHouse", + "Dumped 2 objects to ClickHouse", + ], + ), + CommandOptions( + options={ + "object": "dummy", + "batch_size": 1, + "sleep_time": 0, + "start_pk": 1, + }, + expected_num_submitted=4, + expected_logs=[ + "Now dumping 1 Dummy to ClickHouse", + "Dumped 4 objects to ClickHouse", + ], + ), + CommandOptions( + options={ + "object": "dummy", + "batch_size": 1, + "sleep_time": 0, + "force": True, + }, + expected_num_submitted=4, + expected_logs=[ + "Now dumping 1 Dummy to ClickHouse", + "Dumped 5 objects to ClickHouse", + ], ), CommandOptions( - options={"object": "dummy", "batch_size": 1, "sleep_time": 0, "start_pk": 1}, + options={ + "object": "dummy", + "batch_size": 2, + "sleep_time": 0, + "ids_to_skip": [1], + }, expected_num_submitted=4, - expected_logs=["Now dumping 1 Dummy to ClickHouse", "Dumped 4 objects to ClickHouse"], + expected_logs=[ + "Now dumping 2 Dummy to ClickHouse", + "Dumped 3 objects to ClickHouse", + ], ), ] diff --git a/tests/test_base_sink.py b/tests/test_base_sink.py index 46ad934..8be5345 100644 --- a/tests/test_base_sink.py +++ b/tests/test_base_sink.py @@ -227,6 +227,16 @@ def test_get_queryset(self): self.child_sink.get_queryset() self.child_sink.get_model.return_value.objects.all.assert_called_once() + def test_get_queryset_by_start_pk(self): + """ + Test that get_queryset() returns a query set. + """ + self.child_sink.get_model = Mock() + self.child_sink.get_queryset(start_pk=1) + self.child_sink.get_model.return_value.objects.filter.assert_called_once_with( + pk__gt=1 + ) + def test_nested_sink_dump_related(self): """ Test that dump_related() calls the correct methods. 
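
Before patch 7, a quick restatement of the batching contract these tests pin down: an
ordered queryset is walked with Django's `Paginator`, each page is dumped in one
ClickHouse request, the final short page is included, and the command sleeps between
pages. A self-contained sketch under those assumptions (a plain list stands in for the
queryset and a callback for the actual ClickHouse insert; the names are illustrative):

    import time
    from django.core.paginator import Paginator

    def dump_in_batches(ordered_items, dump, batch_size, sleep_time=0):
        """Dump one page per request, sleeping between pages."""
        count = 0
        paginator = Paginator(ordered_items, batch_size)
        for page_number in paginator.page_range:  # 1..num_pages, inclusive
            batch = list(paginator.page(page_number).object_list)
            dump(batch)                 # one insert per batch
            count += len(batch)
            time.sleep(sleep_time)      # --sleep_time throttle between batches
        return count

    batches = []
    assert dump_in_batches([1, 2, 3, 4, 5], batches.append, batch_size=2) == 5
    assert batches == [[1, 2], [3, 4], [5]]  # "paginator last page is included"
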
From 9976c0a80dcadbd3a67d5c4592cf392e3bf60c02 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Mon, 12 Feb 2024 11:17:55 -0500 Subject: [PATCH 7/9] fix: use exclude to skip ids --- event_sink_clickhouse/sinks/base_sink.py | 23 +++++++++---------- .../commands/test_dump_data_to_clickhouse.py | 7 +++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index 8ed9f6f..6288ae9 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -287,26 +287,25 @@ def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump= """ queryset = self.get_queryset(start_pk) if ids: - ids = map(self.pk_format, ids) + ids = [self.pk_format(id) for id in ids] queryset = queryset.filter(pk__in=ids) - skip_ids = ( - [str(item_id) for item_id in skip_ids] if skip_ids else [] - ) + if skip_ids: + skip_ids = [self.pk_format(id) for id in skip_ids] + queryset = queryset.exclude(pk__in=skip_ids) + paginator = Paginator(queryset, batch_size) for i in range(1, paginator.num_pages+1): page = paginator.page(i) items = page.object_list - for item_key in items: - if str(item_key) in skip_ids: - yield item_key, False, f"{self.name} is explicitly skipped" - elif force_dump: - yield item_key, True, "Force is set" + for item in items: + if force_dump: + yield item, True, "Force is set" else: - should_be_dumped, reason = self.should_dump_item(item_key) - yield item_key, should_be_dumped, reason + should_be_dumped, reason = self.should_dump_item(item) + yield item, should_be_dumped, reason - def should_dump_item(self, unique_key): # pylint: disable=unused-argument + def should_dump_item(self, item): # pylint: disable=unused-argument """ Return True if the item should be dumped to ClickHouse, False otherwise """ diff --git a/tests/commands/test_dump_data_to_clickhouse.py b/tests/commands/test_dump_data_to_clickhouse.py index 41e3c74..d52b5e4 100644 --- a/tests/commands/test_dump_data_to_clickhouse.py +++ b/tests/commands/test_dump_data_to_clickhouse.py @@ -132,6 +132,7 @@ def dump_command_basic_options(): expected_logs=[ "Now dumping 1 Dummy to ClickHouse", "Dumped 2 objects to ClickHouse", + "Last ID: 3" ], ), CommandOptions( @@ -165,12 +166,12 @@ def dump_command_basic_options(): "object": "dummy", "batch_size": 2, "sleep_time": 0, - "ids_to_skip": [1], + "ids_to_skip": ["3", "4", "5"], }, expected_num_submitted=4, expected_logs=[ - "Now dumping 2 Dummy to ClickHouse", - "Dumped 3 objects to ClickHouse", + "Now dumping 1 Dummy to ClickHouse", + "Dumped 1 objects to ClickHouse", ], ), ] From c865d16dc1e1c2e07ce67aab2a179ce80ef78483 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Mon, 12 Feb 2024 11:25:19 -0500 Subject: [PATCH 8/9] fix: use course_overview object in course_published dump test: add test for course_published changes chore: quality fixes test: add test for get_queryset changes in sinks test: add test for should_dump_item in course_published chore: use invalid emails chore: quality fixes test: add tests for course_published should_dump_item tmp --- .../sinks/course_published.py | 11 ++- test_utils/helpers.py | 3 +- .../commands/test_dump_data_to_clickhouse.py | 12 +-- tests/test_course_published.py | 85 ++++++++++++++----- tests/test_external_id_sink.py | 20 +++++ tests/test_user_profile_sink.py | 20 +++++ 6 files changed, 118 insertions(+), 33 deletions(-) create mode 100644 tests/test_external_id_sink.py create mode 100644 
From c865d16dc1e1c2e07ce67aab2a179ce80ef78483 Mon Sep 17 00:00:00 2001
From: Cristhian Garcia
Date: Mon, 12 Feb 2024 11:25:19 -0500
Subject: [PATCH 8/9] fix: use course_overview object in course_published

test: add test for course_published changes
chore: quality fixes
test: add test for get_queryset changes in sinks
test: add test for should_dump_item in course_published
chore: use invalid emails
chore: quality fixes
test: add tests for course_published should_dump_item
tmp

---
 .../sinks/course_published.py                 | 11 ++-
 test_utils/helpers.py                         |  3 +-
 .../commands/test_dump_data_to_clickhouse.py  | 12 +--
 tests/test_course_published.py                | 85 ++++++++++++++-----
 tests/test_external_id_sink.py                | 20 +++++
 tests/test_user_profile_sink.py               | 20 +++++
 6 files changed, 118 insertions(+), 33 deletions(-)
 create mode 100644 tests/test_external_id_sink.py
 create mode 100644 tests/test_user_profile_sink.py

diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py
index a488370..1e457df 100644
--- a/event_sink_clickhouse/sinks/course_published.py
+++ b/event_sink_clickhouse/sinks/course_published.py
@@ -151,7 +151,7 @@ class CourseOverviewSink(ModelBaseSink):  # pylint: disable=abstract-method
     nested_sinks = [XBlockSink]
     pk_format = str
 
-    def should_dump_item(self, unique_key):
+    def should_dump_item(self, item):
         """
         Only dump the course if it's been changed since the last time
         it's been dumped.
@@ -162,14 +162,14 @@ def should_dump_item(self, item):
         - reason why course needs, or does not need, to be dumped (string)
         """
 
-        course_last_dump_time = self.get_last_dumped_timestamp(unique_key)
+        course_last_dump_time = self.get_last_dumped_timestamp(item)
 
         # If we don't have a record of the last time this command was run,
         # we should serialize the course and dump it
         if course_last_dump_time is None:
             return True, "Course is not present in ClickHouse"
 
-        course_last_published_date = self.get_course_last_published(unique_key)
+        course_last_published_date = self.get_course_last_published(item)
 
         # If we've somehow dumped this course but there is no publish date
         # skip it
@@ -197,7 +197,7 @@ def should_dump_item(self, item):
         )
         return needs_dump, reason
 
-    def get_course_last_published(self, course_key):
+    def get_course_last_published(self, course_overview):
         """
         Get approximate last publish date for the given course. We use the
         'modified' column in the CourseOverview table as a quick and easy
@@ -211,8 +211,7 @@ def get_course_last_published(self, course_overview):
         is sortable and similar to ISO 8601:
         https://docs.python.org/3/library/datetime.html#datetime.date.__str__
         """
-        CourseOverview = self.get_model()
-        approx_last_published = CourseOverview.get_from_id(course_key).modified
+        approx_last_published = course_overview.modified
         if approx_last_published:
             return str(approx_last_published)
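The docstring's claim that str() output on a datetime is sortable and strptime-compatible is easy to verify in isolation (plain standard-library Python, safe to run anywhere):

    from datetime import datetime

    now = datetime(2024, 2, 12, 11, 25, 19, 331024)
    text = str(now)                       # '2024-02-12 11:25:19.331024'
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")
    assert parsed == now

    # Caveat: with microsecond == 0 the ".%f" suffix is omitted entirely,
    # so strptime with this exact format string would raise ValueError.
    assert str(datetime(2024, 2, 12)) == "2024-02-12 00:00:00"
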
""" mock_overview = MagicMock() - mock_overview.get_from_id = MagicMock() - mock_overview.get_from_id.return_value = fake_course_overview_factory(datetime.now()) + mock_overview.return_value = fake_course_overview_factory(datetime.now()) return mock_overview diff --git a/tests/commands/test_dump_data_to_clickhouse.py b/tests/commands/test_dump_data_to_clickhouse.py index d52b5e4..9784bce 100644 --- a/tests/commands/test_dump_data_to_clickhouse.py +++ b/tests/commands/test_dump_data_to_clickhouse.py @@ -77,18 +77,18 @@ class DummySink(ModelBaseSink): def get_queryset(self, start_pk=None): qs = MockSet( - MockModel(mock_name="john", email="john@edx.com", pk=1), - MockModel(mock_name="jeff", email="jeff@edx.com", pk=2), - MockModel(mock_name="bill", email="bill@edx.com", pk=3), - MockModel(mock_name="joe", email="joe@edx.com", pk=4), - MockModel(mock_name="jim", email="jim@edx.com", pk=5), + MockModel(mock_name="john", email="john@test.invalid", pk=1), + MockModel(mock_name="jeff", email="jeff@test.invalid", pk=2), + MockModel(mock_name="bill", email="bill@test.invalid", pk=3), + MockModel(mock_name="joe", email="joe@test.invalid", pk=4), + MockModel(mock_name="jim", email="jim@test.invalid", pk=5), ) if start_pk: qs = qs.filter(pk__gt=start_pk) return qs def should_dump_item(self, unique_key): - return unique_key.pk!=1, "No reason" + return unique_key.pk != 1, "No reason" def send_item_and_log(self, item_id, serialized_item, many): pass diff --git a/tests/test_course_published.py b/tests/test_course_published.py index 4428a40..53567b6 100644 --- a/tests/test_course_published.py +++ b/tests/test_course_published.py @@ -23,7 +23,6 @@ fake_course_overview_factory, fake_serialize_fake_course_overview, get_clickhouse_http_params, - mock_course_overview, mock_detached_xblock_types, ) @@ -114,40 +113,28 @@ def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, mock_o assert f"Error trying to dump Course Overview {course} to ClickHouse!" in caplog.text -@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.get_model") -def test_get_course_last_published(mock_overview): +def test_get_course_last_published(): """ Make sure we get a valid date back from this in the expected format. """ # Create a fake course overview, which will return a datetime object - course = mock_course_overview() - mock_overview.return_value = course - - # Request our course last published date - course_key = course_str_factory() + course_overview = fake_course_overview_factory(modified=datetime.now()) # Confirm that the string date we get back is a valid date - last_published_date = CourseOverviewSink(None, None).get_course_last_published(course_key) + last_published_date = CourseOverviewSink(None, None).get_course_last_published(course_overview) dt = datetime.strptime(last_published_date, "%Y-%m-%d %H:%M:%S.%f") assert dt @responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter -@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.get_model") -def test_no_last_published_date(mock_overview): +def test_no_last_published_date(): """ Test that we get a None value back for courses that don't have a modified date. In some cases there is not modified date on a course. In coursegraph we skipped these if they are already in the database, so we're continuing this trend here. 
""" - # Fake a course with no modified date - course = mock_course_overview() - mock_overview.return_value = course - mock_overview.return_value.get_from_id.return_value = fake_course_overview_factory(modified=None) - - # Request our course last published date - course_key = course_str_factory() + course_overview = fake_course_overview_factory(modified=None) # should_dump_course will reach out to ClickHouse for the last dump date # we'll fake the response here to have any date, such that we'll exercise @@ -159,12 +146,72 @@ def test_no_last_published_date(mock_overview): # Confirm that the string date we get back is a valid date sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger()) - should_dump_course, reason = sink.should_dump_item(course_key) + should_dump_course, reason = sink.should_dump_item(course_overview) assert should_dump_course is False assert reason == "No last modified date in CourseOverview" +@responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter +def test_should_dump_item(): + """ + Test that we get the expected results from should_dump_item. + """ + course_overview = fake_course_overview_factory(modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")) + + # should_dump_course will reach out to ClickHouse for the last dump date + # we'll fake the response here to have any date, such that we'll exercise + # all the "no modified date" code. + responses.get( + "https://foo.bar/", + body="2023-05-03 15:47:39.331024+00:00" + ) + + # Confirm that the string date we get back is a valid date + sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger()) + should_dump_course, reason = sink.should_dump_item(course_overview) + + assert should_dump_course is True + assert "Course has been published since last dump time - " in reason + + +@responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter +def test_should_dump_item_not_in_clickhouse(): + """ + Test that a course gets dumped if it's never been dumped before + """ + course_overview = fake_course_overview_factory(modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")) + responses.get( + "https://foo.bar/", + body="" + ) + + sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger()) + should_dump_course, reason = sink.should_dump_item(course_overview) + + assert should_dump_course is True + assert "Course is not present in ClickHouse" == reason + + +@responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter +def test_should_dump_item_no_needs_dump(): + """ + Test that a course gets dumped if it's never been dumped before + """ + modified = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00") + course_overview = fake_course_overview_factory(modified=modified) + responses.get( + "https://foo.bar/", + body=modified + ) + + sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger()) + should_dump_course, reason = sink.should_dump_item(course_overview) + + assert should_dump_course is False + assert "Course has NOT been published since last dump time - " in reason + + @responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter def test_course_not_present_in_clickhouse(): """ diff --git a/tests/test_external_id_sink.py b/tests/test_external_id_sink.py new file mode 100644 index 0000000..f0aead0 --- /dev/null +++ b/tests/test_external_id_sink.py @@ 
diff --git a/tests/test_external_id_sink.py b/tests/test_external_id_sink.py
new file mode 100644
index 0000000..f0aead0
--- /dev/null
+++ b/tests/test_external_id_sink.py
@@ -0,0 +1,20 @@
+"""
+Test the external_id_sink module.
+"""
+
+from unittest.mock import patch
+
+from event_sink_clickhouse.sinks.external_id_sink import ExternalIdSink
+
+
+@patch("event_sink_clickhouse.sinks.external_id_sink.ModelBaseSink.get_queryset")
+def test_get_queryset(mock_get_queryset):
+    """
+    Test the get_queryset method.
+    """
+    sink = ExternalIdSink(None, None)
+
+    sink.get_queryset()
+
+    mock_get_queryset.assert_called_once_with(None)
+    mock_get_queryset.return_value.select_related.assert_called_once_with("user", "external_id_type")
diff --git a/tests/test_user_profile_sink.py b/tests/test_user_profile_sink.py
new file mode 100644
index 0000000..abd99d0
--- /dev/null
+++ b/tests/test_user_profile_sink.py
@@ -0,0 +1,20 @@
+"""
+Test the user_profile_sink module.
+"""
+
+from unittest.mock import patch
+
+from event_sink_clickhouse.sinks.user_profile_sink import UserProfileSink
+
+
+@patch("event_sink_clickhouse.sinks.external_id_sink.ModelBaseSink.get_queryset")
+def test_get_queryset(mock_get_queryset):
+    """
+    Test the get_queryset method.
+    """
+    sink = UserProfileSink(None, None)
+
+    sink.get_queryset()
+
+    mock_get_queryset.assert_called_once_with(None)
+    mock_get_queryset.return_value.select_related.assert_called_once_with("user")

From 69fa84e194fdd1a5fcb950f3ae0992d506adfe2a Mon Sep 17 00:00:00 2001
From: Cristhian Garcia
Date: Mon, 12 Feb 2024 12:26:26 -0500
Subject: [PATCH 9/9] chore: bump version to 1.1.0

---
 event_sink_clickhouse/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/event_sink_clickhouse/__init__.py b/event_sink_clickhouse/__init__.py
index a656735..c713846 100644
--- a/event_sink_clickhouse/__init__.py
+++ b/event_sink_clickhouse/__init__.py
@@ -2,4 +2,4 @@
 A sink for Open edX events to send them to ClickHouse.
 """
 
-__version__ = "1.0.0"
+__version__ = "1.1.0"
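One subtlety worth noting in test_user_profile_sink.py above: it patches get_queryset through the external_id_sink module even though it exercises UserProfileSink. That works because mock.patch replaces the attribute on the ModelBaseSink class object itself, and (assuming both sink modules import the same class from base_sink) every subclass sees the patched method regardless of which import path names it. A hypothetical minimal reproduction of the pattern, not project code:

    from unittest.mock import patch

    class ModelBaseSink:
        def get_queryset(self, start_pk=None):
            return "real"

    class UserProfileSink(ModelBaseSink):
        pass

    with patch.object(ModelBaseSink, "get_queryset", return_value="patched"):
        # The subclass inherits the patched attribute from the class object.
        assert UserProfileSink().get_queryset() == "patched"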