Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: no weakrefs so that Records survive long enough to be persisted. #65

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions dabapush/Reader/NDJSONReader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""NDJSON Writer plug-in for dabapush"""

import weakref

# pylint: disable=R,I1101
from typing import Iterator, List

Expand All @@ -27,7 +25,7 @@ def read_and_split(
if not flatten_records
else flatten(ujson.loads(line))
),
source=weakref.ref(record),
source=record,
)
for line_number, line in enumerate(file)
]
Expand Down
25 changes: 8 additions & 17 deletions dabapush/Record.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

import dataclasses
import weakref
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union
from uuid import uuid4
Expand Down Expand Up @@ -56,7 +55,7 @@ class Record:
"""

payload: Optional[Any] = None
source: Optional[weakref.ReferenceType] = None
source: Optional[Self] = None
uuid: Optional[str] = uuid4().hex
processed_at: datetime = datetime.now()
children: List[Self] = dataclasses.field(default_factory=list)
Expand Down Expand Up @@ -118,7 +117,7 @@ def _handle_key_split_(self, id_key, key):
**{
"payload": value,
"uuid": value.get(id_key) if id_key else uuid4().hex,
"source": weakref.ref(self),
"source": self,
}
)
for value in self.payload[key]
Expand All @@ -129,19 +128,15 @@ def _handle_key_split_(self, id_key, key):
def to_log(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]:
"""Return a loggable representation of the record."""
log.debug(f"Logging record {self.uuid}.")
if self.source:
source = self.source()
if not source:
log.critical(f"Source of record {self.uuid} is not available")
raise ValueError(f"Source of record {self.uuid} is not available")
else:
source = None

return {
"uuid": str(self.uuid),
"processed_at": self.processed_at.isoformat(),
# We cannot allow the source to be a Record, as it would create a circular reference
# while serializing the dataclass to JSON.
"source": (source if not isinstance(source, Record) else source.uuid),
"source": (
self.source if not isinstance(self.source, Record) else self.source.uuid
),
"children": [child.to_log() for child in self.children],
}

Expand All @@ -167,12 +162,8 @@ def done(self):
self.state = "done"
log.debug(f"Record {self.uuid} is set as done.")
if self.source:
parent: Record = self.source()
if not parent:
log.critical(f"Source of record {self.uuid} is not available")
raise ValueError(f"Source of record {self.uuid} is not available")
parent.signal_done()
log.debug(f"Signaled parent {parent.uuid} of record {self.uuid}.")
self.source.signal_done()
log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
self.__dispatch_event__("on_done")

def signal_done(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_Record.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_splitting_record():
assert _record_.uuid
assert _record_.processed_at
assert _record_.payload == {"key": "value"}
assert _record_.source() == record
assert _record_.source == record
assert _record_ in record.children


Expand All @@ -62,7 +62,7 @@ def test_splitting_record_with_children_ids():
assert _record_.uuid == n
assert _record_.processed_at
assert _record_.payload == {"key": "value", "id": n}
assert _record_.source() is record
assert _record_.source is record
assert _record_ in record.children


Expand Down
Loading