Skip to content

Commit

Permalink
Refactor data source utilities and exporters for improved error handling and optional parameter management (mypy fixes)
Browse files Browse the repository at this point in the history

- Updated DataSourceUtil to check for statement type when accessing memstore.
- Modified CSVExporter to handle empty fieldnames and added a check for buffer file existence before writing.
- Enhanced UnifiedBufferedExporter with better buffer file initialization and metadata update logic, including retry mechanisms.
- Improved XMLExporter to ensure buffer file checks are in place before writing data.

These changes enhance the robustness and maintainability of the data export process.
  • Loading branch information
ake2l committed Dec 23, 2024
1 parent 32958a5 commit 95b6d78
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 28 deletions.
2 changes: 1 addition & 1 deletion datamimic_ce/data_sources/data_source_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def set_data_source_length(
elif source_str.endswith(".xml"):
ds_len = len(list(ET.parse(ctx.descriptor_dir / source_str).getroot()))
# 2.4: Check if datasource is memstore
elif ctx.memstore_manager.contain(source_str):
elif ctx.memstore_manager.contain(source_str) and hasattr(stmt, "type"):
ds_len = ctx.memstore_manager.get_memstore(source_str).get_data_len_by_type(stmt.type or stmt.name)
elif ctx.get_client_by_id(source_str) is not None:
client = ctx.get_client_by_id(source_str)
Expand Down
4 changes: 3 additions & 1 deletion datamimic_ce/exporters/csv_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
encoding: str | None,
):
# Remove singleton pattern and initialize instance variables
self.fieldnames = fieldnames
self.fieldnames = fieldnames or []
self._task_id = setup_context.task_id

# Retrieve encoding and delimiter from setup_context or use defaults
Expand All @@ -58,6 +58,8 @@ def _write_data_to_buffer(self, data: list[dict]) -> None:
"""Writes data to the current buffer file in CSV format."""
try:
buffer_file = self._get_buffer_file()
if buffer_file is None:
return
write_header = not buffer_file.exists()
with buffer_file.open("a", newline="", encoding=self._encoding) as csvfile:
if not self.fieldnames and data:
Expand Down
40 changes: 22 additions & 18 deletions datamimic_ce/exporters/unified_buffered_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ def _get_buffer_file(self) -> Path:
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed to initialize buffer file: {e}")
if attempt == self.MAX_RETRIES - 1:
# TODO: mypy issue [return], mypy don't understand this logic
raise BufferFileError(
f"Failed to initialize buffer file after {self.MAX_RETRIES} attempts: {e}"
) from e
time.sleep(self.RETRY_DELAY * (attempt + 1))
return buffer_file

def _rotate_chunk(self) -> None:
"""Finalizes current chunk and creates new one with proper error handling."""
Expand Down Expand Up @@ -259,24 +259,28 @@ def _write_batch_with_retry(self, batch: list[dict]) -> None:

def _update_metadata_file(self) -> None:
    """Update the chunk metadata file with the current export counters.

    Rewrites the ``.meta`` JSON sidecar of the current buffer file so that
    ``total_count`` and ``chunk_index`` reflect the exporter's state. The
    write is retried up to ``MAX_RETRIES`` times with a linearly increasing
    back-off (``RETRY_DELAY * attempt``).

    Returns:
        None. If no buffer file is available the update is skipped silently
        (nothing to record yet).

    Raises:
        BufferFileError: if every retry attempt fails; the last underlying
            exception is chained as the cause.
    """
    buffer_file: Path | None = self._get_buffer_file()
    # Guard clause: without a buffer file there is no metadata to update.
    if buffer_file is None:
        return

    metadata_file = buffer_file.with_suffix(".meta")

    for attempt in range(self.MAX_RETRIES):
        try:
            # Open read/write so we can load, patch, and rewrite in place.
            with metadata_file.open("r+", encoding=self._encoding) as f:
                metadata = json.load(f)
                metadata["total_count"] = self.global_counter
                metadata["chunk_index"] = self.chunk_index
                f.seek(0)  # Move to the start of the file to overwrite
                json.dump(metadata, f)
                f.truncate()  # Remove any leftover data from previous writes
            logger.debug(f"Updated metadata file {metadata_file} with total_count: {self.global_counter}")
            return
        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed to update metadata: {e}")
            if attempt == self.MAX_RETRIES - 1:
                raise BufferFileError(
                    f"Failed to update metadata after {self.MAX_RETRIES} attempts: {e}"
                ) from e
            time.sleep(self.RETRY_DELAY * (attempt + 1))

@abstractmethod
def _write_data_to_buffer(self, data: list[dict]) -> None:
Expand Down
20 changes: 12 additions & 8 deletions datamimic_ce/exporters/xml_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,20 @@ def _write_data_to_buffer(self, data: list[dict[str, Any]]) -> None:
items_xml += item_xml + "\n" # Add newline for readability

buffer_file = self._get_buffer_file()
# If buffer does not exist or is empty, start with the root element
if not buffer_file.exists() or buffer_file.stat().st_size == 0:
with buffer_file.open("w", encoding=self.encoding) as xmlfile:
xmlfile.write(f"<{self.root_element}>\n")

if buffer_file is None:
return
else:
# If buffer does not exist or is empty, start with the root element
if not buffer_file.exists() or buffer_file.stat().st_size == 0:
with buffer_file.open("w", encoding=self.encoding) as xmlfile:
xmlfile.write(f"<{self.root_element}>\n")
logger.debug(f"Created root element in buffer file: {buffer_file}")

# Append items to the root element
with buffer_file.open("a", encoding=self.encoding) as xmlfile:
xmlfile.write(items_xml)
logger.debug(f"Wrote {len(data)} records to buffer file: {buffer_file}")
# Append items to the root element
with buffer_file.open("a", encoding=self.encoding) as xmlfile:
xmlfile.write(items_xml)
logger.debug(f"Wrote {len(data)} records to buffer file: {buffer_file}")

except Exception as e:
logger.error(f"Error writing data to buffer: {e}")
Expand Down

0 comments on commit 95b6d78

Please sign in to comment.