diff --git a/dbt_dry_run/models/report.py b/dbt_dry_run/models/report.py index c991c66..d53c653 100644 --- a/dbt_dry_run/models/report.py +++ b/dbt_dry_run/models/report.py @@ -18,6 +18,7 @@ class ReportNode(BaseModel): status: DryRunStatus error_message: Optional[str] table: Optional[Table] + total_bytes_processed: Optional[int] linting_status: LintingStatus linting_errors: List[ReportLintingError] diff --git a/dbt_dry_run/node_runner/__init__.py b/dbt_dry_run/node_runner/__init__.py index 86d0614..4e828f6 100644 --- a/dbt_dry_run/node_runner/__init__.py +++ b/dbt_dry_run/node_runner/__init__.py @@ -30,13 +30,18 @@ def validate_node(self, node: Node) -> Optional[DryRunResult]: node=node, table=None, status=DryRunStatus.FAILURE, + total_bytes_processed=0, exception=NotCompiledException( f"Node {node.unique_id} was not compiled" ), ) else: return DryRunResult( - node, table=None, status=DryRunStatus.SKIPPED, exception=None + node, + table=None, + status=DryRunStatus.SKIPPED, + total_bytes_processed=0, + exception=None, ) else: return None diff --git a/dbt_dry_run/node_runner/incremental_runner.py b/dbt_dry_run/node_runner/incremental_runner.py index cbe4494..da6c6cb 100644 --- a/dbt_dry_run/node_runner/incremental_runner.py +++ b/dbt_dry_run/node_runner/incremental_runner.py @@ -63,6 +63,7 @@ def fail_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunRes schema_changed = added_fields or removed_fields table: Optional[Table] = target_table status = dry_run_result.status + total_bytes_processed = dry_run_result.total_bytes_processed exception = dry_run_result.exception if schema_changed: table = None @@ -74,7 +75,11 @@ def fail_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunRes ) exception = SchemaChangeException(msg) return DryRunResult( - node=dry_run_result.node, table=table, status=status, exception=exception + node=dry_run_result.node, + table=table, + status=status, + total_bytes_processed=total_bytes_processed, + exception=exception, ) @@ -143,11 +148,13 @@ def _verify_merge_type_compatibility( node, common_field_names, sql_statement ) sql_statement = self._modify_sql(node, sql_statement_with_merge) - status, model_schema, exception = self._sql_runner.query(sql_statement) + status, model_schema, total_bytes_processed, exception = self._sql_runner.query( + sql_statement + ) if status == DryRunStatus.SUCCESS: return initial_result else: - return DryRunResult(node, None, status, exception) + return DryRunResult(node, None, status, total_bytes_processed, exception) def _modify_sql(self, node: Node, sql_statement: str) -> str: if node.config.sql_header: @@ -172,11 +179,15 @@ def run(self, node: Node) -> DryRunResult: try: sql_with_literals = insert_dependant_sql_literals(node, self._results) except UpstreamFailedException as e: - return DryRunResult(node, None, DryRunStatus.FAILURE, e) + return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e) run_sql = self._modify_sql(node, sql_with_literals) - status, model_schema, exception = self._sql_runner.query(run_sql) + status, model_schema, total_bytes_processed, exception = self._sql_runner.query( + run_sql + ) - result = DryRunResult(node, model_schema, status, exception) + result = DryRunResult( + node, model_schema, status, total_bytes_processed, exception + ) full_refresh = self._get_full_refresh_config(node) diff --git a/dbt_dry_run/node_runner/node_test_runner.py b/dbt_dry_run/node_runner/node_test_runner.py index 28b68a9..95a0c06 100644 --- a/dbt_dry_run/node_runner/node_test_runner.py +++ b/dbt_dry_run/node_runner/node_test_runner.py @@ -10,9 +10,16 @@ def run(self, node: Node) -> DryRunResult: try: run_sql = insert_dependant_sql_literals(node, self._results) except UpstreamFailedException as e: - return DryRunResult(node, None, DryRunStatus.FAILURE, e) + return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e) - status, predicted_table, exception = self._sql_runner.query(run_sql) + ( + status, + predicted_table, + total_bytes_processed, + exception, + ) = self._sql_runner.query(run_sql) - result = DryRunResult(node, predicted_table, status, exception) + result = DryRunResult( + node, predicted_table, status, total_bytes_processed, exception + ) return result diff --git a/dbt_dry_run/node_runner/seed_runner.py b/dbt_dry_run/node_runner/seed_runner.py index d1fd2ec..ecabe28 100644 --- a/dbt_dry_run/node_runner/seed_runner.py +++ b/dbt_dry_run/node_runner/seed_runner.py @@ -31,6 +31,7 @@ def run(self, node: Node) -> DryRunResult: node=node, table=None, status=DryRunStatus.FAILURE, + total_bytes_processed=0, exception=exception, ) new_field = TableField( @@ -40,7 +41,11 @@ def run(self, node: Node) -> DryRunResult: schema = Table(fields=fields) return DryRunResult( - node=node, table=schema, status=DryRunStatus.SUCCESS, exception=None + node=node, + table=schema, + status=DryRunStatus.SUCCESS, + total_bytes_processed=0, + exception=None, ) def validate_node(self, node: Node) -> Optional[DryRunResult]: diff --git a/dbt_dry_run/node_runner/snapshot_runner.py b/dbt_dry_run/node_runner/snapshot_runner.py index 94b516f..33b2d90 100644 --- a/dbt_dry_run/node_runner/snapshot_runner.py +++ b/dbt_dry_run/node_runner/snapshot_runner.py @@ -56,6 +56,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult: node=result.node, table=result.table, status=DryRunStatus.FAILURE, + total_bytes_processed=0, exception=exception, ) if node.config.strategy == "timestamp": @@ -67,6 +68,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult: node=result.node, table=result.table, status=DryRunStatus.FAILURE, + total_bytes_processed=0, exception=exception, ) elif node.config.strategy == "check": @@ -78,6 +80,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult: node=result.node, table=result.table, status=DryRunStatus.FAILURE, + total_bytes_processed=0, exception=exception, ) else: @@ -88,10 +91,17 @@ def run(self, node: Node) -> DryRunResult: try: run_sql = insert_dependant_sql_literals(node, self._results) except UpstreamFailedException as e: - return DryRunResult(node, None, DryRunStatus.FAILURE, e) + return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e) - status, predicted_table, exception = self._sql_runner.query(run_sql) - result = DryRunResult(node, predicted_table, status, exception) + ( + status, + predicted_table, + total_bytes_processed, + exception, + ) = self._sql_runner.query(run_sql) + result = DryRunResult( + node, predicted_table, status, total_bytes_processed, exception + ) if result.status == DryRunStatus.SUCCESS and result.table: result.table.fields = [*result.table.fields, *DBT_SNAPSHOT_FIELDS] result = self._validate_snapshot_config(node, result) diff --git a/dbt_dry_run/node_runner/source_runner.py b/dbt_dry_run/node_runner/source_runner.py index 0b0fe3f..c718bd6 100644 --- a/dbt_dry_run/node_runner/source_runner.py +++ b/dbt_dry_run/node_runner/source_runner.py @@ -16,6 +16,7 @@ class SourceRunner(NodeRunner): def run(self, node: Node) -> DryRunResult: exception: Optional[Exception] = None predicted_table: Optional[Table] = None + total_bytes_processed: Optional[int] = 0 status = DryRunStatus.SUCCESS if node.is_external_source(): external_config = cast(ExternalConfig, node.external) @@ -37,7 +38,9 @@ def run(self, node: Node) -> DryRunResult: f"Could not find source in target environment for node '{node.unique_id}'" ) - return DryRunResult(node, predicted_table, status, exception) + return DryRunResult( + node, predicted_table, status, total_bytes_processed, exception + ) def validate_node(self, node: Node) -> Optional[DryRunResult]: return None diff --git a/dbt_dry_run/node_runner/table_runner.py b/dbt_dry_run/node_runner/table_runner.py index a4a5a48..2be11ab 100644 --- a/dbt_dry_run/node_runner/table_runner.py +++ b/dbt_dry_run/node_runner/table_runner.py @@ -15,10 +15,14 @@ def run(self, node: Node) -> DryRunResult: try: run_sql = insert_dependant_sql_literals(node, self._results) except UpstreamFailedException as e: - return DryRunResult(node, None, DryRunStatus.FAILURE, e) + return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e) run_sql = self._modify_sql(node, run_sql) - status, model_schema, exception = self._sql_runner.query(run_sql) + status, model_schema, total_bytes_processed, exception = self._sql_runner.query( + run_sql + ) - result = DryRunResult(node, model_schema, status, exception) + result = DryRunResult( + node, model_schema, status, total_bytes_processed, exception + ) return result diff --git a/dbt_dry_run/node_runner/view_runner.py b/dbt_dry_run/node_runner/view_runner.py index adc2330..2916fff 100644 --- a/dbt_dry_run/node_runner/view_runner.py +++ b/dbt_dry_run/node_runner/view_runner.py @@ -16,10 +16,14 @@ def run(self, node: Node) -> DryRunResult: try: run_sql = insert_dependant_sql_literals(node, self._results) except UpstreamFailedException as e: - return DryRunResult(node, None, DryRunStatus.FAILURE, e) + return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e) run_sql = self._modify_sql(node, run_sql) - status, model_schema, exception = self._sql_runner.query(run_sql) + status, model_schema, total_bytes_processed, exception = self._sql_runner.query( + run_sql + ) - result = DryRunResult(node, model_schema, status, exception) + result = DryRunResult( + node, model_schema, status, total_bytes_processed, exception + ) return result diff --git a/dbt_dry_run/result_reporter.py b/dbt_dry_run/result_reporter.py index 5ff2f2f..1fedf83 100644 --- a/dbt_dry_run/result_reporter.py +++ b/dbt_dry_run/result_reporter.py @@ -63,6 +63,7 @@ def get_report(self) -> Report: status=result.status, error_message=exception_type, table=result.table, + total_bytes_processed=result.total_bytes_processed, linting_status=result.linting_status, linting_errors=_map_column_errors(result.linting_errors), ) diff --git a/dbt_dry_run/results.py b/dbt_dry_run/results.py index e78f190..b31d4a0 100644 --- a/dbt_dry_run/results.py +++ b/dbt_dry_run/results.py @@ -31,13 +31,18 @@ class DryRunResult: node: Node table: Optional[Table] status: DryRunStatus + total_bytes_processed: Optional[int] exception: Optional[Exception] linting_status: LintingStatus = LintingStatus.SKIPPED linting_errors: List[LintingError] = field(default_factory=lambda: []) def replace_table(self, table: Table) -> "DryRunResult": return DryRunResult( - node=self.node, table=table, status=self.status, exception=self.exception + node=self.node, + table=table, + status=self.status, + total_bytes_processed=self.total_bytes_processed, + exception=self.exception, ) def with_linting_errors(self, linting_errors: List[LintingError]) -> "DryRunResult": @@ -49,6 +54,7 @@ def with_linting_errors(self, linting_errors: List[LintingError]) -> "DryRunResu node=self.node, table=self.table, status=self.status, + total_bytes_processed=self.total_bytes_processed, exception=self.exception, linting_errors=linting_errors, linting_status=linting_status, diff --git a/dbt_dry_run/sql_runner/__init__.py b/dbt_dry_run/sql_runner/__init__.py index cd875c5..50ea7eb 100644 --- a/dbt_dry_run/sql_runner/__init__.py +++ b/dbt_dry_run/sql_runner/__init__.py @@ -28,7 +28,7 @@ def get_node_schema(self, node: Node) -> Optional[Table]: @abstractmethod def query( self, sql: str - ) -> Tuple[DryRunStatus, Optional[Table], Optional[Exception]]: + ) -> Tuple[DryRunStatus, Optional[Table], Optional[int], Optional[Exception]]: ... def convert_agate_type( diff --git a/dbt_dry_run/sql_runner/big_query_sql_runner.py b/dbt_dry_run/sql_runner/big_query_sql_runner.py index 7f98aaf..dea5b1f 100644 --- a/dbt_dry_run/sql_runner/big_query_sql_runner.py +++ b/dbt_dry_run/sql_runner/big_query_sql_runner.py @@ -58,20 +58,22 @@ def get_client(self) -> Client: ) def query( self, sql: str - ) -> Tuple[DryRunStatus, Optional[Table], Optional[Exception]]: + ) -> Tuple[DryRunStatus, Optional[Table], Optional[int], Optional[Exception]]: exception = None table = None + total_bytes_processed = None client = self.get_client() try: query_job = client.query(sql, job_config=self.JOB_CONFIG) table = self.get_schema_from_schema_fields(query_job.schema or []) + total_bytes_processed = query_job.total_bytes_processed status = DryRunStatus.SUCCESS except (Forbidden, BadRequest, NotFound) as e: status = DryRunStatus.FAILURE if QUERY_TIMED_OUT in str(e): raise exception = e - return status, table, exception + return status, table, total_bytes_processed, exception @staticmethod def get_schema_from_schema_fields(schema_fields: List[SchemaField]) -> Table: diff --git a/dbt_dry_run/test/node_runner/test_incremental_runner.py b/dbt_dry_run/test/node_runner/test_incremental_runner.py index a4009fe..46d8abb 100644 --- a/dbt_dry_run/test/node_runner/test_incremental_runner.py +++ b/dbt_dry_run/test/node_runner/test_incremental_runner.py @@ -30,6 +30,8 @@ ] ) +A_TOTAL_BYTES_PROCESSED = 1000 + A_NODE = SimpleNode( unique_id="node1", depends_on=[], resource_type=ManifestScheduler.MODEL ).to_node() @@ -58,7 +60,7 @@ def get_mock_sql_runner_with( model_schema: Table, target_schema: Optional[Table] ) -> MagicMock: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, model_schema, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, model_schema, 0, None) mock_sql_runner.get_node_schema.return_value = target_schema return mock_sql_runner @@ -68,7 +70,12 @@ def test_partitioned_incremental_model_declares_dbt_max_partition_variable() -> "declare _dbt_max_partition timestamp default CURRENT_TIMESTAMP();" ) mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) node = SimpleNode( unique_id="node1", @@ -463,7 +470,12 @@ def test_node_with_false_full_refresh_does_not_full_refresh_when_flag_is_true( def test_model_with_sql_header_executes_header_first() -> None: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) pre_header_value = "DECLARE x INT64;" @@ -492,7 +504,11 @@ def test_append_handler_preserves_existing_column_order() -> None: ] ) dry_run_result = DryRunResult( - node=A_NODE, status=DryRunStatus.SUCCESS, table=model_table, exception=None + node=A_NODE, + status=DryRunStatus.SUCCESS, + table=model_table, + total_bytes_processed=0, + exception=None, ) target_table = Table( fields=[ @@ -522,7 +538,11 @@ def test_sync_handler_preserves_existing_column_order() -> None: ] ) dry_run_result = DryRunResult( - node=A_NODE, status=DryRunStatus.SUCCESS, table=model_table, exception=None + node=A_NODE, + status=DryRunStatus.SUCCESS, + table=model_table, + total_bytes_processed=0, + exception=None, ) target_table = Table( fields=[ diff --git a/dbt_dry_run/test/node_runner/test_snapshot_runner.py b/dbt_dry_run/test/node_runner/test_snapshot_runner.py index 7fb0a73..2affa68 100644 --- a/dbt_dry_run/test/node_runner/test_snapshot_runner.py +++ b/dbt_dry_run/test/node_runner/test_snapshot_runner.py @@ -41,7 +41,7 @@ def test_snapshot_with_check_all_strategy_runs_sql_with_id() -> None: ) ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) node = SimpleNode( unique_id="node1", @@ -76,7 +76,7 @@ def test_snapshot_with_check_all_strategy_fails_without_id() -> None: ) ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) node = SimpleNode( unique_id="node1", @@ -114,7 +114,7 @@ def test_snapshot_with_check_all_strategy_runs_sql_with_matching_columns() -> No ), ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) node = SimpleNode( unique_id="node1", @@ -152,7 +152,7 @@ def test_snapshot_with_check_cols_strategy_fails_with_missing_column() -> None: ) ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) node = SimpleNode( unique_id="node1", @@ -187,7 +187,7 @@ def test_snapshot_with_timestamp_strategy_with_updated_at_column() -> None: TableField(name="last_updated_col", type=BigQueryFieldType.TIMESTAMP), ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) node = SimpleNode( unique_id="node1", @@ -222,7 +222,7 @@ def test_snapshot_with_timestamp_strategy_with_missing_updated_at_column() -> No TableField(name="last_updated_col", type=BigQueryFieldType.TIMESTAMP), ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) node = SimpleNode( unique_id="node1", @@ -261,7 +261,7 @@ def test_snapshot_with_list_of_unique_key_columns_raises_error() -> None: TableField(name="last_updated_col", type=BigQueryFieldType.TIMESTAMP), ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) node = SimpleNode( unique_id="node1", diff --git a/dbt_dry_run/test/node_runner/test_table_runner.py b/dbt_dry_run/test/node_runner/test_table_runner.py index 6adfcb5..fef08d9 100644 --- a/dbt_dry_run/test/node_runner/test_table_runner.py +++ b/dbt_dry_run/test/node_runner/test_table_runner.py @@ -19,6 +19,8 @@ ] ) +A_TOTAL_BYTES_PROCESSED = 1000 + def test_model_with_no_dependencies_runs_sql() -> None: mock_sql_runner = MagicMock() @@ -30,7 +32,12 @@ def test_model_with_no_dependencies_runs_sql() -> None: ) ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + expected_table, + A_TOTAL_BYTES_PROCESSED, + None, + ) node = SimpleNode( unique_id="node1", depends_on=[], resource_type=ManifestScheduler.SEED @@ -46,11 +53,17 @@ def test_model_with_no_dependencies_runs_sql() -> None: assert result.status == DryRunStatus.SUCCESS assert result.table assert result.table.fields[0].name == expected_table.fields[0].name + assert result.total_bytes_processed == A_TOTAL_BYTES_PROCESSED def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) upstream_simple_node = SimpleNode(unique_id="upstream", depends_on=[]) upstream_node = upstream_simple_node.to_node() @@ -70,6 +83,7 @@ def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None node=upstream_node, status=DryRunStatus.FAILURE, table=None, + total_bytes_processed=0, exception=Exception("BOOM"), ), ) @@ -77,12 +91,18 @@ def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None model_runner = TableRunner(mock_sql_runner, results) result = model_runner.run(node) assert result.status == DryRunStatus.FAILURE + assert result.total_bytes_processed == 0 assert isinstance(result.exception, UpstreamFailedException) def test_model_with_dependency_inserts_sql_literal() -> None: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) upstream_simple_node = SimpleNode(unique_id="upstream", depends_on=[]) upstream_node = upstream_simple_node.to_node() @@ -105,6 +125,7 @@ def test_model_with_dependency_inserts_sql_literal() -> None: node=upstream_node, status=DryRunStatus.SUCCESS, table=A_SIMPLE_TABLE, + total_bytes_processed=A_TOTAL_BYTES_PROCESSED, exception=Exception("BOOM"), ), ) @@ -119,7 +140,12 @@ def test_model_with_dependency_inserts_sql_literal() -> None: def test_model_with_sql_header_executes_header_first() -> None: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) pre_header_value = "DECLARE x INT64;" diff --git a/dbt_dry_run/test/node_runner/test_test_runner.py b/dbt_dry_run/test/node_runner/test_test_runner.py index a507243..10b70ea 100644 --- a/dbt_dry_run/test/node_runner/test_test_runner.py +++ b/dbt_dry_run/test/node_runner/test_test_runner.py @@ -38,7 +38,7 @@ def test_test_runs_sql() -> None: ) ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) test_node = SimpleNode( unique_id="test1", depends_on=[], resource_type=ManifestScheduler.TEST diff --git a/dbt_dry_run/test/node_runner/test_view_runner.py b/dbt_dry_run/test/node_runner/test_view_runner.py index b9d9359..8b1669c 100644 --- a/dbt_dry_run/test/node_runner/test_view_runner.py +++ b/dbt_dry_run/test/node_runner/test_view_runner.py @@ -21,6 +21,8 @@ ] ) +A_TOTAL_BYTES_PROCESSED = 1000 + def sql_with_view_creation(node: Node, sql_statement: str) -> str: return f"CREATE OR REPLACE VIEW `{node.database}`.`{node.db_schema}`.`{node.alias}` AS (\n{sql_statement}\n)" @@ -36,7 +38,7 @@ def test_model_with_no_dependencies_runs_sql() -> None: ) ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None) + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None) node = SimpleNode( unique_id="node1", depends_on=[], resource_type=ManifestScheduler.SEED @@ -52,13 +54,19 @@ def test_model_with_no_dependencies_runs_sql() -> None: sql_with_view_creation(node, node.compiled_code) ) assert result.status == DryRunStatus.SUCCESS + assert result.total_bytes_processed == 0 assert result.table assert result.table.fields[0].name == expected_table.fields[0].name def test_model_as_view_runs_create_view() -> None: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) node = SimpleNode( unique_id="node1", depends_on=[], resource_type=ManifestScheduler.MODEL @@ -78,7 +86,12 @@ def test_model_as_view_runs_create_view() -> None: def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) upstream_simple_node = SimpleNode(unique_id="upstream", depends_on=[]) upstream_node = upstream_simple_node.to_node() @@ -98,6 +111,7 @@ def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None node=upstream_node, status=DryRunStatus.FAILURE, table=None, + total_bytes_processed=0, exception=Exception("BOOM"), ), ) @@ -105,12 +119,18 @@ def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None model_runner = ViewRunner(mock_sql_runner, results) result = model_runner.run(node) assert result.status == DryRunStatus.FAILURE + assert result.total_bytes_processed == 0 assert isinstance(result.exception, UpstreamFailedException) def test_model_with_dependency_inserts_sql_literal() -> None: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) upstream_simple_node = SimpleNode(unique_id="upstream", depends_on=[]) upstream_node = upstream_simple_node.to_node() @@ -133,6 +153,7 @@ def test_model_with_dependency_inserts_sql_literal() -> None: node=upstream_node, status=DryRunStatus.SUCCESS, table=A_SIMPLE_TABLE, + total_bytes_processed=0, exception=Exception("BOOM"), ), ) @@ -142,6 +163,7 @@ def test_model_with_dependency_inserts_sql_literal() -> None: executed_sql = get_executed_sql(mock_sql_runner) assert result.status == DryRunStatus.SUCCESS + assert result.total_bytes_processed == A_TOTAL_BYTES_PROCESSED assert executed_sql == sql_with_view_creation( node, "SELECT * FROM (SELECT 'foo' as `a`)" ) @@ -149,7 +171,12 @@ def test_model_with_dependency_inserts_sql_literal() -> None: def test_model_with_sql_header_executes_header_first() -> None: mock_sql_runner = MagicMock() - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None) + mock_sql_runner.query.return_value = ( + DryRunStatus.SUCCESS, + A_SIMPLE_TABLE, + A_TOTAL_BYTES_PROCESSED, + None, + ) pre_header_value = "DECLARE x INT64;" diff --git a/dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py b/dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py index a9237ac..4aa5914 100644 --- a/dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py +++ b/dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py @@ -56,7 +56,7 @@ def test_error_query_does_not_retry() -> None: sql_runner = BigQuerySQLRunner(cast(ProjectService, mock_project)) expected_sql = "SELECT * FROM foo" - status, _, exc = sql_runner.query(expected_sql) + status, _, _, exc = sql_runner.query(expected_sql) assert status == DryRunStatus.FAILURE assert exc is raised_exception diff --git a/dbt_dry_run/test/test_result_reporter.py b/dbt_dry_run/test/test_result_reporter.py index 5924e7d..173145f 100644 --- a/dbt_dry_run/test/test_result_reporter.py +++ b/dbt_dry_run/test/test_result_reporter.py @@ -21,6 +21,7 @@ def successful_result() -> DryRunResult: table=Table(fields=[]), status=DryRunStatus.SUCCESS, exception=None, + total_bytes_processed=1000, linting_status=LintingStatus.SKIPPED, ) @@ -31,6 +32,7 @@ def failed_result() -> DryRunResult: node=SimpleNode(unique_id="B", depends_on=[]).to_node(), table=Table(fields=[]), status=DryRunStatus.FAILURE, + total_bytes_processed=0, exception=Exception("Oh no!"), linting_status=LintingStatus.SKIPPED, ) @@ -42,6 +44,7 @@ def failed_linting_result() -> DryRunResult: node=SimpleNode(unique_id="B", depends_on=[]).to_node(), table=Table(fields=[]), status=DryRunStatus.SUCCESS, + total_bytes_processed=1000, exception=None, linting_status=LintingStatus.FAILURE, linting_errors=[