This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Merge pull request #7 from openedx/bmtcril/course_structure_management_command

feat: Add a management command to bulk dump course data to ClickHouse
bmtcril authored May 11, 2023
2 parents b07e33f + c50538a commit d8c711d
Showing 14 changed files with 972 additions and 81 deletions.
10 changes: 4 additions & 6 deletions CHANGELOG.rst
@@ -14,14 +14,12 @@ Change Log
Unreleased
**********

* First functional version, includes a CMS listener for COURSE_PUBLISHED
* README updates
* New configuration settings for connection to ClickHouse

0.1.0 – 2023-04-24
0.1.0 – 2023-05-11
**********************************************

Added
=====

* First release on PyPI.
* First release on PyPI
* CMS listener for COURSE_PUBLISHED
* Management command to bulk push course data to ClickHouse
19 changes: 19 additions & 0 deletions README.rst
@@ -16,12 +16,31 @@ OARS consumes the data sent to ClickHouse by this plugin as part of data
enrichment for reporting, or capturing data that otherwise does not fit in
xAPI.

Sinks
*****

Currently the only sink is in the CMS. It listens for the ``COURSE_PUBLISHED``
signal and serializes a subset of the published course blocks into one table
and the relationships between blocks into another table. With those we are
able to recreate the "graph" of the course and get relevant data, such as
block names, for reporting.
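
As a rough, illustrative sketch of what this enables, the snippet below reads
the sink's output back out of ClickHouse over its HTTP interface from Python.
The table and column names (``course_blocks``, ``block_id``, ``display_name``,
``course_key``), URL, and credentials are assumptions made for illustration
only; the real schema is managed by the OARS/ClickHouse deployment, not by
this plugin::

    import requests

    CLICKHOUSE_URL = "http://localhost:8123/"  # assumed local ClickHouse instance

    # Hypothetical query: list the named blocks of one course run.
    query = """
        SELECT block_id, display_name
        FROM course_blocks
        WHERE course_key = 'course-v1:edX+DemoX+2023'
    """

    response = requests.get(
        CLICKHOUSE_URL,
        params={"query": query, "default_format": "JSONEachRow"},
        auth=("ch_user", "ch_password"),  # assumed credentials
        timeout=5,
    )
    response.raise_for_status()
    print(response.text)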

Commands
********

In addition to being an event listener, this package provides commands for
exporting the same data in bulk. This allows bootstrapping a new data platform
or backfilling lost or missing data. Currently the only command is the Django
command for the ``COURSE_PUBLISHED`` data:

``python manage.py cms dump_courses_to_clickhouse``

This command allows bulk export of all courses, or of a subset selected via
various limiting options. Please see the command help for details:

``python manage.py cms dump_courses_to_clickhouse -h``
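
For illustration, here are two invocations using options defined by the
command's argument parser (added in this pull request); the course keys are
placeholders:

``python manage.py cms dump_courses_to_clickhouse --limit 100``

``python manage.py cms dump_courses_to_clickhouse --courses_to_skip 'course-v1:A+B+1' --force``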


.. _Open edX events: https://github.com/openedx/openedx-events
.. _Edx Platform: https://github.com/openedx/edx-platform
.. _ClickHouse: https://clickhouse.com
Empty file.
Empty file.
@@ -0,0 +1,194 @@
"""
Management command for exporting the modulestore ClickHouse.
Example usages (see usage for more options):
# Dump all courses published since last dump.
# Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`:
python manage.py cms dump_courses_to_clickhouse
# Dump all courses published since last dump.
# Use custom connection parameters to send to a different ClickHouse instance:
python manage.py cms dump_courses_to_clickhouse --host localhost \
--user user --password password --database research_db --
# Specify certain courses instead of dumping all of them.
# Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`.
python manage.py cms dump_courses_to_clickhouse --courses 'course-v1:A+B+1' 'course-v1:A+B+2'
# Dump a limited number of courses to prevent stress on production systems
python manage.py cms dump_courses_to_clickhouse --limit 1000
"""
import logging
from textwrap import dedent

from django.core.management.base import BaseCommand, CommandError
from edx_django_utils.cache import RequestCache

from event_sink_clickhouse.sinks.course_published import CoursePublishedSink
from event_sink_clickhouse.tasks import dump_course_to_clickhouse

log = logging.getLogger(__name__)


def dump_target_courses_to_clickhouse(
    connection_overrides=None,
    course_keys=None,
    courses_to_skip=None,
    force=None,
    limit=None
):
    """
    Iterates through a list of courses in a modulestore, serializes them to CSV,
    then submits tasks to post them to ClickHouse.

    Arguments:
        force: serialize the courses even if they've been recently serialized

    Returns: two lists -- one of the courses that had dump jobs queued for them
    and one of courses that did not.
    """
    sink = CoursePublishedSink(connection_overrides, log)

    submitted_courses = []
    skipped_courses = []

    index = 0
    for course_key, should_be_dumped, reason in sink.fetch_target_courses(course_keys, courses_to_skip, force):
        log.info(f"Iteration {index}: {course_key}")
        index += 1

        if not should_be_dumped:
            skipped_courses.append(course_key)
            log.info(f"Course {index}: Skipping course {course_key}, reason: '{reason}'")
        else:
            # RequestCache is a local memory cache used in modulestore for performance reasons.
            # Normally it is cleared at the end of every request, but in this command it will
            # continue to grow until the command is done. To prevent excessive memory consumption
            # we clear it every time we dump a course.
            RequestCache.clear_all_namespaces()

            log.info(
                f"Course {index}: Submitting {course_key} for dump to ClickHouse, reason '{reason}'."
            )

            dump_course_to_clickhouse.apply_async(
                kwargs={
                    "course_key_string": str(course_key),
                    "connection_overrides": connection_overrides,
                }
            )

            submitted_courses.append(str(course_key))

        if limit and len(submitted_courses) == limit:
            log.info(f"Limit of {limit} eligible courses has been reached, quitting!")
            break

    return submitted_courses, skipped_courses


class Command(BaseCommand):
    """
    Dump course block and relationship data to a ClickHouse instance.
    """
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        parser.add_argument(
            '--url',
            type=str,
            help="the URL of the ClickHouse server",
        )
        parser.add_argument(
            '--username',
            type=str,
            help="the username of the ClickHouse user",
        )
        parser.add_argument(
            '--password',
            type=str,
            help="the password of the ClickHouse user",
        )
        parser.add_argument(
            '--database',
            type=str,
            help="the database in ClickHouse to connect to",
        )
        parser.add_argument(
            '--timeout_secs',
            type=int,
            help="timeout for ClickHouse requests, in seconds",
        )
        parser.add_argument(
            '--courses',
            metavar='KEY',
            type=str,
            nargs='*',
            help="keys of courses to serialize; if omitted, all courses in the system are serialized",
        )
        parser.add_argument(
            '--courses_to_skip',
            metavar='KEY',
            type=str,
            nargs='*',
            help="keys of courses NOT to serialize",
        )
        parser.add_argument(
            '--force',
            action='store_true',
            help="dump all courses regardless of when they were last published",
        )
        parser.add_argument(
            '--limit',
            type=int,
            help="maximum number of courses to dump, cannot be used with '--courses' or '--force'",
        )

    def handle(self, *args, **options):
        """
        Iterates through each course, serializes it into graphs, and saves
        those graphs to ClickHouse.
        """
        connection_overrides = {
            key: options[key]
            for key in ["url", "username", "password", "database", "timeout_secs"]
            if options[key]
        }

        courses = options["courses"] if options["courses"] else []
        courses_to_skip = options["courses_to_skip"] if options["courses_to_skip"] else []

        if options["limit"] is not None and int(options["limit"]) < 1:
            message = "'limit' must be greater than 0!"
            log.error(message)
            raise CommandError(message)

        if options["limit"] and options["force"]:
            message = "The 'limit' option cannot be used with 'force' as running the " \
                      "command repeatedly will result in the same courses being dumped every time."
            log.error(message)
            raise CommandError(message)

        submitted_courses, skipped_courses = dump_target_courses_to_clickhouse(
            connection_overrides,
            [course_key.strip() for course_key in courses],
            [course_key.strip() for course_key in courses_to_skip],
            options['force'],
            options['limit']
        )

        log.info(
            "%d courses submitted for export to ClickHouse. %d courses skipped.",
            len(submitted_courses),
            len(skipped_courses),
        )

        if not submitted_courses:
            log.info("No courses submitted for export to ClickHouse at all!")
        else:
            log.info(  # pylint: disable=logging-not-lazy
                "These courses were submitted for dump to ClickHouse successfully:\n\t" +
                "\n\t".join(submitted_courses)
            )
4 changes: 2 additions & 2 deletions event_sink_clickhouse/signals.py
@@ -3,11 +3,11 @@
"""


def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument
def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument # pragma: no cover
"""
Receives COURSE_PUBLISHED signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from .tasks import dump_course_to_clickhouse # pylint: disable=import-outside-toplevel
from event_sink_clickhouse.tasks import dump_course_to_clickhouse # pylint: disable=import-outside-toplevel

dump_course_to_clickhouse.delay(str(course_key))
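
To make the flow above concrete, here is a minimal, illustrative sketch of
exercising this receiver through a Django signal. The ``Signal()`` below is
only a stand-in for the CMS's real ``COURSE_PUBLISHED`` signal, which the
platform provides and wires up itself; nothing here is part of this commit:

    from django.dispatch import Signal

    from event_sink_clickhouse.signals import receive_course_publish

    # Stand-in for the CMS COURSE_PUBLISHED signal (illustration only); the
    # real signal is provided and connected by the platform.
    COURSE_PUBLISHED = Signal()
    COURSE_PUBLISHED.connect(receive_course_publish)

    # When the signal fires, the receiver queues the Celery task
    # dump_course_to_clickhouse for the given course key.
    COURSE_PUBLISHED.send(sender=None, course_key="course-v1:edX+DemoX+2023")
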
19 changes: 18 additions & 1 deletion event_sink_clickhouse/sinks/base_sink.py
@@ -1,6 +1,7 @@
"""
Base classes for event sinks
"""
import json
from collections import namedtuple

import requests
@@ -29,19 +30,35 @@ def __init__(self, connection_overrides, log):
        self.ch_database = connection_overrides.get("database", self.ch_database)
        self.ch_timeout_secs = connection_overrides.get("timeout_secs", self.ch_timeout_secs)

    def _send_clickhouse_request(self, request):
    def _send_clickhouse_request(self, request, expected_insert_rows=None):
        """
        Perform the actual HTTP requests to ClickHouse.
        """
        session = requests.Session()
        prepared_request = request.prepare()
        response = None

        try:
            response = session.send(prepared_request, timeout=self.ch_timeout_secs)
            response.raise_for_status()

            if expected_insert_rows:
                summary = response.headers["X-ClickHouse-Summary"]
                written_rows = json.loads(summary)["written_rows"]
                if expected_insert_rows != int(written_rows):
                    self.log.error(
                        f"Clickhouse query {prepared_request.url} expected {expected_insert_rows} "
                        f"rows to be inserted, but only got {written_rows}!"
                    )

            return response
        except requests.exceptions.HTTPError as e:
            self.log.error(str(e))
            self.log.error(e.response.headers)
            self.log.error(e.response)
            self.log.error(e.response.text)
            raise
        except (requests.exceptions.InvalidJSONError, KeyError):
            # ClickHouse can be configured not to return the metadata / summary we check above for
            # performance reasons. It's not critical, so we eat those here.
            return response
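
For context, a hedged sketch of how a sink might call
``_send_clickhouse_request`` when inserting rows, assuming ``sink`` is an
already-constructed sink instance (e.g. ``CoursePublishedSink(connection_overrides, log)``
as in the management command above, and assuming that class inherits this
helper). The table name, row format, URL, database, and credentials are
illustrative assumptions, not the plugin's actual schema or configuration:

    import requests

    # Hypothetical serialized rows and target table, for illustration only.
    rows = ["course-v1:A+B+1,block-v1:A+B+1+type@chapter+block@intro,Introduction"]
    insert_sql = "INSERT INTO course_blocks FORMAT CSV"  # assumed table name

    request = requests.Request(
        "POST",
        "http://localhost:8123/",  # assumed ClickHouse HTTP endpoint
        data="\n".join(rows).encode("utf-8"),
        params={"query": insert_sql, "database": "event_sink"},  # assumed database
        auth=("ch_user", "ch_password"),  # assumed credentials
    )

    # Passing expected_insert_rows lets _send_clickhouse_request compare
    # ClickHouse's reported written_rows against the number of rows submitted
    # and log an error on any mismatch.
    sink._send_clickhouse_request(request, expected_insert_rows=len(rows))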
