This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Merge pull request #74 from openedx/cag/add-batching-for-backfill
feat: backfill data to clickhouse in batches and same thread
Ian2012 authored Feb 12, 2024
2 parents bf75e1a + 69fa84e commit ea2c666
Showing 20 changed files with 398 additions and 597 deletions.
2 changes: 1 addition & 1 deletion event_sink_clickhouse/__init__.py
@@ -2,4 +2,4 @@
A sink for Open edX events to send them to ClickHouse.
"""

__version__ = "1.0.0"
__version__ = "1.1.0"

114 changes: 57 additions & 57 deletions event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
@@ -14,24 +14,27 @@
# Dump a limited number of objects to prevent stress on production systems
python manage.py cms dump_objects_to_clickhouse --limit 1000
"""

import logging
import time
from textwrap import dedent

from django.core.management.base import BaseCommand, CommandError

from event_sink_clickhouse.sinks.base_sink import ModelBaseSink
from event_sink_clickhouse.tasks import dump_data_to_clickhouse

log = logging.getLogger(__name__)


def dump_target_objects_to_clickhouse(
connection_overrides=None,
sink=None,
start_pk=None,
object_ids=None,
objects_to_skip=None,
force=None,
force=False,
limit=None,
batch_size=1000,
sleep_time=10,
):
"""
Iterates through a list of objects in the ORM, serializes them to csv,
@@ -45,44 +48,37 @@ def dump_target_objects_to_clickhouse(
and one of objects that did not.
"""

submitted_objects = []
count = 0
skipped_objects = []
objects_to_submit = []

index = 0
for object_id, should_be_dumped, reason in sink.fetch_target_items(
object_ids, objects_to_skip, force
for obj, should_be_dumped, reason in sink.fetch_target_items(
start_pk, object_ids, objects_to_skip, force, batch_size
):
log.info(f"Iteration {index}: {object_id}")
index += 1

if not should_be_dumped:
skipped_objects.append(object_id)
log.info(
f"{sink.model} {index}: Skipping object {object_id}, reason: '{reason}'"
)
skipped_objects.append(obj.pk)
log.info(f"{sink.model}: Skipping object {obj.pk}, reason: '{reason}'")
else:
log.info(
f"{sink.model} {index}: Submitting {object_id} for dump to ClickHouse, reason '{reason}'."
)

dump_data_to_clickhouse.apply_async(
kwargs={
"sink_module": sink.__module__,
"sink_name": sink.__class__.__name__,
"object_id": str(object_id),
"connection_overrides": connection_overrides,
}
)

submitted_objects.append(str(object_id))

if limit and len(submitted_objects) == limit:
objects_to_submit.append(obj)
if len(objects_to_submit) % batch_size == 0:
count += len(objects_to_submit)
sink.dump(objects_to_submit, many=True)
objects_to_submit = []
log.info(f"Last ID: {obj.pk}")
time.sleep(sleep_time)

if limit and count == limit:
log.info(
f"Limit of {limit} eligible objects has been reached, quitting!"
)
break

return submitted_objects, skipped_objects
if objects_to_submit:
sink.dump(objects_to_submit, many=True)
count += len(objects_to_submit)
log.info(f"Last ID: {objects_to_submit[-1].pk}")

log.info(f"Dumped {count} objects to ClickHouse")
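The loop above leans on each sink's fetch_target_items generator, whose implementation lives in base_sink.py and is not part of this excerpt. Only the positional call order (start_pk, object_ids, objects_to_skip, force, batch_size) and the (obj, should_be_dumped, reason) yield shape are visible here; a minimal sketch of a method honouring that contract, with all queryset details assumed, might look like:

def fetch_target_items(self, start_pk, object_ids, objects_to_skip, force, batch_size):
    # Sketch of a ModelBaseSink method; only the argument order and the
    # (obj, should_be_dumped, reason) tuples are taken from the diff above.
    queryset = self.get_queryset().order_by("pk")  # get_queryset() is assumed
    if start_pk:
        queryset = queryset.filter(pk__gt=start_pk)
    if object_ids:
        queryset = queryset.filter(pk__in=object_ids)
    for obj in queryset.iterator(chunk_size=batch_size):
        if objects_to_skip and str(obj.pk) in objects_to_skip:
            yield obj, False, "in skip list"
        elif force:
            yield obj, True, "forced dump"
        else:
            yield obj, True, "not previously dumped"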


class Command(BaseCommand):
@@ -123,6 +119,12 @@ def add_arguments(self, parser):
type=str,
help="the type of object to dump",
)
parser.add_argument(
"--start_pk",
type=int,
help="the primary key to start at",
default=None,
)
parser.add_argument(
"--ids",
metavar="KEY",
@@ -147,6 +149,18 @@
type=int,
help="maximum number of objects to dump, cannot be used with '--ids' or '--force'",
)
parser.add_argument(
"--batch_size",
type=int,
default=10000,
help="number of objects to dump in a single batch",
)
parser.add_argument(
"--sleep_time",
type=int,
default=1,
help="number of seconds to sleep between batches",
)
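
With the new options in place, a resumable, throttled backfill can be driven from Python as well as the CLI. A rough sketch follows; the object name course_overview is illustrative, and the --object spelling is inferred from options["object"] in handle() below:

from django.core.management import call_command

# Roughly equivalent to:
#   python manage.py cms dump_data_to_clickhouse --object course_overview \
#       --start_pk 10000 --batch_size 5000 --sleep_time 2
# "course_overview" and the "object" option name are assumptions for illustration.
call_command(
    "dump_data_to_clickhouse",
    object="course_overview",
    start_pk=10000,
    batch_size=5000,
    sleep_time=2,
)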

def handle(self, *args, **options):
"""
@@ -179,29 +193,15 @@ def handle(self, *args, **options):
log.error(message)
raise CommandError(message)

for cls in ModelBaseSink.__subclasses__(): # pragma: no cover
if cls.model == options["object"]:
sink = cls(connection_overrides, log)
submitted_objects, skipped_objects = dump_target_objects_to_clickhouse(
connection_overrides,
sink,
[object_id.strip() for object_id in ids],
[object_id.strip() for object_id in ids_to_skip],
options["force"],
options["limit"],
)

log.info(
"%d objects submitted for export to ClickHouse. %d objects skipped.",
len(submitted_objects),
len(skipped_objects),
)

if not submitted_objects:
log.info("No objects submitted for export to ClickHouse at all!")
else:
log.info( # pylint: disable=logging-not-lazy
"These objects were submitted for dump to ClickHouse successfully:\n\t"
+ "\n\t".join(submitted_objects)
)
break
Sink = ModelBaseSink.get_sink_by_model_name(options["object"])
sink = Sink(connection_overrides, log)
dump_target_objects_to_clickhouse(
sink,
options["start_pk"],
[object_id.strip() for object_id in ids],
[object_id.strip() for object_id in ids_to_skip],
options["force"],
options["limit"],
options["batch_size"],
options["sleep_time"],
)
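
handle() now resolves the sink through ModelBaseSink.get_sink_by_model_name instead of scanning __subclasses__() inline. The helper itself is defined in base_sink.py, outside this excerpt; judging from the code it replaces, it plausibly amounts to something like the following sketch (error handling assumed):

@classmethod
def get_sink_by_model_name(cls, model_name):
    # Sketch only: mirrors the __subclasses__() scan this commit removes
    # from the management command; the real implementation may differ.
    for sink_cls in cls.__subclasses__():
        if sink_cls.model == model_name:
            return sink_cls
    raise ValueError(f"No sink configured for model '{model_name}'")  # assumed error handling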
2 changes: 1 addition & 1 deletion event_sink_clickhouse/settings/common.py
@@ -13,7 +13,7 @@ def plugin_settings(settings):
# to avoid pulling in more dependencies to the platform than necessary.
"url": "http://clickhouse:8123",
"username": "ch_cms",
"password": "TYreGozgtDG3vkoWPUHVVM6q",
"password": "password",
"database": "event_sink",
"timeout_secs": 5,
}
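
The hard-coded credential is replaced with an obvious placeholder, so deployments are expected to supply their own connection details. Assuming the dict above is exposed as a single Django setting (the name EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG below is an assumption, not shown in this hunk), a production override would look roughly like:

# In a deployment's private settings module; the setting name is assumed,
# and every value here is a placeholder.
EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = {
    "url": "https://clickhouse.example.com:8443",
    "username": "ch_cms",
    "password": "a-secret-from-your-secret-store",
    "database": "event_sink",
    "timeout_secs": 5,
}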