Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat add total_bytes_processed to report #61

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbt_dry_run/models/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ReportNode(BaseModel):
status: DryRunStatus
error_message: Optional[str]
table: Optional[Table]
total_bytes_processed: Optional[int]
linting_status: LintingStatus
linting_errors: List[ReportLintingError]

Expand Down
7 changes: 6 additions & 1 deletion dbt_dry_run/node_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,18 @@ def validate_node(self, node: Node) -> Optional[DryRunResult]:
node=node,
table=None,
status=DryRunStatus.FAILURE,
total_bytes_processed=0,
exception=NotCompiledException(
f"Node {node.unique_id} was not compiled"
),
)
else:
return DryRunResult(
node, table=None, status=DryRunStatus.SKIPPED, exception=None
node,
table=None,
status=DryRunStatus.SKIPPED,
total_bytes_processed=0,
exception=None,
)
else:
return None
23 changes: 17 additions & 6 deletions dbt_dry_run/node_runner/incremental_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def fail_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunRes
schema_changed = added_fields or removed_fields
table: Optional[Table] = target_table
status = dry_run_result.status
total_bytes_processed = dry_run_result.total_bytes_processed
exception = dry_run_result.exception
if schema_changed:
table = None
Expand All @@ -74,7 +75,11 @@ def fail_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunRes
)
exception = SchemaChangeException(msg)
return DryRunResult(
node=dry_run_result.node, table=table, status=status, exception=exception
node=dry_run_result.node,
table=table,
status=status,
total_bytes_processed=total_bytes_processed,
exception=exception,
)


Expand Down Expand Up @@ -143,11 +148,13 @@ def _verify_merge_type_compatibility(
node, common_field_names, sql_statement
)
sql_statement = self._modify_sql(node, sql_statement_with_merge)
status, model_schema, exception = self._sql_runner.query(sql_statement)
status, model_schema, total_bytes_processed, exception = self._sql_runner.query(
sql_statement
)
if status == DryRunStatus.SUCCESS:
return initial_result
else:
return DryRunResult(node, None, status, exception)
return DryRunResult(node, None, status, total_bytes_processed, exception)

def _modify_sql(self, node: Node, sql_statement: str) -> str:
if node.config.sql_header:
Expand All @@ -172,11 +179,15 @@ def run(self, node: Node) -> DryRunResult:
try:
sql_with_literals = insert_dependant_sql_literals(node, self._results)
except UpstreamFailedException as e:
return DryRunResult(node, None, DryRunStatus.FAILURE, e)
return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)
run_sql = self._modify_sql(node, sql_with_literals)
status, model_schema, exception = self._sql_runner.query(run_sql)
status, model_schema, total_bytes_processed, exception = self._sql_runner.query(
run_sql
)

result = DryRunResult(node, model_schema, status, exception)
result = DryRunResult(
node, model_schema, status, total_bytes_processed, exception
)

full_refresh = self._get_full_refresh_config(node)

Expand Down
13 changes: 10 additions & 3 deletions dbt_dry_run/node_runner/node_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,16 @@ def run(self, node: Node) -> DryRunResult:
try:
run_sql = insert_dependant_sql_literals(node, self._results)
except UpstreamFailedException as e:
return DryRunResult(node, None, DryRunStatus.FAILURE, e)
return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)

status, predicted_table, exception = self._sql_runner.query(run_sql)
(
status,
predicted_table,
total_bytes_processed,
exception,
) = self._sql_runner.query(run_sql)

result = DryRunResult(node, predicted_table, status, exception)
result = DryRunResult(
node, predicted_table, status, total_bytes_processed, exception
)
return result
7 changes: 6 additions & 1 deletion dbt_dry_run/node_runner/seed_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def run(self, node: Node) -> DryRunResult:
node=node,
table=None,
status=DryRunStatus.FAILURE,
total_bytes_processed=0,
exception=exception,
)
new_field = TableField(
Expand All @@ -40,7 +41,11 @@ def run(self, node: Node) -> DryRunResult:

schema = Table(fields=fields)
return DryRunResult(
node=node, table=schema, status=DryRunStatus.SUCCESS, exception=None
node=node,
table=schema,
status=DryRunStatus.SUCCESS,
total_bytes_processed=0,
exception=None,
)

def validate_node(self, node: Node) -> Optional[DryRunResult]:
Expand Down
16 changes: 13 additions & 3 deletions dbt_dry_run/node_runner/snapshot_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult:
node=result.node,
table=result.table,
status=DryRunStatus.FAILURE,
total_bytes_processed=0,
exception=exception,
)
if node.config.strategy == "timestamp":
Expand All @@ -67,6 +68,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult:
node=result.node,
table=result.table,
status=DryRunStatus.FAILURE,
total_bytes_processed=0,
exception=exception,
)
elif node.config.strategy == "check":
Expand All @@ -78,6 +80,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult:
node=result.node,
table=result.table,
status=DryRunStatus.FAILURE,
total_bytes_processed=0,
exception=exception,
)
else:
Expand All @@ -88,10 +91,17 @@ def run(self, node: Node) -> DryRunResult:
try:
run_sql = insert_dependant_sql_literals(node, self._results)
except UpstreamFailedException as e:
return DryRunResult(node, None, DryRunStatus.FAILURE, e)
return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)

status, predicted_table, exception = self._sql_runner.query(run_sql)
result = DryRunResult(node, predicted_table, status, exception)
(
status,
predicted_table,
total_bytes_processed,
exception,
) = self._sql_runner.query(run_sql)
result = DryRunResult(
node, predicted_table, status, total_bytes_processed, exception
)
if result.status == DryRunStatus.SUCCESS and result.table:
result.table.fields = [*result.table.fields, *DBT_SNAPSHOT_FIELDS]
result = self._validate_snapshot_config(node, result)
Expand Down
5 changes: 4 additions & 1 deletion dbt_dry_run/node_runner/source_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class SourceRunner(NodeRunner):
def run(self, node: Node) -> DryRunResult:
exception: Optional[Exception] = None
predicted_table: Optional[Table] = None
total_bytes_processed: Optional[int] = 0
status = DryRunStatus.SUCCESS
if node.is_external_source():
external_config = cast(ExternalConfig, node.external)
Expand All @@ -37,7 +38,9 @@ def run(self, node: Node) -> DryRunResult:
f"Could not find source in target environment for node '{node.unique_id}'"
)

return DryRunResult(node, predicted_table, status, exception)
return DryRunResult(
node, predicted_table, status, total_bytes_processed, exception
)

def validate_node(self, node: Node) -> Optional[DryRunResult]:
return None
10 changes: 7 additions & 3 deletions dbt_dry_run/node_runner/table_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ def run(self, node: Node) -> DryRunResult:
try:
run_sql = insert_dependant_sql_literals(node, self._results)
except UpstreamFailedException as e:
return DryRunResult(node, None, DryRunStatus.FAILURE, e)
return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)

run_sql = self._modify_sql(node, run_sql)
status, model_schema, exception = self._sql_runner.query(run_sql)
status, model_schema, total_bytes_processed, exception = self._sql_runner.query(
run_sql
)

result = DryRunResult(node, model_schema, status, exception)
result = DryRunResult(
node, model_schema, status, total_bytes_processed, exception
)
return result
10 changes: 7 additions & 3 deletions dbt_dry_run/node_runner/view_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ def run(self, node: Node) -> DryRunResult:
try:
run_sql = insert_dependant_sql_literals(node, self._results)
except UpstreamFailedException as e:
return DryRunResult(node, None, DryRunStatus.FAILURE, e)
return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)

run_sql = self._modify_sql(node, run_sql)
status, model_schema, exception = self._sql_runner.query(run_sql)
status, model_schema, total_bytes_processed, exception = self._sql_runner.query(
run_sql
)

result = DryRunResult(node, model_schema, status, exception)
result = DryRunResult(
node, model_schema, status, total_bytes_processed, exception
)
return result
1 change: 1 addition & 0 deletions dbt_dry_run/result_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def get_report(self) -> Report:
status=result.status,
error_message=exception_type,
table=result.table,
total_bytes_processed=result.total_bytes_processed,
linting_status=result.linting_status,
linting_errors=_map_column_errors(result.linting_errors),
)
Expand Down
8 changes: 7 additions & 1 deletion dbt_dry_run/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ class DryRunResult:
node: Node
table: Optional[Table]
status: DryRunStatus
total_bytes_processed: Optional[int]
exception: Optional[Exception]
linting_status: LintingStatus = LintingStatus.SKIPPED
linting_errors: List[LintingError] = field(default_factory=lambda: [])

def replace_table(self, table: Table) -> "DryRunResult":
return DryRunResult(
node=self.node, table=table, status=self.status, exception=self.exception
node=self.node,
table=table,
status=self.status,
total_bytes_processed=self.total_bytes_processed,
exception=self.exception,
)

def with_linting_errors(self, linting_errors: List[LintingError]) -> "DryRunResult":
Expand All @@ -49,6 +54,7 @@ def with_linting_errors(self, linting_errors: List[LintingError]) -> "DryRunResu
node=self.node,
table=self.table,
status=self.status,
total_bytes_processed=self.total_bytes_processed,
exception=self.exception,
linting_errors=linting_errors,
linting_status=linting_status,
Expand Down
2 changes: 1 addition & 1 deletion dbt_dry_run/sql_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def get_node_schema(self, node: Node) -> Optional[Table]:
@abstractmethod
def query(
self, sql: str
) -> Tuple[DryRunStatus, Optional[Table], Optional[Exception]]:
) -> Tuple[DryRunStatus, Optional[Table], Optional[int], Optional[Exception]]:
...

def convert_agate_type(
Expand Down
6 changes: 4 additions & 2 deletions dbt_dry_run/sql_runner/big_query_sql_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,22 @@ def get_client(self) -> Client:
)
def query(
self, sql: str
) -> Tuple[DryRunStatus, Optional[Table], Optional[Exception]]:
) -> Tuple[DryRunStatus, Optional[Table], Optional[int], Optional[Exception]]:
exception = None
table = None
total_bytes_processed = None
client = self.get_client()
try:
query_job = client.query(sql, job_config=self.JOB_CONFIG)
table = self.get_schema_from_schema_fields(query_job.schema or [])
total_bytes_processed = query_job.total_bytes_processed
status = DryRunStatus.SUCCESS
except (Forbidden, BadRequest, NotFound) as e:
status = DryRunStatus.FAILURE
if QUERY_TIMED_OUT in str(e):
raise
exception = e
return status, table, exception
return status, table, total_bytes_processed, exception

@staticmethod
def get_schema_from_schema_fields(schema_fields: List[SchemaField]) -> Table:
Expand Down
30 changes: 25 additions & 5 deletions dbt_dry_run/test/node_runner/test_incremental_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
]
)

A_TOTAL_BYTES_PROCESSED = 1000

A_NODE = SimpleNode(
unique_id="node1", depends_on=[], resource_type=ManifestScheduler.MODEL
).to_node()
Expand Down Expand Up @@ -58,7 +60,7 @@ def get_mock_sql_runner_with(
model_schema: Table, target_schema: Optional[Table]
) -> MagicMock:
mock_sql_runner = MagicMock()
mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, model_schema, None)
mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, model_schema, 0, None)
mock_sql_runner.get_node_schema.return_value = target_schema
return mock_sql_runner

Expand All @@ -68,7 +70,12 @@ def test_partitioned_incremental_model_declares_dbt_max_partition_variable() ->
"declare _dbt_max_partition timestamp default CURRENT_TIMESTAMP();"
)
mock_sql_runner = MagicMock()
mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
mock_sql_runner.query.return_value = (
DryRunStatus.SUCCESS,
A_SIMPLE_TABLE,
A_TOTAL_BYTES_PROCESSED,
None,
)

node = SimpleNode(
unique_id="node1",
Expand Down Expand Up @@ -463,7 +470,12 @@ def test_node_with_false_full_refresh_does_not_full_refresh_when_flag_is_true(

def test_model_with_sql_header_executes_header_first() -> None:
mock_sql_runner = MagicMock()
mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
mock_sql_runner.query.return_value = (
DryRunStatus.SUCCESS,
A_SIMPLE_TABLE,
A_TOTAL_BYTES_PROCESSED,
None,
)

pre_header_value = "DECLARE x INT64;"

Expand Down Expand Up @@ -492,7 +504,11 @@ def test_append_handler_preserves_existing_column_order() -> None:
]
)
dry_run_result = DryRunResult(
node=A_NODE, status=DryRunStatus.SUCCESS, table=model_table, exception=None
node=A_NODE,
status=DryRunStatus.SUCCESS,
table=model_table,
total_bytes_processed=0,
exception=None,
)
target_table = Table(
fields=[
Expand Down Expand Up @@ -522,7 +538,11 @@ def test_sync_handler_preserves_existing_column_order() -> None:
]
)
dry_run_result = DryRunResult(
node=A_NODE, status=DryRunStatus.SUCCESS, table=model_table, exception=None
node=A_NODE,
status=DryRunStatus.SUCCESS,
table=model_table,
total_bytes_processed=0,
exception=None,
)
target_table = Table(
fields=[
Expand Down
Loading
Loading