From d5504f3716eb3e1ee7fc0332992bdede598d07ae Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Mon, 2 Dec 2024 17:33:34 +0100
Subject: [PATCH] fix: replace weakrefs with strong references so that Records
survive long enough to be persisted.
---
dabapush/Reader/NDJSONReader.py | 4 +---
dabapush/Record.py | 25 ++++++++-----------------
tests/test_Record.py | 4 ++--
3 files changed, 11 insertions(+), 22 deletions(-)
diff --git a/dabapush/Reader/NDJSONReader.py b/dabapush/Reader/NDJSONReader.py
index b71a551..32d42ae 100644
--- a/dabapush/Reader/NDJSONReader.py
+++ b/dabapush/Reader/NDJSONReader.py
@@ -1,7 +1,5 @@
"""NDJSON Writer plug-in for dabapush"""
-import weakref
-
# pylint: disable=R,I1101
from typing import Iterator, List
@@ -27,7 +25,7 @@ def read_and_split(
if not flatten_records
else flatten(ujson.loads(line))
),
- source=weakref.ref(record),
+ source=record,
)
for line_number, line in enumerate(file)
]
diff --git a/dabapush/Record.py b/dabapush/Record.py
index 04a1801..0e48a7b 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -3,7 +3,6 @@
"""
import dataclasses
-import weakref
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union
from uuid import uuid4
@@ -56,7 +55,7 @@ class Record:
"""
payload: Optional[Any] = None
- source: Optional[weakref.ReferenceType] = None
+ source: Optional[Self] = None
uuid: Optional[str] = uuid4().hex
processed_at: datetime = datetime.now()
children: List[Self] = dataclasses.field(default_factory=list)
@@ -118,7 +117,7 @@ def _handle_key_split_(self, id_key, key):
**{
"payload": value,
"uuid": value.get(id_key) if id_key else uuid4().hex,
- "source": weakref.ref(self),
+ "source": self,
}
)
for value in self.payload[key]
@@ -129,19 +128,15 @@ def _handle_key_split_(self, id_key, key):
def to_log(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]:
"""Return a loggable representation of the record."""
log.debug(f"Logging record {self.uuid}.")
- if self.source:
- source = self.source()
- if not source:
- log.critical(f"Source of record {self.uuid} is not available")
- raise ValueError(f"Source of record {self.uuid} is not available")
- else:
- source = None
+
return {
"uuid": str(self.uuid),
"processed_at": self.processed_at.isoformat(),
# We cannot allow the source to be a Record, as it would create a circular reference
# while serializing the dataclass to JSON.
- "source": (source if not isinstance(source, Record) else source.uuid),
+ "source": (
+ self.source if not isinstance(self.source, Record) else self.source.uuid
+ ),
"children": [child.to_log() for child in self.children],
}
@@ -167,12 +162,8 @@ def done(self):
self.state = "done"
log.debug(f"Record {self.uuid} is set as done.")
if self.source:
- parent: Record = self.source()
- if not parent:
- log.critical(f"Source of record {self.uuid} is not available")
- raise ValueError(f"Source of record {self.uuid} is not available")
- parent.signal_done()
- log.debug(f"Signaled parent {parent.uuid} of record {self.uuid}.")
+ self.source.signal_done()
+ log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
self.__dispatch_event__("on_done")
def signal_done(self):
diff --git a/tests/test_Record.py b/tests/test_Record.py
index 493dade..ec69db9 100644
--- a/tests/test_Record.py
+++ b/tests/test_Record.py
@@ -41,7 +41,7 @@ def test_splitting_record():
assert _record_.uuid
assert _record_.processed_at
assert _record_.payload == {"key": "value"}
- assert _record_.source() == record
+ assert _record_.source == record
assert _record_ in record.children
@@ -62,7 +62,7 @@ def test_splitting_record_with_children_ids():
assert _record_.uuid == n
assert _record_.processed_at
assert _record_.payload == {"key": "value", "id": n}
- assert _record_.source() is record
+ assert _record_.source is record
assert _record_ in record.children