From 3ec0cb2737677a42c71d11e86898f2feab2e0f74 Mon Sep 17 00:00:00 2001 From: Braden MacDonald Date: Tue, 14 May 2024 12:30:01 -0700 Subject: [PATCH 1/4] fix: don't call signal handlers like XBLOCK_UPDATED before commit --- xmodule/modulestore/__init__.py | 35 +++++++++++++++-- xmodule/modulestore/mixed.py | 69 ++++++++++++++++++++------------- 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/xmodule/modulestore/__init__.py b/xmodule/modulestore/__init__.py index 57a22f5e99b3..310c4b581757 100644 --- a/xmodule/modulestore/__init__.py +++ b/xmodule/modulestore/__init__.py @@ -121,6 +121,7 @@ def __init__(self): self._active_count = 0 self.has_publish_item = False self.has_library_updated_item = False + self._commit_callbacks = [] @property def active(self): @@ -148,6 +149,20 @@ def is_root(self): """ return self._active_count == 1 + def defer_until_commit(self, fn): + """ + Run some code when the changes from this bulk op are committed to the DB + """ + self._commit_callbacks.append(fn) + + def call_commit_callbacks(self): + """ + When the changes have been committed to the DB, call this to run any queued callbacks + """ + for fn in self._commit_callbacks: + fn() + self._commit_callbacks.clear() + class ActiveBulkThread(threading.local): """ @@ -290,15 +305,29 @@ def _end_bulk_operation(self, structure_key, emit_signals=True, ignore_case=Fals # So re-nest until the signals are sent. bulk_ops_record.nest() - if emit_signals and dirty: - self.send_bulk_published_signal(bulk_ops_record, structure_key) - self.send_bulk_library_updated_signal(bulk_ops_record, structure_key) + if dirty: + # Call any "on commit" callback, regardless of if this was "published" or is still draft: + bulk_ops_record.call_commit_callbacks() + # Call any "on publish" handlers - emit_signals is usually false for draft-only changes: + if emit_signals: + self.send_bulk_published_signal(bulk_ops_record, structure_key) + self.send_bulk_library_updated_signal(bulk_ops_record, structure_key) # Signals are sent. Now unnest and clear the bulk op for good. bulk_ops_record.unnest() self._clear_bulk_ops_record(structure_key) + def on_commit_changes_to(self, course_key, fn): + """ + Call some callback when the currently active bulk operation has saved + """ + bulk_ops_record = self._get_bulk_ops_record(course_key) + if bulk_ops_record.active: + bulk_ops_record.defer_until_commit(fn) + else: + fn() # There is no active bulk operation - call fn() now. + def _is_in_bulk_operation(self, course_key, ignore_case=False): """ Return whether a bulk operation is active on `course_key`. diff --git a/xmodule/modulestore/mixed.py b/xmodule/modulestore/mixed.py index 9665772d2718..b71d7e62d116 100644 --- a/xmodule/modulestore/mixed.py +++ b/xmodule/modulestore/mixed.py @@ -787,19 +787,24 @@ def create_child(self, user_id, parent_usage_key, block_type, block_id=None, fie fields (dict): A dictionary specifying initial values for some or all fields in the newly created block """ - modulestore = self._verify_modulestore_support(parent_usage_key.course_key, 'create_child') - xblock = modulestore.create_child( + course_key = parent_usage_key.course_key + store = self._verify_modulestore_support(course_key, 'create_child') + xblock = store.create_child( user_id, parent_usage_key, block_type, block_id=block_id, fields=fields, **kwargs ) - # .. event_implemented_name: XBLOCK_CREATED - XBLOCK_CREATED.send_event( - time=datetime.now(timezone.utc), - xblock_info=XBlockData( - usage_key=xblock.location.for_branch(None), - block_type=block_type, - version=xblock.location + + def send_created_event(): + # .. event_implemented_name: XBLOCK_CREATED + XBLOCK_CREATED.send_event( + time=datetime.now(timezone.utc), + xblock_info=XBlockData( + usage_key=xblock.location.for_branch(None), + block_type=block_type, + version=xblock.location + ) ) - ) + + store.on_commit_changes_to(course_key, send_created_event) return xblock @strip_key @@ -828,17 +833,22 @@ def update_item(self, xblock, user_id, allow_not_found=False, **kwargs): # lint Update the xblock persisted to be the same as the given for all types of fields (content, children, and metadata) attribute the change to the given user. """ - store = self._verify_modulestore_support(xblock.location.course_key, 'update_item') + course_key = xblock.location.course_key + store = self._verify_modulestore_support(course_key, 'update_item') xblock = store.update_item(xblock, user_id, allow_not_found, **kwargs) - # .. event_implemented_name: XBLOCK_UPDATED - XBLOCK_UPDATED.send_event( - time=datetime.now(timezone.utc), - xblock_info=XBlockData( - usage_key=xblock.location.for_branch(None), - block_type=xblock.location.block_type, - version=xblock.location + + def send_updated_event(): + # .. event_implemented_name: XBLOCK_UPDATED + XBLOCK_UPDATED.send_event( + time=datetime.now(timezone.utc), + xblock_info=XBlockData( + usage_key=xblock.location.for_branch(None), + block_type=xblock.location.block_type, + version=xblock.location + ) ) - ) + + store.on_commit_changes_to(course_key, send_updated_event) return xblock @strip_key @@ -846,16 +856,21 @@ def delete_item(self, location, user_id, **kwargs): # lint-amnesty, pylint: dis """ Delete the given item from persistence. kwargs allow modulestore specific parameters. """ - store = self._verify_modulestore_support(location.course_key, 'delete_item') + course_key = location.course_key + store = self._verify_modulestore_support(course_key, 'delete_item') item = store.delete_item(location, user_id=user_id, **kwargs) - # .. event_implemented_name: XBLOCK_DELETED - XBLOCK_DELETED.send_event( - time=datetime.now(timezone.utc), - xblock_info=XBlockData( - usage_key=location, - block_type=location.block_type, + + def send_deleted_event(): + # .. event_implemented_name: XBLOCK_DELETED + XBLOCK_DELETED.send_event( + time=datetime.now(timezone.utc), + xblock_info=XBlockData( + usage_key=location, + block_type=location.block_type, + ) ) - ) + + store.on_commit_changes_to(course_key, send_deleted_event) return item def revert_to_published(self, location, user_id): From 3cc1c647cd92d54cad53c747c4f8c25c99b8ec28 Mon Sep 17 00:00:00 2001 From: Yusuf Musleh Date: Wed, 15 May 2024 20:07:14 +0300 Subject: [PATCH 2/4] feat: Add remaining XBLOCK_CREATED event to on_commit --- xmodule/modulestore/mixed.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/xmodule/modulestore/mixed.py b/xmodule/modulestore/mixed.py index b71d7e62d116..e1ea6640acc5 100644 --- a/xmodule/modulestore/mixed.py +++ b/xmodule/modulestore/mixed.py @@ -758,15 +758,19 @@ def create_item(self, user_id, course_key, block_type, block_id=None, fields=Non """ modulestore = self._verify_modulestore_support(course_key, 'create_item') xblock = modulestore.create_item(user_id, course_key, block_type, block_id=block_id, fields=fields, **kwargs) - # .. event_implemented_name: XBLOCK_CREATED - XBLOCK_CREATED.send_event( - time=datetime.now(timezone.utc), - xblock_info=XBlockData( - usage_key=xblock.location.for_branch(None), - block_type=block_type, - version=xblock.location + + def send_created_event(): + # .. event_implemented_name: XBLOCK_CREATED + XBLOCK_CREATED.send_event( + time=datetime.now(timezone.utc), + xblock_info=XBlockData( + usage_key=xblock.location.for_branch(None), + block_type=block_type, + version=xblock.location + ) ) - ) + + modulestore.on_commit_changes_to(course_key, send_created_event) return xblock @strip_key From 555cb816802fe127f51a29c9dc576328fac709f4 Mon Sep 17 00:00:00 2001 From: Yusuf Musleh Date: Wed, 15 May 2024 20:08:32 +0300 Subject: [PATCH 3/4] fix: Add checks before deleting bulk ops --- xmodule/modulestore/split_mongo/split.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/xmodule/modulestore/split_mongo/split.py b/xmodule/modulestore/split_mongo/split.py index 64e19420a152..05629315a001 100644 --- a/xmodule/modulestore/split_mongo/split.py +++ b/xmodule/modulestore/split_mongo/split.py @@ -233,11 +233,13 @@ def _clear_bulk_ops_record(self, course_key): raise TypeError(f'{course_key!r} is not a CourseLocator or LibraryLocator') if course_key.org and get_library_or_course_attribute(course_key) and course_key.run: - del self._active_bulk_ops.records[course_key.replace(branch=None, version_guid=None)] + if course_key.replace(branch=None, version_guid=None) in self._active_bulk_ops.records: + del self._active_bulk_ops.records[course_key.replace(branch=None, version_guid=None)] else: - del self._active_bulk_ops.records[ - course_key.replace(org=None, course=None, run=None, branch=None) - ] + if course_key.replace(org=None, course=None, run=None, branch=None) in self._active_bulk_ops.records: + del self._active_bulk_ops.records[ + course_key.replace(org=None, course=None, run=None, branch=None) + ] def _start_outermost_bulk_operation(self, bulk_write_record, course_key, ignore_case=False): # lint-amnesty, pylint: disable=arguments-differ """ From 57043cb44e458dcd93eb4c84c51d1977367387c0 Mon Sep 17 00:00:00 2001 From: Yusuf Musleh Date: Fri, 17 May 2024 10:10:41 +0300 Subject: [PATCH 4/4] fix: Change check for active bulk op on_commit Initially we used `_get_bulk_ops_record()` however that had side-effects of creating records since it uses a defaultdict internally. So we swapped the implementation similar how we check in `_clear_bulk_ops_record()` --- xmodule/modulestore/__init__.py | 9 +++++++-- xmodule/modulestore/split_mongo/split.py | 10 ++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/xmodule/modulestore/__init__.py b/xmodule/modulestore/__init__.py index 310c4b581757..f3aee2a58f24 100644 --- a/xmodule/modulestore/__init__.py +++ b/xmodule/modulestore/__init__.py @@ -322,8 +322,13 @@ def on_commit_changes_to(self, course_key, fn): """ Call some callback when the currently active bulk operation has saved """ - bulk_ops_record = self._get_bulk_ops_record(course_key) - if bulk_ops_record.active: + # Check if a bulk op is active. If so, defer fn(); otherwise call it immediately. + # Note: calling _get_bulk_ops_record() here and then checking .active can have side-effects in some cases + # because it creates an entry in the defaultdict if none exists, so we check if the record is active using + # the same code as _clear_bulk_ops_record(), which doesn't modify the defaultdict. + # so we check it this way: + if course_key and course_key.for_branch(None) in self._active_bulk_ops.records: + bulk_ops_record = self._active_bulk_ops.records[course_key.for_branch(None)] bulk_ops_record.defer_until_commit(fn) else: fn() # There is no active bulk operation - call fn() now. diff --git a/xmodule/modulestore/split_mongo/split.py b/xmodule/modulestore/split_mongo/split.py index 05629315a001..64e19420a152 100644 --- a/xmodule/modulestore/split_mongo/split.py +++ b/xmodule/modulestore/split_mongo/split.py @@ -233,13 +233,11 @@ def _clear_bulk_ops_record(self, course_key): raise TypeError(f'{course_key!r} is not a CourseLocator or LibraryLocator') if course_key.org and get_library_or_course_attribute(course_key) and course_key.run: - if course_key.replace(branch=None, version_guid=None) in self._active_bulk_ops.records: - del self._active_bulk_ops.records[course_key.replace(branch=None, version_guid=None)] + del self._active_bulk_ops.records[course_key.replace(branch=None, version_guid=None)] else: - if course_key.replace(org=None, course=None, run=None, branch=None) in self._active_bulk_ops.records: - del self._active_bulk_ops.records[ - course_key.replace(org=None, course=None, run=None, branch=None) - ] + del self._active_bulk_ops.records[ + course_key.replace(org=None, course=None, run=None, branch=None) + ] def _start_outermost_bulk_operation(self, bulk_write_record, course_key, ignore_case=False): # lint-amnesty, pylint: disable=arguments-differ """