Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Exists function in DeltaTableStep should not log error #70

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/koheesio/spark/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,25 @@ def has_change_type(self) -> bool:

@property
def exists(self) -> bool:
"""Check if table exists"""
"""Check if table exists.
Depending on the value of the boolean flag `create_if_not_exists` a different logging level is provided."""
result = False

try:
self.spark.table(self.table_name)
result = True
except AnalysisException as e:
err_msg = str(e).lower()
common_message = (
f"Table `{self.table}` doesn't exist. "
f"The `create_if_not_exists` flag is set to {self.create_if_not_exists}."
)

if err_msg.startswith("[table_or_view_not_found]") or err_msg.startswith("table or view not found"):
self.log.error(f"Table `{self.table}` doesn't exist.")
if self.create_if_not_exists:
self.log.info(" ".join((common_message, "Therefore the table will be created.")))
else:
self.log.error(" ".join((common_message, "Therefore the table will not be created.")))
else:
raise e

Expand Down
16 changes: 14 additions & 2 deletions tests/spark/test_delta.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import logging
from pathlib import Path
from unittest.mock import patch

Expand All @@ -14,7 +15,7 @@

pytestmark = pytest.mark.spark

log = LoggingFactory.get_logger(name="test_delta")
log = LoggingFactory.get_logger(name="test_delta", inherit_from_koheesio=True)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -138,8 +139,19 @@ def test_delta_table_properties_dbx():

@pytest.mark.parametrize("value,expected", [("too.many.dots.given.to.be.a.real.table", pytest.raises(ValidationError))])
def test_table_failed(value, expected):
with pytest.raises(ValidationError):
with expected:
DeltaTableStep(table=value)

dt = DeltaTableStep(table="unknown_table")
assert dt.exists is False


@pytest.mark.parametrize(
["table", "create_if_not_exists", "log_level"], [("unknown", False, "DEBUG"), ("unknown", True, "INFO")]
)
def test_exists(caplog, table, create_if_not_exists, log_level):
with caplog.at_level(log_level):
dt = DeltaTableStep(table=table, create_if_not_exists=create_if_not_exists)
dt.log.setLevel(log_level)
assert dt.exists is False
assert f"The `create_if_not_exists` flag is set to {create_if_not_exists}." in caplog.text