diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 983647a..48f616a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,14 @@ Change Log Unreleased ********** +0.11.2 - 2024-10-04 +******************* + +Fixes +===== + +* wait for transaction commit before trying to sink models. + 0.11.1 - 2024-09-06 ******************* diff --git a/platform_plugin_aspects/__init__.py b/platform_plugin_aspects/__init__.py index 1c3b1a0..61d8f8b 100644 --- a/platform_plugin_aspects/__init__.py +++ b/platform_plugin_aspects/__init__.py @@ -5,6 +5,6 @@ import os from pathlib import Path -__version__ = "0.11.1" +__version__ = "0.11.2" ROOT_DIRECTORY = Path(os.path.dirname(os.path.abspath(__file__))) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index fc630f8..3dba75b 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -5,6 +5,7 @@ from django.db import transaction from django.db.models.signals import post_delete, post_save from django.dispatch import Signal, receiver +from opaque_keys import InvalidKeyError from platform_plugin_aspects.sinks import ( CourseEnrollmentSink, @@ -63,34 +64,33 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra ) -def on_user_profile_updated(instance): - """ - Queues the UserProfile dump job when the parent transaction is committed. - """ - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - - sink = UserProfileSink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) - - -def on_user_profile_updated_txn(**kwargs): +def on_user_profile_updated_txn(*args, **kwargs): """ Handle user_profile saves in the middle of a transaction. + Handle saves in the middle of a transaction. 
If this gets fired before the transaction commits, the task may try to query an id that doesn't exist yet and throw an error. This should postpone queuing the Celery task until after the transaction is committed. """ - transaction.on_commit( - lambda: on_user_profile_updated(kwargs["instance"]) - ) # pragma: no cover + + def on_user_profile_updated(instance, **kwargs): + """ + Queues the UserProfile dump job when the parent transaction is committed. + """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = UserProfileSink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + transaction.on_commit(lambda: on_user_profile_updated(*args, **kwargs)) # Connect the UserProfile.post_save signal handler only if we have a model to attach to. @@ -102,30 +102,42 @@ def on_user_profile_updated_txn(**kwargs): ) # pragma: no cover -def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover - sender, instance, **kwargs -): +def on_externalid_saved_txn(*args, **kwargs): """ - Receives post save signal and queues the dump job. + Handle external_id saves in the middle of a transaction. + + Handle saves in the middle of a transaction. + If this gets fired before the transaction commits, the task may try to + query an id that doesn't exist yet and throw an error. This should postpone + queuing the Celery task until after the transaction is committed. 
""" - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - sink = ExternalIdSink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover + sender, instance, **kwargs + ): + """ + Receives post save signal and queues the dump job. + """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = ExternalIdSink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + transaction.on_commit(lambda: on_externalid_saved(*args, **kwargs)) # Connect the ExternalId.post_save signal handler only if we have a model to attach to. # (prevents celery errors during tests) _external_id = get_model("external_id") if _external_id: - post_save.connect(on_externalid_saved, sender=_external_id) # pragma: no cover + post_save.connect(on_externalid_saved_txn, sender=_external_id) # pragma: no cover @receiver(USER_RETIRE_LMS_MISC) @@ -148,75 +160,111 @@ def on_user_retirement( # pylint: disable=unused-argument # pragma: no cover ) -def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover - sender, instance, **kwargs -): +def on_tag_saved_txn(*args, **kwargs): """ - Receives post save signal and queues the dump job. + Handle external_id saves in the middle of a transaction. + + Handle saves in the middle of a transaction. + If this gets fired before the transaction commits, the task may try to + query an id that doesn't exist yet and throw an error. 
This should postpone + queuing the Celery task until after the transaction is committed. """ - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - sink = TagSink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover + sender, instance, **kwargs + ): + """ + Receives post save signal and queues the dump job. + """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = TagSink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + transaction.on_commit(lambda: on_tag_saved(*args, **kwargs)) # Connect the ExternalId.post_save signal handler only if we have a model to attach to. # (prevents celery errors during tests) _tag = get_model("tag") if _tag: - post_save.connect(on_tag_saved, sender=_tag) # pragma: no cover + post_save.connect(on_tag_saved_txn, sender=_tag) # pragma: no cover -def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover - sender, instance, **kwargs -): +def on_taxonomy_saved_txn(*args, **kwargs): """ - Receives post save signal and queues the dump job. + Handle taxonomy saves in the middle of a transaction. + + Handle saves in the middle of a transaction. + If this gets fired before the transaction commits, the task may try to + query an id that doesn't exist yet and throw an error. This should postpone + queuing the Celery task until after the transaction is committed. 
""" - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - sink = TaxonomySink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover + sender, instance, **kwargs + ): + """ + Receives post save signal and queues the dump job. + """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = TaxonomySink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + transaction.on_commit(lambda: on_taxonomy_saved(*args, **kwargs)) # Connect the ExternalId.post_save signal handler only if we have a model to attach to. # (prevents celery errors during tests) _taxonomy = get_model("taxonomy") if _taxonomy: - post_save.connect(on_taxonomy_saved, sender=_taxonomy) # pragma: no cover + post_save.connect(on_taxonomy_saved_txn, sender=_taxonomy) # pragma: no cover -def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover +def on_object_tag_saved_txn(*args, **kwargs): """ - Receives post save signal and queues the dump job. + Handle external_id saves in the middle of a transaction. + + Handle saves in the middle of a transaction. + If this gets fired before the transaction commits, the task may try to + query an id that doesn't exist yet and throw an error. This should postpone + queuing the Celery task until after the transaction is committed. 
""" - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - sink = ObjectTagSink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover + """ + Receives post save signal and queues the dump job. + """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = ObjectTagSink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + on_object_tag_deleted(sender, instance, **kwargs) - on_object_tag_deleted(sender, instance, **kwargs) + transaction.on_commit(lambda: on_object_tag_saved(*args, **kwargs)) def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cover @@ -235,7 +283,7 @@ def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cove try: CourseOverview.objects.get(id=instance.object_id) dump_course_to_clickhouse.delay(instance.object_id) - except CourseOverview.DoesNotExist: + except (CourseOverview.DoesNotExist, InvalidKeyError): pass @@ -243,5 +291,5 @@ def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cove # (prevents celery errors during tests) _object_tag = get_model("object_tag") if _object_tag: # pragma: no cover - post_save.connect(on_object_tag_saved, sender=_object_tag) + post_save.connect(on_object_tag_saved_txn, sender=_object_tag) post_delete.connect(on_object_tag_deleted, sender=_object_tag) diff --git a/platform_plugin_aspects/tests/test_signals.py 
b/platform_plugin_aspects/tests/test_signals.py index a720ae1..5e1fe66 100644 --- a/platform_plugin_aspects/tests/test_signals.py +++ b/platform_plugin_aspects/tests/test_signals.py @@ -7,11 +7,10 @@ from django.test import TestCase from platform_plugin_aspects.signals import ( - on_externalid_saved, + on_externalid_saved_txn, on_user_retirement, receive_course_publish, ) -from platform_plugin_aspects.sinks.external_id_sink import ExternalIdSink from platform_plugin_aspects.sinks.user_retire_sink import UserRetirementSink @@ -31,22 +30,16 @@ def test_receive_course_publish(self, mock_dump_task): mock_dump_task.delay.assert_called_once_with(course_key) - @patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse") - def test_on_externalid_saved(self, mock_dump_task): + @patch("platform_plugin_aspects.signals.transaction") + def test_on_externalid_saved(self, mock_transaction): """ Test that on_externalid_saved calls dump_data_to_clickhouse. """ instance = Mock() sender = Mock() - on_externalid_saved(sender, instance) - - sink = ExternalIdSink(None, None) + on_externalid_saved_txn(sender, instance) - mock_dump_task.delay.assert_called_once_with( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + mock_transaction.on_commit.assert_called_once() @patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse") def test_on_user_retirement(self, mock_dump_task):