Skip to content

Commit fcafdde

Browse files
authored Mar 23, 2025
Merge pull request #52 from IFRCGo/feature/partial-success-summary
Add support for partial transform success
2 parents dffe06c + 3d93902 commit fcafdde

File tree

3 files changed

+47
-18
lines changed

3 files changed

+47
-18
lines changed
 

‎pystac_monty/sources/common.py

+36
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,36 @@
99
from pystac_monty.geocoding import MontyGeoCoder
1010

1111

12+
class TransformSummaryInProgressException(Exception):
    """Raised when a summary value is read before the transform is complete."""


@dataclass
class TransformSummary:
    """Row counters for one transform run.

    The counters are updated while rows are processed. ``success_rows`` is
    only readable once ``mark_as_complete()`` has been called, so a caller
    cannot mistake a partial count for a final one.
    """

    # Number of input rows seen so far in the current run.
    total_rows: int = 0
    # Number of input rows that could not be transformed.
    failed_rows: int = 0
    # True only between mark_as_complete() and the next mark_as_started().
    is_completed: bool = False

    def increment_rows(self, increment: int = 1) -> None:
        """Add ``increment`` to the number of rows seen."""
        self.total_rows += increment

    def increment_failed_rows(self, increment: int = 1) -> None:
        """Add ``increment`` to the number of rows that failed."""
        self.failed_rows += increment

    def mark_as_complete(self) -> None:
        """Flag the run as finished, making ``success_rows`` readable."""
        self.is_completed = True

    def mark_as_started(self) -> None:
        """Begin a new run: clear the completion flag and reset both counters."""
        self.is_completed = False
        self.total_rows = 0
        self.failed_rows = 0

    @property
    def success_rows(self) -> int:
        """Return ``total_rows - failed_rows`` for a completed run.

        Raises:
            TransformSummaryInProgressException: if the run has not been
                marked as complete yet.
        """
        if not self.is_completed:
            # Carry a message so a premature read is diagnosable from the
            # traceback alone instead of surfacing as an empty exception.
            raise TransformSummaryInProgressException(
                "success_rows is only available after mark_as_complete() has been called"
            )
        return self.total_rows - self.failed_rows
40+
41+
1242
@dataclass
1343
class MontyDataSource:
1444
source_url: str
@@ -60,6 +90,8 @@ def __init__(self, data_source: DataSource, geocoder: MontyGeoCoder):
6090

6191
self.geocoder = geocoder
6292

93+
self.transform_summary = TransformSummary()
94+
6395
def get_event_collection(self) -> Collection:
6496
"""Get event collection"""
6597
if self._event_collection_cache is None:
@@ -108,5 +140,9 @@ def get_impact_collection(self) -> Collection:
108140
self._impact_collection_cache = collection
109141
return self._impact_collection_cache
110142

143+
# FIXME: This method is deprecated
111144
@abc.abstractmethod
112145
def make_items(self) -> list[Item]: ...
146+
147+
@abc.abstractmethod
148+
def get_stac_items(self) -> typing.Generator[Item, None, None]: ...

‎pystac_monty/sources/desinventar.py

+5-9
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,6 @@ def _generate_geo_data_mapping(self, root: etree._Element) -> Dict[str, GeoDataE
556556
"shapefile_data": shapefile_data
557557
}
558558

559-
print(geo_data)
560559
return geo_data
561560

562561
@staticmethod
@@ -583,11 +582,9 @@ def get_stac_items(self) -> typing.Generator[Item, None, None]:
583582

584583
events = root.xpath("//fichas/TR")
585584

586-
failed_items_count = 0
587-
total_items_count = 0
588-
585+
self.transform_summary.mark_as_started()
589586
for event_row in events:
590-
total_items_count += 1
587+
self.transform_summary.increment_rows()
591588
try:
592589
if row_data := parse_row_data(
593590
event_row,
@@ -599,12 +596,11 @@ def get_stac_items(self) -> typing.Generator[Item, None, None]:
599596
yield event_item
600597
yield from self._create_impact_items_from_row(row_data, event_item)
601598
else:
602-
failed_items_count += 1
599+
self.transform_summary.increment_failed_rows()
603600
except Exception:
604-
failed_items_count += 1
601+
self.transform_summary.increment_failed_rows()
605602
logger.error('Failed to process desinventar', exc_info=True)
606-
607-
print(failed_items_count)
603+
self.transform_summary.mark_as_complete()
608604

609605
# FIXME: This is deprecated
610606
def make_items(self):

‎pystac_monty/sources/ibtracs.py

+6-9
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,13 @@ class IBTrACSTransformer(MontyDataTransformer[IBTrACSDataSource]):
5959
source_name = 'ibtracs'
6060

6161
def get_stac_items(self) -> typing.Generator[Item, None, None]:
62-
# # TODO: Use sax xml parser for memory efficient usage
63-
failed_items_count = 0
64-
total_items_count = 0
65-
62+
self.transform_summary.mark_as_started()
63+
# TODO: Use sax xml parser for memory efficient usage
6664
csv_data = self.data_source._parse_csv()
6765
csv_data.sort(key=lambda x: x.get("SID", " "))
6866
for storm_id, storm_data_iterator in itertools.groupby(csv_data, key=lambda x: x.get("SID", " ")):
6967
storm_data = list(storm_data_iterator)
70-
total_items_count += len(storm_data)
68+
self.transform_summary.increment_rows(len(storm_data))
7169

7270
try:
7371
def parse_row_data(rows: list[dict]):
@@ -82,12 +80,11 @@ def parse_row_data(rows: list[dict]):
8280
yield event_item
8381
yield from self.make_hazard_items(event_item, storm_data)
8482
else:
85-
failed_items_count += len(storm_data)
83+
self.transform_summary.increment_failed_rows(len(storm_data))
8684
except Exception:
87-
failed_items_count += len(storm_data)
85+
self.transform_summary.increment_failed_rows(len(storm_data))
8886
logger.error("Failed to process ibtracs", exc_info=True)
89-
90-
print(failed_items_count)
87+
self.transform_summary.mark_as_complete()
9188

9289
# FIXME: This is deprecated
9390
def make_items(self):

0 commit comments

Comments
 (0)
Please sign in to comment.