This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

chore: use paginator for querysets
Ian2012 committed Feb 6, 2024
1 parent bd32fe0 commit d8d4c80
Showing 4 changed files with 43 additions and 35 deletions.
@@ -28,6 +28,7 @@

 def dump_target_objects_to_clickhouse(
     sink=None,
+    start_pk=None,
     object_ids=[],
     objects_to_skip=[],
     force=False,
@@ -50,32 +51,27 @@ def dump_target_objects_to_clickhouse(
     submitted_objects = []
     skipped_objects = []

-    index = 0
     objects_to_submit = []
-    for object_id, should_be_dumped, reason in sink.fetch_target_items(
-        object_ids, objects_to_skip, force
-    ):
-        log.info(f"Iteration {index}: {object_id}")
-        index += 1

+    for object, should_be_dumped, reason in sink.fetch_target_items(
+        start_pk, object_ids, objects_to_skip, force, batch_size
+    ):
         if not should_be_dumped:
-            skipped_objects.append(object_id)
+            skipped_objects.append(object.pk)
             log.info(
-                f"{sink.model} {index}: Skipping object {object_id}, reason: '{reason}'"
+                f"{sink.model}: Skipping object {object.pk}, reason: '{reason}'"
             )
         else:
-            log.info(
-                f"{sink.model} {index}: Submitting {object_id} for dump to ClickHouse, reason '{reason}'."
-            )
-            objects_to_submit.append(object_id)
+            objects_to_submit.append(object)

             if len(objects_to_submit) % batch_size == 0:
                 sink.dump(objects_to_submit, many=True)
                 submitted_objects.extend(objects_to_submit)
                 objects_to_submit = []
+                log.info(f"Last IDs: {object.pk}")
                 time.sleep(sleep_time)

-            submitted_objects.append(str(object_id))
+            submitted_objects.append(str(object.pk))

             if limit and len(submitted_objects) == limit:
                 log.info(
@@ -86,7 +82,6 @@ def dump_target_objects_to_clickhouse(
     if objects_to_submit:
         sink.dump(objects_to_submit, many=True)

-    return submitted_objects, skipped_objects


 class Command(BaseCommand):
@@ -127,6 +122,12 @@ def add_arguments(self, parser):
             type=str,
             help="the type of object to dump",
         )
+        parser.add_argument(
+            "--start_pk",
+            type=int,
+            help="the primary key to start at",
+            default=None,
+        )
         parser.add_argument(
             "--ids",
             metavar="KEY",
@@ -198,26 +199,13 @@ def handle(self, *args, **options):

         Sink = get_sink_by_model(options["object"])
         sink = Sink(connection_overrides, log)
-        submitted_objects, skipped_objects = dump_target_objects_to_clickhouse(
+        dump_target_objects_to_clickhouse(
             sink,
+            options["start_pk"],
             [object_id.strip() for object_id in ids],
             [object_id.strip() for object_id in ids_to_skip],
             options["force"],
             options["limit"],
             options["batch_size"],
             options["sleep_time"],
         )
-
-        log.info(
-            "%d objects submitted for export to ClickHouse. %d objects skipped.",
-            len(submitted_objects),
-            len(skipped_objects),
-        )
-
-        if not submitted_objects:
-            log.info("No objects submitted for export to ClickHouse at all!")
-        else:
-            log.info(  # pylint: disable=logging-not-lazy
-                "These objects were submitted for dump to ClickHouse successfully:\n\t"
-                + "\n\t".join(submitted_objects)
-            )

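The command above consumes the sink's fetch_target_items() generator and follows an accumulate/flush/sleep pattern: collect items, send a full batch to the sink, then pause so ClickHouse is not flooded. A minimal, self-contained sketch of that pattern (not part of the commit; dump_in_batches and its arguments are illustrative names only):

    import time

    def dump_in_batches(items, dump, batch_size=1000, sleep_time=10, limit=None):
        """Flush items to `dump` in fixed-size batches, pausing between flushes."""
        submitted, batch = [], []
        for item in items:
            batch.append(item)
            if len(batch) == batch_size:
                dump(batch)                # one bulk insert per full batch
                submitted.extend(batch)
                batch = []
                time.sleep(sleep_time)     # throttle between bulk inserts
            if limit and len(submitted) >= limit:
                break
        if batch:                          # flush the final partial batch
            dump(batch)
            submitted.extend(batch)
        return submitted

    # Example: 25 items in batches of 10 -> two full batches plus a final batch of 5.
    dump_in_batches(range(25), lambda b: print(f"dumping {len(b)} items"), batch_size=10, sleep_time=0)
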
25 changes: 21 additions & 4 deletions event_sink_clickhouse/sinks/base_sink.py
@@ -8,6 +8,7 @@

 import requests
 from django.conf import settings
+from django.core.paginator import Paginator
 from edx_toggles.toggles import WaffleFlag

 from event_sink_clickhouse.utils import get_model
@@ -151,11 +152,14 @@ def get_model(self):
         """
         return get_model(self.model)

-    def get_queryset(self):
+    def get_queryset(self, start_pk=None):
         """
         Return the queryset to be used for the insert
         """
-        return self.get_model().objects.all()
+        if start_pk:
+            return self.get_model().objects.filter(pk__gt=start_pk).order_by("pk")
+        else:
+            return self.get_model().objects.all().order_by("pk")

     def dump(self, item_id, many=False, initial=None):
         """
@@ -272,19 +276,32 @@ def send_item(self, serialized_item, many=False):

         self._send_clickhouse_request(request)

-    def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False):
+    def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump=False, batch_size=None):
         """
         Fetch the items that should be dumped to ClickHouse
         """
         if ids:
             item_keys = [self.convert_id(item_id) for item_id in ids]
         else:
-            item_keys = [item.id for item in self.get_queryset()]
+            print("load queryset")
+            item_keys = self.get_queryset(start_pk)

         skip_ids = (
             [str(item_id) for item_id in skip_ids] if skip_ids else []
         )
+        if batch_size:
+            paginator = Paginator(item_keys, batch_size)
+            page = paginator.page(1)
+            while page.has_next():
+                page = paginator.page(page.next_page_number())
+                yield from self.iter_target_items(page.object_list, skip_ids, force_dump)
+        else:
+            yield from self.iter_target_items(item_keys, skip_ids, force_dump)

+    def iter_target_items(self, item_keys, skip_ids, force_dump):
+        """
+        Iterate through the items that should be dumped to ClickHouse
+        """
         for item_key in item_keys:
             if str(item_key) in skip_ids:
                 yield item_key, False, f"{self.name} is explicitly skipped"
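Django's Paginator, used in fetch_target_items() above, slices any ordered sequence (a queryset or a plain list) into fixed-size pages. A small illustration of the API calls involved, runnable against a plain list so no database is needed (not part of the commit):

    from django.core.paginator import Paginator

    items = list(range(1, 11))                  # stand-in for a pk-ordered queryset
    paginator = Paginator(items, per_page=4)

    for page_number in paginator.page_range:    # 1, 2, 3
        page = paginator.page(page_number)
        print(page_number, list(page.object_list), "has_next:", page.has_next())
    # 1 [1, 2, 3, 4] has_next: True
    # 2 [5, 6, 7, 8] has_next: True
    # 3 [9, 10] has_next: False
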
2 changes: 1 addition & 1 deletion event_sink_clickhouse/sinks/course_published.py
@@ -220,6 +220,6 @@ def get_course_last_published(self, course_key):
     def convert_id(self, item_id):
         return CourseKey.from_string(item_id)

-    def get_queryset(self):
+    def get_queryset(self, start_pk=None):
         modulestore = get_modulestore()
         return modulestore.get_course_summaries()
3 changes: 3 additions & 0 deletions event_sink_clickhouse/sinks/user_profile_sink.py
@@ -14,3 +14,6 @@ class UserProfileSink(ModelBaseSink):  # pylint: disable=abstract-method
     timestamp_field = "time_last_dumped"
     name = "User Profile"
     serializer_class = UserProfileSerializer
+
+    def get_queryset(self, start_pk=None):
+        return super().get_queryset(start_pk).select_related("user")
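The select_related("user") call above matters because UserProfile rows are serialized together with fields from the related User; without the join, touching profile.user would issue one extra query per row. A sketch of the difference (not part of the commit; UserProfile stands for whatever model class the sink resolves via get_model()):

    # Lazy loading: one extra SELECT per profile row when profile.user is accessed.
    for profile in UserProfile.objects.order_by("pk"):
        _ = profile.user.username

    # With select_related, the user row is fetched by a JOIN in the same query.
    for profile in UserProfile.objects.order_by("pk").select_related("user"):
        _ = profile.user.username   # already loaded, no extra query
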
