From d5504f3716eb3e1ee7fc0332992bdede598d07ae Mon Sep 17 00:00:00 2001 From: Philipp Kessling Date: Mon, 2 Dec 2024 17:33:34 +0100 Subject: [PATCH] fix: no weakrefs so that Records survive long enough to be persisted. --- dabapush/Reader/NDJSONReader.py | 4 +--- dabapush/Record.py | 25 ++++++++----------------- tests/test_Record.py | 4 ++-- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/dabapush/Reader/NDJSONReader.py b/dabapush/Reader/NDJSONReader.py index b71a551..32d42ae 100644 --- a/dabapush/Reader/NDJSONReader.py +++ b/dabapush/Reader/NDJSONReader.py @@ -1,7 +1,5 @@ """NDJSON Writer plug-in for dabapush""" -import weakref - # pylint: disable=R,I1101 from typing import Iterator, List @@ -27,7 +25,7 @@ def read_and_split( if not flatten_records else flatten(ujson.loads(line)) ), - source=weakref.ref(record), + source=record, ) for line_number, line in enumerate(file) ] diff --git a/dabapush/Record.py b/dabapush/Record.py index 04a1801..0e48a7b 100644 --- a/dabapush/Record.py +++ b/dabapush/Record.py @@ -3,7 +3,6 @@ """ import dataclasses -import weakref from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union from uuid import uuid4 @@ -56,7 +55,7 @@ class Record: """ payload: Optional[Any] = None - source: Optional[weakref.ReferenceType] = None + source: Optional[Self] = None uuid: Optional[str] = uuid4().hex processed_at: datetime = datetime.now() children: List[Self] = dataclasses.field(default_factory=list) @@ -118,7 +117,7 @@ def _handle_key_split_(self, id_key, key): **{ "payload": value, "uuid": value.get(id_key) if id_key else uuid4().hex, - "source": weakref.ref(self), + "source": self, } ) for value in self.payload[key] @@ -129,19 +128,15 @@ def _handle_key_split_(self, id_key, key): def to_log(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]: """Return a loggable representation of the record.""" log.debug(f"Logging record {self.uuid}.") - if self.source: - source = self.source() - if not source: - log.critical(f"Source of record {self.uuid} is not available") - raise ValueError(f"Source of record {self.uuid} is not available") - else: - source = None + return { "uuid": str(self.uuid), "processed_at": self.processed_at.isoformat(), # We cannot allow the source to be a Record, as it would create a circular reference # while serializing the dataclass to JSON. - "source": (source if not isinstance(source, Record) else source.uuid), + "source": ( + self.source if not isinstance(self.source, Record) else self.source.uuid + ), "children": [child.to_log() for child in self.children], } @@ -167,12 +162,8 @@ def done(self): self.state = "done" log.debug(f"Record {self.uuid} is set as done.") if self.source: - parent: Record = self.source() - if not parent: - log.critical(f"Source of record {self.uuid} is not available") - raise ValueError(f"Source of record {self.uuid} is not available") - parent.signal_done() - log.debug(f"Signaled parent {parent.uuid} of record {self.uuid}.") + self.source.signal_done() + log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.") self.__dispatch_event__("on_done") def signal_done(self): diff --git a/tests/test_Record.py b/tests/test_Record.py index 493dade..ec69db9 100644 --- a/tests/test_Record.py +++ b/tests/test_Record.py @@ -41,7 +41,7 @@ def test_splitting_record(): assert _record_.uuid assert _record_.processed_at assert _record_.payload == {"key": "value"} - assert _record_.source() == record + assert _record_.source == record assert _record_ in record.children @@ -62,7 +62,7 @@ def test_splitting_record_with_children_ids(): assert _record_.uuid == n assert _record_.processed_at assert _record_.payload == {"key": "value", "id": n} - assert _record_.source() is record + assert _record_.source is record assert _record_ in record.children