From b53d5b6603093602d96963189eacb84a3be72e45 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Wed, 8 Nov 2023 15:44:46 +0100 Subject: [PATCH 01/14] Replace pull/delete methods on link with getitem --- link/domain/link.py | 22 ++++++---------------- link/service/handlers.py | 6 ++++-- tests/integration/test_uow.py | 30 +++++++++++++++--------------- tests/unit/entities/test_link.py | 14 +++----------- 4 files changed, 28 insertions(+), 44 deletions(-) diff --git a/link/domain/link.py b/link/domain/link.py index 942d542..5f3c99f 100644 --- a/link/domain/link.py +++ b/link/domain/link.py @@ -93,27 +93,17 @@ def identifiers(self) -> frozenset[Identifier]: """Return the identifiers of all entities in the link.""" return frozenset(entity.identifier for entity in self) - def pull(self, requested: Iterable[Identifier]) -> None: - """Pull the requested entities.""" - requested = set(requested) - self._validate_requested(requested) - for entity in (entity for entity in self if entity.identifier in requested): - entity.pull() - - def delete(self, requested: Iterable[Identifier]) -> None: - """Delete the requested entities.""" - requested = set(requested) - self._validate_requested(requested) - for entity in (entity for entity in self if entity.identifier in requested): - entity.delete() + def __getitem__(self, identifier: Identifier) -> Entity: + """Return the entity with the given identifier.""" + try: + return next(entity for entity in self if entity.identifier == identifier) + except StopIteration as error: + raise KeyError("Requested entity not present in link") from error def list_idle_entities(self) -> frozenset[Identifier]: """List the identifiers of all idle entities in the link.""" return frozenset(entity.identifier for entity in self if entity.state is Idle) - def _validate_requested(self, requested: Iterable[Identifier]) -> None: - assert set(requested) <= self.identifiers, "Requested identifiers not present in link." - def __contains__(self, entity: object) -> bool: """Check if the link contains the given entity.""" return entity in self._entities diff --git a/link/service/handlers.py b/link/service/handlers.py index 625c9c7..20f98a1 100644 --- a/link/service/handlers.py +++ b/link/service/handlers.py @@ -11,14 +11,16 @@ def pull(command: commands.PullEntities, *, uow: UnitOfWork) -> None: """Pull entities across the link.""" with uow: - uow.link.pull(command.requested) + for identifier in command.requested: + uow.link[identifier].pull() uow.commit() def delete(command: commands.DeleteEntities, *, uow: UnitOfWork) -> None: """Delete pulled entities.""" with uow: - uow.link.delete(command.requested) + for identifier in command.requested: + uow.link[identifier].delete() uow.commit() diff --git a/tests/integration/test_uow.py b/tests/integration/test_uow.py index 9aabc0f..36cd21b 100644 --- a/tests/integration/test_uow.py +++ b/tests/integration/test_uow.py @@ -7,7 +7,7 @@ from link.domain import events from link.domain.state import Commands, Components, Operations, Transition, states from link.service.uow import UnitOfWork -from tests.assignments import create_assignments, create_identifier, create_identifiers +from tests.assignments import create_assignments, create_identifier from .gateway import FakeLinkGateway @@ -20,8 +20,8 @@ def initialize(assignments: Mapping[Components, Iterable[str]]) -> tuple[FakeLin def test_updates_are_applied_to_gateway_on_commit() -> None: gateway, uow = initialize({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) with uow: - uow.link.pull(create_identifiers("1")) - uow.link.delete(create_identifiers("2")) + uow.link[create_identifier("1")].pull() + uow.link[create_identifier("2")].delete() uow.commit() actual = {(entity.identifier, entity.state) for entity in gateway.create_link()} expected = {(create_identifier("1"), states.Pulled), (create_identifier("2"), states.Idle)} @@ -31,8 +31,8 @@ def test_updates_are_applied_to_gateway_on_commit() -> None: def test_updates_are_discarded_on_context_exit() -> None: gateway, uow = initialize({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) with uow: - uow.link.pull(create_identifiers("1")) - uow.link.delete(create_identifiers("2")) + uow.link[create_identifier("1")].pull() + uow.link[create_identifier("2")].delete() actual = {(entity.identifier, entity.state) for entity in gateway.create_link()} expected = {(create_identifier("1"), states.Idle), (create_identifier("2"), states.Pulled)} assert actual == expected @@ -41,8 +41,8 @@ def test_updates_are_discarded_on_context_exit() -> None: def test_updates_are_discarded_on_rollback() -> None: gateway, uow = initialize({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) with uow: - uow.link.pull(create_identifiers("1")) - uow.link.delete(create_identifiers("2")) + uow.link[create_identifier("1")].pull() + uow.link[create_identifier("2")].delete() uow.rollback() actual = {(entity.identifier, entity.state) for entity in gateway.create_link()} expected = {(create_identifier("1"), states.Idle), (create_identifier("2"), states.Pulled)} @@ -101,7 +101,7 @@ def test_link_expires_when_committing() -> None: link = uow.link uow.commit() with pytest.raises(RuntimeError, match="expired entity"): - link.pull(create_identifiers("1")) + link[create_identifier("1")].pull() def test_link_expires_when_rolling_back() -> None: @@ -110,7 +110,7 @@ def test_link_expires_when_rolling_back() -> None: link = uow.link uow.rollback() with pytest.raises(RuntimeError, match="expired entity"): - link.pull(create_identifiers("1")) + link[create_identifier("1")].pull() def test_link_expires_when_exiting_context() -> None: @@ -118,14 +118,14 @@ def test_link_expires_when_exiting_context() -> None: with uow: link = uow.link with pytest.raises(RuntimeError, match="expired entity"): - link.pull(create_identifiers("1")) + link[create_identifier("1")].pull() def test_correct_events_are_collected() -> None: _, uow = initialize({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) with uow: - uow.link.pull(create_identifiers("1")) - uow.link.delete(create_identifiers("2")) + uow.link[create_identifier("1")].pull() + uow.link[create_identifier("2")].delete() uow.commit() expected = [ events.StateChanged( @@ -172,14 +172,14 @@ def test_correct_events_are_collected() -> None: def test_unit_must_be_committed_to_collect_events() -> None: _, uow = initialize({Components.SOURCE: {"1"}}) with uow: - uow.link.pull(create_identifiers("1")) + uow.link[create_identifier("1")].pull() assert list(uow.collect_new_events()) == [] def test_events_can_only_be_collected_once() -> None: _, uow = initialize({Components.SOURCE: {"1"}}) with uow: - uow.link.pull(create_identifiers("1")) + uow.link[create_identifier("1")].pull() uow.commit() list(uow.collect_new_events()) assert list(uow.collect_new_events()) == [] @@ -188,7 +188,7 @@ def test_events_can_only_be_collected_once() -> None: def test_events_can_only_be_collected_outside_of_context() -> None: _, uow = initialize({Components.SOURCE: {"1"}}) with uow: - uow.link.pull(create_identifiers("1")) + uow.link[create_identifier("1")].pull() uow.commit() with pytest.raises(RuntimeError, match="inside context"): list(uow.collect_new_events()) diff --git a/tests/unit/entities/test_link.py b/tests/unit/entities/test_link.py index b4bfd4a..5750c6c 100644 --- a/tests/unit/entities/test_link.py +++ b/tests/unit/entities/test_link.py @@ -167,15 +167,7 @@ def test_can_get_identifiers_of_entities_in_component( assert set(link.identifiers) == create_identifiers("1", "2") @staticmethod - def test_specifying_identifiers_not_present_in_link_raises_error_when_pulling() -> None: + def test_accessing_entity_not_present_in_link_raises_error() -> None: link = create_link(create_assignments({Components.SOURCE: {"1"}})) - with pytest.raises(AssertionError, match="Requested identifiers not present in link."): - link.pull(create_identifiers("2")) - - @staticmethod - def test_specifying_identifiers_not_present_in_link_raises_error_when_deleting() -> None: - link = create_link( - create_assignments({Components.SOURCE: {"1"}, Components.OUTBOUND: {"1"}, Components.LOCAL: {"1"}}) - ) - with pytest.raises(AssertionError, match="Requested identifiers not present in link."): - link.delete(create_identifiers("2")) + with pytest.raises(KeyError, match="Requested entity not present in link."): + link[create_identifier("2")] From 51a387e51c281cb3784f5b7661cc1537adc94492 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Wed, 8 Nov 2023 16:07:52 +0100 Subject: [PATCH 02/14] Allow pulling/deleting single entity --- link/domain/commands.py | 14 ++++++++++++++ link/infrastructure/link.py | 14 +++++++++----- link/service/handlers.py | 27 +++++++++++++++++++-------- tests/integration/test_services.py | 19 ++++++++++++++++--- 4 files changed, 58 insertions(+), 16 deletions(-) diff --git a/link/domain/commands.py b/link/domain/commands.py index d6598de..abad976 100644 --- a/link/domain/commands.py +++ b/link/domain/commands.py @@ -11,6 +11,20 @@ class Command: """Base class for all commands.""" +@dataclass(frozen=True) +class PullEntity(Command): + """Pull the requested entity.""" + + requested: Identifier + + +@dataclass(frozen=True) +class DeleteEntity(Command): + """Delete the requested entity.""" + + requested: Identifier + + @dataclass(frozen=True) class PullEntities(Command): """Pull the requested entities.""" diff --git a/link/infrastructure/link.py b/link/infrastructure/link.py index 06f0f77..a916895 100644 --- a/link/infrastructure/link.py +++ b/link/infrastructure/link.py @@ -12,7 +12,7 @@ from link.adapters.identification import IdentificationTranslator from link.adapters.present import create_idle_entities_updater, create_state_change_logger from link.domain import commands, events -from link.service.handlers import delete, list_idle_entities, log_state_change, pull +from link.service.handlers import delete, delete_entity, list_idle_entities, log_state_change, pull, pull_entity from link.service.messagebus import CommandHandlers, EventHandlers, MessageBus from link.service.uow import UnitOfWork @@ -48,18 +48,22 @@ def inner(obj: type) -> Any: source_restriction: IterationCallbackList[PrimaryKey] = IterationCallbackList() idle_entities_updater = create_idle_entities_updater(translator, create_content_replacer(source_restriction)) logger = logging.getLogger(obj.__name__) + command_handlers: CommandHandlers = {} - command_handlers[commands.PullEntities] = partial(pull, uow=uow) - command_handlers[commands.DeleteEntities] = partial(delete, uow=uow) + event_handlers: EventHandlers = {} + bus = MessageBus(uow, command_handlers, event_handlers) + command_handlers[commands.PullEntity] = partial(pull_entity, uow=uow) + command_handlers[commands.DeleteEntity] = partial(delete_entity, uow=uow) + command_handlers[commands.PullEntities] = partial(pull, message_bus=bus) + command_handlers[commands.DeleteEntities] = partial(delete, message_bus=bus) command_handlers[commands.ListIdleEntities] = partial( list_idle_entities, uow=uow, output_port=idle_entities_updater ) - event_handlers: EventHandlers = {} event_handlers[events.StateChanged] = [ partial(log_state_change, log=create_state_change_logger(translator, logger.info)) ] event_handlers[events.InvalidOperationRequested] = [lambda event: None] - bus = MessageBus(uow, command_handlers, event_handlers) + controller = DJController(bus, translator) source_restriction.callback = controller.list_idle_entities diff --git a/link/service/handlers.py b/link/service/handlers.py index 20f98a1..961b7ca 100644 --- a/link/service/handlers.py +++ b/link/service/handlers.py @@ -5,25 +5,36 @@ from link.domain import commands, events +from .messagebus import MessageBus from .uow import UnitOfWork -def pull(command: commands.PullEntities, *, uow: UnitOfWork) -> None: - """Pull entities across the link.""" +def pull_entity(command: commands.PullEntity, *, uow: UnitOfWork) -> None: + """Pull an entity across the link.""" with uow: - for identifier in command.requested: - uow.link[identifier].pull() + uow.link[command.requested].pull() uow.commit() -def delete(command: commands.DeleteEntities, *, uow: UnitOfWork) -> None: - """Delete pulled entities.""" +def delete_entity(command: commands.DeleteEntity, *, uow: UnitOfWork) -> None: + """Delete a pulled entity.""" with uow: - for identifier in command.requested: - uow.link[identifier].delete() + uow.link[command.requested].delete() uow.commit() +def pull(command: commands.PullEntities, *, message_bus: MessageBus) -> None: + """Pull entities across the link.""" + for identifier in command.requested: + message_bus.handle(commands.PullEntity(identifier)) + + +def delete(command: commands.DeleteEntities, *, message_bus: MessageBus) -> None: + """Delete pulled entities.""" + for identifier in command.requested: + message_bus.handle(commands.DeleteEntity(identifier)) + + def list_idle_entities( command: commands.ListIdleEntities, *, diff --git a/tests/integration/test_services.py b/tests/integration/test_services.py index ee1630b..c946f49 100644 --- a/tests/integration/test_services.py +++ b/tests/integration/test_services.py @@ -7,7 +7,8 @@ from link.domain import commands, events from link.domain.state import Components, Processes, State, states -from link.service.handlers import delete, list_idle_entities, pull +from link.service.handlers import delete, delete_entity, list_idle_entities, pull, pull_entity +from link.service.messagebus import CommandHandlers, EventHandlers, MessageBus from link.service.uow import UnitOfWork from tests.assignments import create_assignments, create_identifiers @@ -73,11 +74,23 @@ def create_uow(state: type[State], process: Processes | None = None, is_tainted: def create_pull_service(uow: UnitOfWork) -> Callable[[commands.PullEntities], None]: - return partial(pull, uow=uow) + command_handlers: CommandHandlers = {} + command_handlers[commands.PullEntity] = partial(pull_entity, uow=uow) + event_handlers: EventHandlers = {} + event_handlers[events.InvalidOperationRequested] = [lambda event: None] + event_handlers[events.StateChanged] = [lambda event: None] + bus = MessageBus(uow, command_handlers, event_handlers) + return partial(pull, message_bus=bus) def create_delete_service(uow: UnitOfWork) -> Callable[[commands.DeleteEntities], None]: - return partial(delete, uow=uow) + command_handlers: CommandHandlers = {} + command_handlers[commands.DeleteEntity] = partial(delete_entity, uow=uow) + event_handlers: EventHandlers = {} + event_handlers[events.InvalidOperationRequested] = [lambda event: None] + event_handlers[events.StateChanged] = [lambda event: None] + bus = MessageBus(uow, command_handlers, event_handlers) + return partial(delete, message_bus=bus) class EntityConfig(TypedDict): From bcbd21c6d99f6c0a04f1ab8ab73540c5742fe9e7 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Wed, 8 Nov 2023 16:47:02 +0100 Subject: [PATCH 03/14] Add new events to processes starting/finishing --- link/domain/events.py | 34 +++++++++++++++++++++++++++++- link/infrastructure/link.py | 8 +++++-- link/service/handlers.py | 13 ++++++++++-- tests/integration/test_services.py | 16 ++++++++++---- 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/link/domain/events.py b/link/domain/events.py index 3c51731..5056e12 100644 --- a/link/domain/events.py +++ b/link/domain/events.py @@ -7,7 +7,7 @@ from .custom_types import Identifier if TYPE_CHECKING: - from .state import Commands, Operations, State, Transition + from .state import Commands, Operations, Processes, State, Transition @dataclass(frozen=True) @@ -43,3 +43,35 @@ class IdleEntitiesListed(Event): """Idle entities in a link have been listed.""" identifiers: frozenset[Identifier] + + +@dataclass(frozen=True) +class ProcessStarted(Event): + """A process for an entity was started.""" + + process: Processes + identifier: Identifier + + +@dataclass(frozen=True) +class ProcessFinished(Event): + """A process for an entity was finished.""" + + process: Processes + identifier: Identifier + + +@dataclass(frozen=True) +class ProcessesStarted(Event): + """The same process has been started for multiple entities.""" + + process: Processes + identifiers: frozenset[Identifier] + + +@dataclass(frozen=True) +class ProcessesFinished(Event): + """The same process has been finished for multiple entities.""" + + process: Processes + identifiers: frozenset[Identifier] diff --git a/link/infrastructure/link.py b/link/infrastructure/link.py index a916895..317b75a 100644 --- a/link/infrastructure/link.py +++ b/link/infrastructure/link.py @@ -52,13 +52,17 @@ def inner(obj: type) -> Any: command_handlers: CommandHandlers = {} event_handlers: EventHandlers = {} bus = MessageBus(uow, command_handlers, event_handlers) - command_handlers[commands.PullEntity] = partial(pull_entity, uow=uow) - command_handlers[commands.DeleteEntity] = partial(delete_entity, uow=uow) + command_handlers[commands.PullEntity] = partial(pull_entity, uow=uow, message_bus=bus) + command_handlers[commands.DeleteEntity] = partial(delete_entity, uow=uow, message_bus=bus) command_handlers[commands.PullEntities] = partial(pull, message_bus=bus) command_handlers[commands.DeleteEntities] = partial(delete, message_bus=bus) command_handlers[commands.ListIdleEntities] = partial( list_idle_entities, uow=uow, output_port=idle_entities_updater ) + event_handlers[events.ProcessStarted] = [lambda event: None] + event_handlers[events.ProcessFinished] = [lambda event: None] + event_handlers[events.ProcessesStarted] = [lambda event: None] + event_handlers[events.ProcessesFinished] = [lambda event: None] event_handlers[events.StateChanged] = [ partial(log_state_change, log=create_state_change_logger(translator, logger.info)) ] diff --git a/link/service/handlers.py b/link/service/handlers.py index 961b7ca..ff9a344 100644 --- a/link/service/handlers.py +++ b/link/service/handlers.py @@ -4,35 +4,44 @@ from collections.abc import Callable from link.domain import commands, events +from link.domain.state import Processes from .messagebus import MessageBus from .uow import UnitOfWork -def pull_entity(command: commands.PullEntity, *, uow: UnitOfWork) -> None: +def pull_entity(command: commands.PullEntity, *, uow: UnitOfWork, message_bus: MessageBus) -> None: """Pull an entity across the link.""" + message_bus.handle(events.ProcessStarted(Processes.PULL, command.requested)) with uow: uow.link[command.requested].pull() uow.commit() + message_bus.handle(events.ProcessFinished(Processes.PULL, command.requested)) -def delete_entity(command: commands.DeleteEntity, *, uow: UnitOfWork) -> None: +def delete_entity(command: commands.DeleteEntity, *, uow: UnitOfWork, message_bus: MessageBus) -> None: """Delete a pulled entity.""" + message_bus.handle(events.ProcessStarted(Processes.DELETE, command.requested)) with uow: uow.link[command.requested].delete() uow.commit() + message_bus.handle(events.ProcessFinished(Processes.DELETE, command.requested)) def pull(command: commands.PullEntities, *, message_bus: MessageBus) -> None: """Pull entities across the link.""" + message_bus.handle(events.ProcessesStarted(Processes.PULL, command.requested)) for identifier in command.requested: message_bus.handle(commands.PullEntity(identifier)) + message_bus.handle(events.ProcessesFinished(Processes.PULL, command.requested)) def delete(command: commands.DeleteEntities, *, message_bus: MessageBus) -> None: """Delete pulled entities.""" + message_bus.handle(events.ProcessesStarted(Processes.DELETE, command.requested)) for identifier in command.requested: message_bus.handle(commands.DeleteEntity(identifier)) + message_bus.handle(events.ProcessesFinished(Processes.DELETE, command.requested)) def list_idle_entities( diff --git a/tests/integration/test_services.py b/tests/integration/test_services.py index c946f49..2f97c0f 100644 --- a/tests/integration/test_services.py +++ b/tests/integration/test_services.py @@ -75,21 +75,29 @@ def create_uow(state: type[State], process: Processes | None = None, is_tainted: def create_pull_service(uow: UnitOfWork) -> Callable[[commands.PullEntities], None]: command_handlers: CommandHandlers = {} - command_handlers[commands.PullEntity] = partial(pull_entity, uow=uow) event_handlers: EventHandlers = {} + bus = MessageBus(uow, command_handlers, event_handlers) + command_handlers[commands.PullEntity] = partial(pull_entity, uow=uow, message_bus=bus) event_handlers[events.InvalidOperationRequested] = [lambda event: None] event_handlers[events.StateChanged] = [lambda event: None] - bus = MessageBus(uow, command_handlers, event_handlers) + event_handlers[events.ProcessStarted] = [lambda event: None] + event_handlers[events.ProcessFinished] = [lambda event: None] + event_handlers[events.ProcessesStarted] = [lambda event: None] + event_handlers[events.ProcessesFinished] = [lambda event: None] return partial(pull, message_bus=bus) def create_delete_service(uow: UnitOfWork) -> Callable[[commands.DeleteEntities], None]: command_handlers: CommandHandlers = {} - command_handlers[commands.DeleteEntity] = partial(delete_entity, uow=uow) event_handlers: EventHandlers = {} + bus = MessageBus(uow, command_handlers, event_handlers) + command_handlers[commands.DeleteEntity] = partial(delete_entity, uow=uow, message_bus=bus) event_handlers[events.InvalidOperationRequested] = [lambda event: None] event_handlers[events.StateChanged] = [lambda event: None] - bus = MessageBus(uow, command_handlers, event_handlers) + event_handlers[events.ProcessStarted] = [lambda event: None] + event_handlers[events.ProcessFinished] = [lambda event: None] + event_handlers[events.ProcessesStarted] = [lambda event: None] + event_handlers[events.ProcessesFinished] = [lambda event: None] return partial(delete, message_bus=bus) From 857ffb2b744b666265197eff3e572b805aa41de3 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 10:38:25 +0100 Subject: [PATCH 04/14] Add handlers for progress related events --- link/service/handlers.py | 21 +++++++++++++++++++++ link/service/progress.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 link/service/progress.py diff --git a/link/service/handlers.py b/link/service/handlers.py index ff9a344..f6b82d9 100644 --- a/link/service/handlers.py +++ b/link/service/handlers.py @@ -7,6 +7,7 @@ from link.domain.state import Processes from .messagebus import MessageBus +from .progress import ProgessDisplay from .uow import UnitOfWork @@ -59,3 +60,23 @@ def list_idle_entities( def log_state_change(event: events.StateChanged, log: Callable[[events.StateChanged], None]) -> None: """Log the state change of an entity.""" log(event) + + +def start_displaying_progress(event: events.ProcessesStarted, *, display: ProgessDisplay) -> None: + """Start displaying progress to the user.""" + display.start(event.process, event.identifiers) + + +def inform_of_started_process(event: events.ProcessStarted, *, display: ProgessDisplay) -> None: + """Update the display with the entity whose process started.""" + display.update_current(event.identifier) + + +def inform_of_finished_process(event: events.ProcessFinished, *, display: ProgessDisplay) -> None: + """Inform the user of an entity finishing its process.""" + display.finish_current() + + +def stop_displaying_progress(event: events.ProcessesFinished, *, display: ProgessDisplay) -> None: + """Stop displaying progress to the user.""" + display.stop() diff --git a/link/service/progress.py b/link/service/progress.py new file mode 100644 index 0000000..9a79687 --- /dev/null +++ b/link/service/progress.py @@ -0,0 +1,28 @@ +"""Contains interfaces for relaying information about the progress of processing a batch of entities.""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Iterable + +from link.domain.custom_types import Identifier +from link.domain.state import Processes + + +class ProgessDisplay(ABC): + """Shows information about the progress of a batch of entities being processed to the user.""" + + @abstractmethod + def start(self, process: Processes, to_be_processed: Iterable[Identifier]) -> None: + """Start showing progress information to the user.""" + + @abstractmethod + def update_current(self, new: Identifier) -> None: + """Update the display to reflect a new entity being currently processed.""" + + @abstractmethod + def finish_current(self) -> None: + """Update the display to reflect that the current entity finished processing.""" + + @abstractmethod + def stop(self) -> None: + """Stop showing progress information to the user.""" From 4f0b61b73c5732714a68eea9d7a6ad1bb6ba9046 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 11:10:54 +0100 Subject: [PATCH 05/14] Add tqdm to pdm --- pdm.lock | 23 ++++++++++++++++------- pyproject.toml | 3 +++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pdm.lock b/pdm.lock index bb3b376..619b64b 100644 --- a/pdm.lock +++ b/pdm.lock @@ -3,10 +3,9 @@ [metadata] groups = ["default", "dev", "profiling"] -cross_platform = true -static_urls = false -lock_version = "4.3" -content_hash = "sha256:4f2ecbc5c197ae91efb9bee8ce350321344adca3dc82106b8c7c7434b6d84c0e" +strategy = ["cross_platform"] +lock_version = "4.4" +content_hash = "sha256:f35ecee06be8d287838d23789019d83c6e11942373a33af153f4a2e0fd692244" [[package]] name = "appdirs" @@ -1693,15 +1692,15 @@ files = [ [[package]] name = "tqdm" -version = "4.65.0" +version = "4.66.1" requires_python = ">=3.7" summary = "Fast, Extensible Progress Meter" dependencies = [ "colorama; platform_system == \"Windows\"", ] files = [ - {file = "tqdm-4.65.0-py3-none-any.whl", hash = "sha256:c4f53a17fe37e132815abceec022631be8ffe1b9381c2e6e30aa70edc99e9671"}, - {file = "tqdm-4.65.0.tar.gz", hash = "sha256:1871fb68a86b8fb3b59ca4cdd3dcccbc7e6d613eeed31f4c332531977b89beb5"}, + {file = "tqdm-4.66.1-py3-none-any.whl", hash = "sha256:d302b3c5b53d47bce91fea46679d9c3c6508cf6332229aa1e7d8653723793386"}, + {file = "tqdm-4.66.1.tar.gz", hash = "sha256:d88e651f9db8d8551a62556d3cff9e3034274ca5d66e93197cf2490e2dcb69c7"}, ] [[package]] @@ -1714,6 +1713,16 @@ files = [ {file = "traitlets-5.9.0.tar.gz", hash = "sha256:f6cde21a9c68cf756af02035f72d5a723bf607e862e7be33ece505abf4a3bad9"}, ] +[[package]] +name = "types-tqdm" +version = "4.66.0.4" +requires_python = ">=3.7" +summary = "Typing stubs for tqdm" +files = [ + {file = "types-tqdm-4.66.0.4.tar.gz", hash = "sha256:a2f0ebd4cfd48f4914395819a176d7947387e1b98f9228fca38f8cac1b59891c"}, + {file = "types_tqdm-4.66.0.4-py3-none-any.whl", hash = "sha256:8eda4c5123dd66985a4cb44268705cfa18beb32d66772271ae185e92b8b10c40"}, +] + [[package]] name = "typing-extensions" version = "4.7.1" diff --git a/pyproject.toml b/pyproject.toml index 72a7a18..c7af254 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ authors = [ ] dependencies = [ "datajoint >= 0.12", + "tqdm>=4.66.1", ] requires-python = ">=3.8" dynamic = ["version"] @@ -84,6 +85,7 @@ source = ["link"] exclude_lines = ["if TYPE_CHECKING:"] + [tool.pdm.version] source = "scm" @@ -100,6 +102,7 @@ dev = [ "neovim>=0.3.1", "pdbpp>=0.10.3", "ruff>=0.0.270", + "types-tqdm>=4.66.0.4", ] [tool.pdm.scripts] From 63badc1ebadcdfb17c8ebc22a0e090298d673e34 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:11:36 +0100 Subject: [PATCH 06/14] Add debug logging to message bus --- link/service/messagebus.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/link/service/messagebus.py b/link/service/messagebus.py index 3e98180..3b45a8e 100644 --- a/link/service/messagebus.py +++ b/link/service/messagebus.py @@ -66,6 +66,7 @@ def handle(self, message: Message) -> None: def _handle_command(self, command: Command) -> None: handler = self._command_handlers[type(command)] + logger.debug(f"Handling command {command!r} with handler {handler!r}") try: handler(command) except Exception: @@ -74,6 +75,7 @@ def _handle_command(self, command: Command) -> None: def _handle_event(self, event: Event) -> None: for handler in self._event_handlers[type(event)]: + logger.debug(f"Handling event {event!r} with handler {handler!r}") try: handler(event) except Exception: From c75176306e92fa687b9924c17e4004e19710a430 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:13:44 +0100 Subject: [PATCH 07/14] Add adapter for progress display --- link/adapters/progress.py | 64 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 link/adapters/progress.py diff --git a/link/adapters/progress.py b/link/adapters/progress.py new file mode 100644 index 0000000..8fbeebc --- /dev/null +++ b/link/adapters/progress.py @@ -0,0 +1,64 @@ +"""Contains DataJoint-specific code for relaying progress information to the user.""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Iterable + +from link.domain.custom_types import Identifier +from link.domain.state import Processes +from link.service.progress import ProgessDisplay + +from .identification import IdentificationTranslator + + +class ProgressView(ABC): + """Progress display.""" + + @abstractmethod + def open(self, description: str, total: int, unit: str) -> None: + """Open the progress display showing information to the user.""" + + @abstractmethod + def update_current(self, new: str) -> None: + """Update the display with new information regarding the current iteration.""" + + @abstractmethod + def update_iteration(self) -> None: + """Update the display to reflect that the current iteration finished.""" + + @abstractmethod + def close(self) -> None: + """Close the progress display.""" + + @abstractmethod + def enable(self) -> None: + """Enable the view.""" + + @abstractmethod + def disable(self) -> None: + """Disable the view.""" + + +class DJProgressDisplayAdapter(ProgessDisplay): + """DataJoint-specific adapter for the progress display.""" + + def __init__(self, translator: IdentificationTranslator, display: ProgressView) -> None: + """Initialize the display.""" + self._translator = translator + self._display = display + + def start(self, process: Processes, to_be_processed: Iterable[Identifier]) -> None: + """Start showing progress information to the user.""" + self._display.open(process.name, len(list(to_be_processed)), "row") + + def update_current(self, new: Identifier) -> None: + """Update the display to reflect a new entity being currently processed.""" + self._display.update_current(repr(self._translator.to_primary_key(new))) + + def finish_current(self) -> None: + """Update the display to reflect that the current entity finished processing.""" + self._display.update_iteration() + + def stop(self) -> None: + """Stop showing progress information to the user.""" + self._display.close() From 1d19a459a8772e4d2972703527a0af191a4d2bd9 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:14:18 +0100 Subject: [PATCH 08/14] Add tqdm based progress bar --- link/infrastructure/progress.py | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 link/infrastructure/progress.py diff --git a/link/infrastructure/progress.py b/link/infrastructure/progress.py new file mode 100644 index 0000000..9b4bef6 --- /dev/null +++ b/link/infrastructure/progress.py @@ -0,0 +1,50 @@ +"""Contains views for showing progress information to the user.""" +from __future__ import annotations + +import logging +from typing import NoReturn + +from tqdm.auto import tqdm + +from link.adapters.progress import ProgressView + +logger = logging.getLogger(__name__) + + +class TQDMProgressView(ProgressView): + """A view that uses tqdm to show a progress bar.""" + + def __init__(self) -> None: + """Initialize the view.""" + self.__progress_bar: tqdm[NoReturn] | None = None + self._is_disabled: bool = False + + @property + def _progress_bar(self) -> tqdm[NoReturn]: + assert self.__progress_bar + return self.__progress_bar + + def open(self, description: str, total: int, unit: str) -> None: + """Start showing the progress bar.""" + self.__progress_bar = tqdm(total=total, desc=description, unit=unit, disable=self._is_disabled) + + def update_current(self, new: str) -> None: + """Update information about the current iteration shown at the end of the bar.""" + self._progress_bar.set_postfix(current=new) + + def update_iteration(self) -> None: + """Update the bar to show an iteration finished.""" + self._progress_bar.update() + + def close(self) -> None: + """Stop showing the progress bar.""" + self._progress_bar.close() + self.__progress_bar = None + + def enable(self) -> None: + """Enable the progress bar.""" + self._is_disabled = False + + def disable(self) -> None: + """Disable the progress bar.""" + self._is_disabled = True From bd7662ba2238d79f563ea2384e72002633c18df3 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:14:54 +0100 Subject: [PATCH 09/14] Use new progress bar --- link/infrastructure/link.py | 27 +++++++++++++++++++++------ link/infrastructure/mixin.py | 22 ++++++++++++++++++---- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/link/infrastructure/link.py b/link/infrastructure/link.py index 317b75a..c18088d 100644 --- a/link/infrastructure/link.py +++ b/link/infrastructure/link.py @@ -11,14 +11,27 @@ from link.adapters.gateway import DJLinkGateway from link.adapters.identification import IdentificationTranslator from link.adapters.present import create_idle_entities_updater, create_state_change_logger +from link.adapters.progress import DJProgressDisplayAdapter from link.domain import commands, events -from link.service.handlers import delete, delete_entity, list_idle_entities, log_state_change, pull, pull_entity +from link.service.handlers import ( + delete, + delete_entity, + inform_of_finished_process, + inform_of_started_process, + list_idle_entities, + log_state_change, + pull, + pull_entity, + start_displaying_progress, + stop_displaying_progress, +) from link.service.messagebus import CommandHandlers, EventHandlers, MessageBus from link.service.uow import UnitOfWork from . import DJConfiguration, create_tables from .facade import DJLinkFacade from .mixin import create_local_endpoint +from .progress import TQDMProgressView from .sequence import IterationCallbackList, create_content_replacer @@ -59,10 +72,12 @@ def inner(obj: type) -> Any: command_handlers[commands.ListIdleEntities] = partial( list_idle_entities, uow=uow, output_port=idle_entities_updater ) - event_handlers[events.ProcessStarted] = [lambda event: None] - event_handlers[events.ProcessFinished] = [lambda event: None] - event_handlers[events.ProcessesStarted] = [lambda event: None] - event_handlers[events.ProcessesFinished] = [lambda event: None] + progress_view = TQDMProgressView() + display = DJProgressDisplayAdapter(translator, progress_view) + event_handlers[events.ProcessStarted] = [partial(inform_of_started_process, display=display)] + event_handlers[events.ProcessFinished] = [partial(inform_of_finished_process, display=display)] + event_handlers[events.ProcessesStarted] = [partial(start_displaying_progress, display=display)] + event_handlers[events.ProcessesFinished] = [partial(stop_displaying_progress, display=display)] event_handlers[events.StateChanged] = [ partial(log_state_change, log=create_state_change_logger(translator, logger.info)) ] @@ -71,6 +86,6 @@ def inner(obj: type) -> Any: controller = DJController(bus, translator) source_restriction.callback = controller.list_idle_entities - return create_local_endpoint(controller, tables, source_restriction) + return create_local_endpoint(controller, tables, source_restriction, progress_view) return inner diff --git a/link/infrastructure/mixin.py b/link/infrastructure/mixin.py index f6e756f..01a3609 100644 --- a/link/infrastructure/mixin.py +++ b/link/infrastructure/mixin.py @@ -8,6 +8,7 @@ from link.adapters.controller import DJController from link.adapters.custom_types import PrimaryKey +from link.adapters.progress import ProgressView from . import DJTables @@ -17,11 +18,15 @@ class SourceEndpoint(Table): _controller: DJController _outbound_table: Callable[[], Table] + _progress_view: ProgressView - def pull(self) -> None: + def pull(self, *, display_progress: bool = False) -> None: """Pull idle entities from the source table into the local table.""" + if display_progress: + self._progress_view.enable() primary_keys = self.proj().fetch(as_dict=True) self._controller.pull(primary_keys) + self._progress_view.disable() @property def flagged(self) -> Sequence[PrimaryKey]: @@ -34,6 +39,7 @@ def create_source_endpoint_factory( source_table: Callable[[], Table], outbound_table: Callable[[], Table], restriction: Iterable[PrimaryKey], + progress_view: ProgressView, ) -> Callable[[], SourceEndpoint]: """Create a callable that returns the source endpoint when called.""" @@ -47,6 +53,7 @@ def create_source_endpoint() -> SourceEndpoint: { "_controller": controller, "_outbound_table": staticmethod(outbound_table), + "_progress_view": progress_view, }, )() & restriction, @@ -60,11 +67,15 @@ class LocalEndpoint(Table): _controller: DJController _source: Callable[[], SourceEndpoint] + _progress_view: ProgressView - def delete(self) -> None: + def delete(self, *, display_progress: bool = False) -> None: """Delete pulled entities from the local table.""" + if display_progress: + self._progress_view.enable() primary_keys = self.proj().fetch(as_dict=True) self._controller.delete(primary_keys) + self._progress_view.disable() @property def source(self) -> SourceEndpoint: @@ -73,7 +84,7 @@ def source(self) -> SourceEndpoint: def create_local_endpoint( - controller: DJController, tables: DJTables, source_restriction: Iterable[PrimaryKey] + controller: DJController, tables: DJTables, source_restriction: Iterable[PrimaryKey], progress_view: ProgressView ) -> type[LocalEndpoint]: """Create the local endpoint.""" return cast( @@ -87,8 +98,11 @@ def create_local_endpoint( { "_controller": controller, "_source": staticmethod( - create_source_endpoint_factory(controller, tables.source, tables.outbound, source_restriction) + create_source_endpoint_factory( + controller, tables.source, tables.outbound, source_restriction, progress_view + ), ), + "_progress_view": progress_view, }, ), ) From c0361e56c7c9d412445671c6fd498671202a1bbc Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:15:20 +0100 Subject: [PATCH 10/14] Enable progress bar in pull test --- tests/functional/test_pulling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/test_pulling.py b/tests/functional/test_pulling.py index 2b5598b..4556bee 100644 --- a/tests/functional/test_pulling.py +++ b/tests/functional/test_pulling.py @@ -75,7 +75,7 @@ def create_random_table_name(): "Outbound", schema_names["local"], )(type(source_table_name, (dj.Manual,), {})) - (local_table_cls().source & [{"foo": 1}, {"foo": 2}]).pull() + (local_table_cls().source & [{"foo": 1}, {"foo": 2}]).pull(display_progress=True) actual = local_table_cls().fetch(as_dict=True, download_path=tmpdir) assert len(actual) == len(expected) assert all(entry in expected for entry in actual) From ef1b25d49e9b7cfac141533c3428f1214223013f Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:19:53 +0100 Subject: [PATCH 11/14] Add hint for progress bar to README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9258360..bf84650 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ Table().source All the rows can be pulled like so: ```python -Table().source.pull() +Table().source.pull() # Hint: Pass display_progress=True to get a progress bar ``` That said usually we only want to pull rows that match a certain criteria: From 3e09b9d958aea16f8b9a86dc8830408d7286b772 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:23:17 +0100 Subject: [PATCH 12/14] Rename batch processing events --- link/domain/events.py | 8 ++++---- link/infrastructure/link.py | 4 ++-- link/service/handlers.py | 12 ++++++------ tests/integration/test_services.py | 8 ++++---- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/link/domain/events.py b/link/domain/events.py index 5056e12..9393f9c 100644 --- a/link/domain/events.py +++ b/link/domain/events.py @@ -62,16 +62,16 @@ class ProcessFinished(Event): @dataclass(frozen=True) -class ProcessesStarted(Event): - """The same process has been started for multiple entities.""" +class BatchProcessingStarted(Event): + """The processing of a batch of entities started.""" process: Processes identifiers: frozenset[Identifier] @dataclass(frozen=True) -class ProcessesFinished(Event): - """The same process has been finished for multiple entities.""" +class BatchProcessingFinished(Event): + """The processing of a batch of entities finished.""" process: Processes identifiers: frozenset[Identifier] diff --git a/link/infrastructure/link.py b/link/infrastructure/link.py index c18088d..9e9cbb0 100644 --- a/link/infrastructure/link.py +++ b/link/infrastructure/link.py @@ -76,8 +76,8 @@ def inner(obj: type) -> Any: display = DJProgressDisplayAdapter(translator, progress_view) event_handlers[events.ProcessStarted] = [partial(inform_of_started_process, display=display)] event_handlers[events.ProcessFinished] = [partial(inform_of_finished_process, display=display)] - event_handlers[events.ProcessesStarted] = [partial(start_displaying_progress, display=display)] - event_handlers[events.ProcessesFinished] = [partial(stop_displaying_progress, display=display)] + event_handlers[events.BatchProcessingStarted] = [partial(start_displaying_progress, display=display)] + event_handlers[events.BatchProcessingFinished] = [partial(stop_displaying_progress, display=display)] event_handlers[events.StateChanged] = [ partial(log_state_change, log=create_state_change_logger(translator, logger.info)) ] diff --git a/link/service/handlers.py b/link/service/handlers.py index f6b82d9..4218e84 100644 --- a/link/service/handlers.py +++ b/link/service/handlers.py @@ -31,18 +31,18 @@ def delete_entity(command: commands.DeleteEntity, *, uow: UnitOfWork, message_bu def pull(command: commands.PullEntities, *, message_bus: MessageBus) -> None: """Pull entities across the link.""" - message_bus.handle(events.ProcessesStarted(Processes.PULL, command.requested)) + message_bus.handle(events.BatchProcessingStarted(Processes.PULL, command.requested)) for identifier in command.requested: message_bus.handle(commands.PullEntity(identifier)) - message_bus.handle(events.ProcessesFinished(Processes.PULL, command.requested)) + message_bus.handle(events.BatchProcessingFinished(Processes.PULL, command.requested)) def delete(command: commands.DeleteEntities, *, message_bus: MessageBus) -> None: """Delete pulled entities.""" - message_bus.handle(events.ProcessesStarted(Processes.DELETE, command.requested)) + message_bus.handle(events.BatchProcessingStarted(Processes.DELETE, command.requested)) for identifier in command.requested: message_bus.handle(commands.DeleteEntity(identifier)) - message_bus.handle(events.ProcessesFinished(Processes.DELETE, command.requested)) + message_bus.handle(events.BatchProcessingFinished(Processes.DELETE, command.requested)) def list_idle_entities( @@ -62,7 +62,7 @@ def log_state_change(event: events.StateChanged, log: Callable[[events.StateChan log(event) -def start_displaying_progress(event: events.ProcessesStarted, *, display: ProgessDisplay) -> None: +def start_displaying_progress(event: events.BatchProcessingStarted, *, display: ProgessDisplay) -> None: """Start displaying progress to the user.""" display.start(event.process, event.identifiers) @@ -77,6 +77,6 @@ def inform_of_finished_process(event: events.ProcessFinished, *, display: Proges display.finish_current() -def stop_displaying_progress(event: events.ProcessesFinished, *, display: ProgessDisplay) -> None: +def stop_displaying_progress(event: events.BatchProcessingFinished, *, display: ProgessDisplay) -> None: """Stop displaying progress to the user.""" display.stop() diff --git a/tests/integration/test_services.py b/tests/integration/test_services.py index 2f97c0f..5aedfaf 100644 --- a/tests/integration/test_services.py +++ b/tests/integration/test_services.py @@ -82,8 +82,8 @@ def create_pull_service(uow: UnitOfWork) -> Callable[[commands.PullEntities], No event_handlers[events.StateChanged] = [lambda event: None] event_handlers[events.ProcessStarted] = [lambda event: None] event_handlers[events.ProcessFinished] = [lambda event: None] - event_handlers[events.ProcessesStarted] = [lambda event: None] - event_handlers[events.ProcessesFinished] = [lambda event: None] + event_handlers[events.BatchProcessingStarted] = [lambda event: None] + event_handlers[events.BatchProcessingFinished] = [lambda event: None] return partial(pull, message_bus=bus) @@ -96,8 +96,8 @@ def create_delete_service(uow: UnitOfWork) -> Callable[[commands.DeleteEntities] event_handlers[events.StateChanged] = [lambda event: None] event_handlers[events.ProcessStarted] = [lambda event: None] event_handlers[events.ProcessFinished] = [lambda event: None] - event_handlers[events.ProcessesStarted] = [lambda event: None] - event_handlers[events.ProcessesFinished] = [lambda event: None] + event_handlers[events.BatchProcessingStarted] = [lambda event: None] + event_handlers[events.BatchProcessingFinished] = [lambda event: None] return partial(delete, message_bus=bus) From 0c9f0e413bc132783e6c0885a8e833f737382448 Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:34:28 +0100 Subject: [PATCH 13/14] Rename new handlers --- link/infrastructure/link.py | 16 ++++++++-------- link/service/handlers.py | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/link/infrastructure/link.py b/link/infrastructure/link.py index 9e9cbb0..aa9fbd6 100644 --- a/link/infrastructure/link.py +++ b/link/infrastructure/link.py @@ -16,14 +16,14 @@ from link.service.handlers import ( delete, delete_entity, - inform_of_finished_process, - inform_of_started_process, + inform_batch_processing_finished, + inform_batch_processing_started, + inform_current_process_finished, + inform_next_process_started, list_idle_entities, log_state_change, pull, pull_entity, - start_displaying_progress, - stop_displaying_progress, ) from link.service.messagebus import CommandHandlers, EventHandlers, MessageBus from link.service.uow import UnitOfWork @@ -74,10 +74,10 @@ def inner(obj: type) -> Any: ) progress_view = TQDMProgressView() display = DJProgressDisplayAdapter(translator, progress_view) - event_handlers[events.ProcessStarted] = [partial(inform_of_started_process, display=display)] - event_handlers[events.ProcessFinished] = [partial(inform_of_finished_process, display=display)] - event_handlers[events.BatchProcessingStarted] = [partial(start_displaying_progress, display=display)] - event_handlers[events.BatchProcessingFinished] = [partial(stop_displaying_progress, display=display)] + event_handlers[events.ProcessStarted] = [partial(inform_next_process_started, display=display)] + event_handlers[events.ProcessFinished] = [partial(inform_current_process_finished, display=display)] + event_handlers[events.BatchProcessingStarted] = [partial(inform_batch_processing_started, display=display)] + event_handlers[events.BatchProcessingFinished] = [partial(inform_batch_processing_finished, display=display)] event_handlers[events.StateChanged] = [ partial(log_state_change, log=create_state_change_logger(translator, logger.info)) ] diff --git a/link/service/handlers.py b/link/service/handlers.py index 4218e84..9442672 100644 --- a/link/service/handlers.py +++ b/link/service/handlers.py @@ -62,21 +62,21 @@ def log_state_change(event: events.StateChanged, log: Callable[[events.StateChan log(event) -def start_displaying_progress(event: events.BatchProcessingStarted, *, display: ProgessDisplay) -> None: - """Start displaying progress to the user.""" +def inform_batch_processing_started(event: events.BatchProcessingStarted, *, display: ProgessDisplay) -> None: + """Inform the user that batch processing started.""" display.start(event.process, event.identifiers) -def inform_of_started_process(event: events.ProcessStarted, *, display: ProgessDisplay) -> None: - """Update the display with the entity whose process started.""" +def inform_next_process_started(event: events.ProcessStarted, *, display: ProgessDisplay) -> None: + """Inform the user that the next entity started processing.""" display.update_current(event.identifier) -def inform_of_finished_process(event: events.ProcessFinished, *, display: ProgessDisplay) -> None: - """Inform the user of an entity finishing its process.""" +def inform_current_process_finished(event: events.ProcessFinished, *, display: ProgessDisplay) -> None: + """Inform the user that the current entity finished processing.""" display.finish_current() -def stop_displaying_progress(event: events.BatchProcessingFinished, *, display: ProgessDisplay) -> None: - """Stop displaying progress to the user.""" +def inform_batch_processing_finished(event: events.BatchProcessingFinished, *, display: ProgessDisplay) -> None: + """Inform the user that batch processing finished.""" display.stop() From df2167b4e15ef2b40217f7a9ea0c1f96473e5bfa Mon Sep 17 00:00:00 2001 From: Christoph Blessing <33834216+cblessing24@users.noreply.github.com> Date: Thu, 9 Nov 2023 15:41:53 +0100 Subject: [PATCH 14/14] Display progress in deletion test --- tests/functional/test_deleting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/test_deleting.py b/tests/functional/test_deleting.py index 1b12b4a..f009f35 100644 --- a/tests/functional/test_deleting.py +++ b/tests/functional/test_deleting.py @@ -32,7 +32,7 @@ def test_deleting(prepare_link, prepare_table, act_as, create_table): outbound_table_cls().insert1(row) with act_as(actors["local"]): - (local_table_cls() & local_table_cls().source.flagged).delete() + (local_table_cls() & local_table_cls().source.flagged).delete(display_progress=True) assert local_table_cls().fetch(as_dict=True) == expected assert (outbound_table_cls() & {"foo": 1}).fetch1("is_deprecated") == "TRUE"