diff --git a/dbt_dry_run/models/report.py b/dbt_dry_run/models/report.py
index c991c66..d53c653 100644
--- a/dbt_dry_run/models/report.py
+++ b/dbt_dry_run/models/report.py
@@ -18,6 +18,7 @@ class ReportNode(BaseModel):
     status: DryRunStatus
     error_message: Optional[str]
     table: Optional[Table]
+    total_bytes_processed: Optional[int]
     linting_status: LintingStatus
     linting_errors: List[ReportLintingError]
 
diff --git a/dbt_dry_run/node_runner/__init__.py b/dbt_dry_run/node_runner/__init__.py
index 86d0614..4e828f6 100644
--- a/dbt_dry_run/node_runner/__init__.py
+++ b/dbt_dry_run/node_runner/__init__.py
@@ -30,13 +30,18 @@ def validate_node(self, node: Node) -> Optional[DryRunResult]:
                     node=node,
                     table=None,
                     status=DryRunStatus.FAILURE,
+                    total_bytes_processed=0,
                     exception=NotCompiledException(
                         f"Node {node.unique_id} was not compiled"
                     ),
                 )
             else:
                 return DryRunResult(
-                    node, table=None, status=DryRunStatus.SKIPPED, exception=None
+                    node,
+                    table=None,
+                    status=DryRunStatus.SKIPPED,
+                    total_bytes_processed=0,
+                    exception=None,
                 )
         else:
             return None
diff --git a/dbt_dry_run/node_runner/incremental_runner.py b/dbt_dry_run/node_runner/incremental_runner.py
index cbe4494..da6c6cb 100644
--- a/dbt_dry_run/node_runner/incremental_runner.py
+++ b/dbt_dry_run/node_runner/incremental_runner.py
@@ -63,6 +63,7 @@ def fail_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunRes
     schema_changed = added_fields or removed_fields
     table: Optional[Table] = target_table
     status = dry_run_result.status
+    total_bytes_processed = dry_run_result.total_bytes_processed
     exception = dry_run_result.exception
     if schema_changed:
         table = None
@@ -74,7 +75,11 @@ def fail_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunRes
         )
         exception = SchemaChangeException(msg)
     return DryRunResult(
-        node=dry_run_result.node, table=table, status=status, exception=exception
+        node=dry_run_result.node,
+        table=table,
+        status=status,
+        total_bytes_processed=total_bytes_processed,
+        exception=exception,
     )
 
 
@@ -143,11 +148,13 @@ def _verify_merge_type_compatibility(
             node, common_field_names, sql_statement
         )
         sql_statement = self._modify_sql(node, sql_statement_with_merge)
-        status, model_schema, exception = self._sql_runner.query(sql_statement)
+        status, model_schema, total_bytes_processed, exception = self._sql_runner.query(
+            sql_statement
+        )
         if status == DryRunStatus.SUCCESS:
             return initial_result
         else:
-            return DryRunResult(node, None, status, exception)
+            return DryRunResult(node, None, status, total_bytes_processed, exception)
 
     def _modify_sql(self, node: Node, sql_statement: str) -> str:
         if node.config.sql_header:
@@ -172,11 +179,15 @@ def run(self, node: Node) -> DryRunResult:
         try:
             sql_with_literals = insert_dependant_sql_literals(node, self._results)
         except UpstreamFailedException as e:
-            return DryRunResult(node, None, DryRunStatus.FAILURE, e)
+            return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)
         run_sql = self._modify_sql(node, sql_with_literals)
-        status, model_schema, exception = self._sql_runner.query(run_sql)
+        status, model_schema, total_bytes_processed, exception = self._sql_runner.query(
+            run_sql
+        )
 
-        result = DryRunResult(node, model_schema, status, exception)
+        result = DryRunResult(
+            node, model_schema, status, total_bytes_processed, exception
+        )
 
         full_refresh = self._get_full_refresh_config(node)
 
diff --git a/dbt_dry_run/node_runner/node_test_runner.py b/dbt_dry_run/node_runner/node_test_runner.py
index 28b68a9..95a0c06 100644
--- a/dbt_dry_run/node_runner/node_test_runner.py
+++ b/dbt_dry_run/node_runner/node_test_runner.py
@@ -10,9 +10,16 @@ def run(self, node: Node) -> DryRunResult:
         try:
             run_sql = insert_dependant_sql_literals(node, self._results)
         except UpstreamFailedException as e:
-            return DryRunResult(node, None, DryRunStatus.FAILURE, e)
+            return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)
 
-        status, predicted_table, exception = self._sql_runner.query(run_sql)
+        (
+            status,
+            predicted_table,
+            total_bytes_processed,
+            exception,
+        ) = self._sql_runner.query(run_sql)
 
-        result = DryRunResult(node, predicted_table, status, exception)
+        result = DryRunResult(
+            node, predicted_table, status, total_bytes_processed, exception
+        )
         return result
diff --git a/dbt_dry_run/node_runner/seed_runner.py b/dbt_dry_run/node_runner/seed_runner.py
index d1fd2ec..ecabe28 100644
--- a/dbt_dry_run/node_runner/seed_runner.py
+++ b/dbt_dry_run/node_runner/seed_runner.py
@@ -31,6 +31,7 @@ def run(self, node: Node) -> DryRunResult:
                     node=node,
                     table=None,
                     status=DryRunStatus.FAILURE,
+                    total_bytes_processed=0,
                     exception=exception,
                 )
             new_field = TableField(
@@ -40,7 +41,11 @@ def run(self, node: Node) -> DryRunResult:
 
         schema = Table(fields=fields)
         return DryRunResult(
-            node=node, table=schema, status=DryRunStatus.SUCCESS, exception=None
+            node=node,
+            table=schema,
+            status=DryRunStatus.SUCCESS,
+            total_bytes_processed=0,
+            exception=None,
         )
 
     def validate_node(self, node: Node) -> Optional[DryRunResult]:
diff --git a/dbt_dry_run/node_runner/snapshot_runner.py b/dbt_dry_run/node_runner/snapshot_runner.py
index 94b516f..33b2d90 100644
--- a/dbt_dry_run/node_runner/snapshot_runner.py
+++ b/dbt_dry_run/node_runner/snapshot_runner.py
@@ -56,6 +56,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult:
                 node=result.node,
                 table=result.table,
                 status=DryRunStatus.FAILURE,
+                total_bytes_processed=0,
                 exception=exception,
             )
         if node.config.strategy == "timestamp":
@@ -67,6 +68,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult:
                     node=result.node,
                     table=result.table,
                     status=DryRunStatus.FAILURE,
+                    total_bytes_processed=0,
                     exception=exception,
                 )
         elif node.config.strategy == "check":
@@ -78,6 +80,7 @@ def _validate_snapshot_config(node: Node, result: DryRunResult) -> DryRunResult:
                     node=result.node,
                     table=result.table,
                     status=DryRunStatus.FAILURE,
+                    total_bytes_processed=0,
                     exception=exception,
                 )
         else:
@@ -88,10 +91,17 @@ def run(self, node: Node) -> DryRunResult:
         try:
             run_sql = insert_dependant_sql_literals(node, self._results)
         except UpstreamFailedException as e:
-            return DryRunResult(node, None, DryRunStatus.FAILURE, e)
+            return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)
 
-        status, predicted_table, exception = self._sql_runner.query(run_sql)
-        result = DryRunResult(node, predicted_table, status, exception)
+        (
+            status,
+            predicted_table,
+            total_bytes_processed,
+            exception,
+        ) = self._sql_runner.query(run_sql)
+        result = DryRunResult(
+            node, predicted_table, status, total_bytes_processed, exception
+        )
         if result.status == DryRunStatus.SUCCESS and result.table:
             result.table.fields = [*result.table.fields, *DBT_SNAPSHOT_FIELDS]
             result = self._validate_snapshot_config(node, result)
diff --git a/dbt_dry_run/node_runner/source_runner.py b/dbt_dry_run/node_runner/source_runner.py
index 0b0fe3f..c718bd6 100644
--- a/dbt_dry_run/node_runner/source_runner.py
+++ b/dbt_dry_run/node_runner/source_runner.py
@@ -16,6 +16,7 @@ class SourceRunner(NodeRunner):
     def run(self, node: Node) -> DryRunResult:
         exception: Optional[Exception] = None
         predicted_table: Optional[Table] = None
+        total_bytes_processed: Optional[int] = 0
         status = DryRunStatus.SUCCESS
         if node.is_external_source():
             external_config = cast(ExternalConfig, node.external)
@@ -37,7 +38,9 @@ def run(self, node: Node) -> DryRunResult:
                     f"Could not find source in target environment for node '{node.unique_id}'"
                 )
 
-        return DryRunResult(node, predicted_table, status, exception)
+        return DryRunResult(
+            node, predicted_table, status, total_bytes_processed, exception
+        )
 
     def validate_node(self, node: Node) -> Optional[DryRunResult]:
         return None
diff --git a/dbt_dry_run/node_runner/table_runner.py b/dbt_dry_run/node_runner/table_runner.py
index a4a5a48..2be11ab 100644
--- a/dbt_dry_run/node_runner/table_runner.py
+++ b/dbt_dry_run/node_runner/table_runner.py
@@ -15,10 +15,14 @@ def run(self, node: Node) -> DryRunResult:
         try:
             run_sql = insert_dependant_sql_literals(node, self._results)
         except UpstreamFailedException as e:
-            return DryRunResult(node, None, DryRunStatus.FAILURE, e)
+            return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)
 
         run_sql = self._modify_sql(node, run_sql)
-        status, model_schema, exception = self._sql_runner.query(run_sql)
+        status, model_schema, total_bytes_processed, exception = self._sql_runner.query(
+            run_sql
+        )
 
-        result = DryRunResult(node, model_schema, status, exception)
+        result = DryRunResult(
+            node, model_schema, status, total_bytes_processed, exception
+        )
         return result
diff --git a/dbt_dry_run/node_runner/view_runner.py b/dbt_dry_run/node_runner/view_runner.py
index adc2330..2916fff 100644
--- a/dbt_dry_run/node_runner/view_runner.py
+++ b/dbt_dry_run/node_runner/view_runner.py
@@ -16,10 +16,14 @@ def run(self, node: Node) -> DryRunResult:
         try:
             run_sql = insert_dependant_sql_literals(node, self._results)
         except UpstreamFailedException as e:
-            return DryRunResult(node, None, DryRunStatus.FAILURE, e)
+            return DryRunResult(node, None, DryRunStatus.FAILURE, 0, e)
 
         run_sql = self._modify_sql(node, run_sql)
-        status, model_schema, exception = self._sql_runner.query(run_sql)
+        status, model_schema, total_bytes_processed, exception = self._sql_runner.query(
+            run_sql
+        )
 
-        result = DryRunResult(node, model_schema, status, exception)
+        result = DryRunResult(
+            node, model_schema, status, total_bytes_processed, exception
+        )
         return result
diff --git a/dbt_dry_run/result_reporter.py b/dbt_dry_run/result_reporter.py
index 5ff2f2f..1fedf83 100644
--- a/dbt_dry_run/result_reporter.py
+++ b/dbt_dry_run/result_reporter.py
@@ -63,6 +63,7 @@ def get_report(self) -> Report:
                 status=result.status,
                 error_message=exception_type,
                 table=result.table,
+                total_bytes_processed=result.total_bytes_processed,
                 linting_status=result.linting_status,
                 linting_errors=_map_column_errors(result.linting_errors),
             )
diff --git a/dbt_dry_run/results.py b/dbt_dry_run/results.py
index e78f190..b31d4a0 100644
--- a/dbt_dry_run/results.py
+++ b/dbt_dry_run/results.py
@@ -31,13 +31,18 @@ class DryRunResult:
     node: Node
     table: Optional[Table]
     status: DryRunStatus
+    total_bytes_processed: Optional[int]
     exception: Optional[Exception]
     linting_status: LintingStatus = LintingStatus.SKIPPED
     linting_errors: List[LintingError] = field(default_factory=lambda: [])
 
     def replace_table(self, table: Table) -> "DryRunResult":
         return DryRunResult(
-            node=self.node, table=table, status=self.status, exception=self.exception
+            node=self.node,
+            table=table,
+            status=self.status,
+            total_bytes_processed=self.total_bytes_processed,
+            exception=self.exception,
         )
 
     def with_linting_errors(self, linting_errors: List[LintingError]) -> "DryRunResult":
@@ -49,6 +54,7 @@ def with_linting_errors(self, linting_errors: List[LintingError]) -> "DryRunResu
             node=self.node,
             table=self.table,
             status=self.status,
+            total_bytes_processed=self.total_bytes_processed,
             exception=self.exception,
             linting_errors=linting_errors,
             linting_status=linting_status,
diff --git a/dbt_dry_run/sql_runner/__init__.py b/dbt_dry_run/sql_runner/__init__.py
index cd875c5..50ea7eb 100644
--- a/dbt_dry_run/sql_runner/__init__.py
+++ b/dbt_dry_run/sql_runner/__init__.py
@@ -28,7 +28,7 @@ def get_node_schema(self, node: Node) -> Optional[Table]:
     @abstractmethod
     def query(
         self, sql: str
-    ) -> Tuple[DryRunStatus, Optional[Table], Optional[Exception]]:
+    ) -> Tuple[DryRunStatus, Optional[Table], Optional[int], Optional[Exception]]:
         ...
 
     def convert_agate_type(
diff --git a/dbt_dry_run/sql_runner/big_query_sql_runner.py b/dbt_dry_run/sql_runner/big_query_sql_runner.py
index 7f98aaf..dea5b1f 100644
--- a/dbt_dry_run/sql_runner/big_query_sql_runner.py
+++ b/dbt_dry_run/sql_runner/big_query_sql_runner.py
@@ -58,20 +58,22 @@ def get_client(self) -> Client:
     )
     def query(
         self, sql: str
-    ) -> Tuple[DryRunStatus, Optional[Table], Optional[Exception]]:
+    ) -> Tuple[DryRunStatus, Optional[Table], Optional[int], Optional[Exception]]:
         exception = None
         table = None
+        total_bytes_processed = None
         client = self.get_client()
         try:
             query_job = client.query(sql, job_config=self.JOB_CONFIG)
             table = self.get_schema_from_schema_fields(query_job.schema or [])
+            total_bytes_processed = query_job.total_bytes_processed
             status = DryRunStatus.SUCCESS
         except (Forbidden, BadRequest, NotFound) as e:
             status = DryRunStatus.FAILURE
             if QUERY_TIMED_OUT in str(e):
                 raise
             exception = e
-        return status, table, exception
+        return status, table, total_bytes_processed, exception
 
     @staticmethod
     def get_schema_from_schema_fields(schema_fields: List[SchemaField]) -> Table:
diff --git a/dbt_dry_run/test/node_runner/test_incremental_runner.py b/dbt_dry_run/test/node_runner/test_incremental_runner.py
index a4009fe..46d8abb 100644
--- a/dbt_dry_run/test/node_runner/test_incremental_runner.py
+++ b/dbt_dry_run/test/node_runner/test_incremental_runner.py
@@ -30,6 +30,8 @@
     ]
 )
 
+A_TOTAL_BYTES_PROCESSED = 1000
+
 A_NODE = SimpleNode(
     unique_id="node1", depends_on=[], resource_type=ManifestScheduler.MODEL
 ).to_node()
@@ -58,7 +60,7 @@ def get_mock_sql_runner_with(
     model_schema: Table, target_schema: Optional[Table]
 ) -> MagicMock:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, model_schema, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, model_schema, 0, None)
     mock_sql_runner.get_node_schema.return_value = target_schema
     return mock_sql_runner
 
@@ -68,7 +70,12 @@ def test_partitioned_incremental_model_declares_dbt_max_partition_variable() ->
         "declare _dbt_max_partition timestamp default CURRENT_TIMESTAMP();"
     )
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     node = SimpleNode(
         unique_id="node1",
@@ -463,7 +470,12 @@ def test_node_with_false_full_refresh_does_not_full_refresh_when_flag_is_true(
 
 def test_model_with_sql_header_executes_header_first() -> None:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     pre_header_value = "DECLARE x INT64;"
 
@@ -492,7 +504,11 @@ def test_append_handler_preserves_existing_column_order() -> None:
         ]
     )
     dry_run_result = DryRunResult(
-        node=A_NODE, status=DryRunStatus.SUCCESS, table=model_table, exception=None
+        node=A_NODE,
+        status=DryRunStatus.SUCCESS,
+        table=model_table,
+        total_bytes_processed=0,
+        exception=None,
     )
     target_table = Table(
         fields=[
@@ -522,7 +538,11 @@ def test_sync_handler_preserves_existing_column_order() -> None:
         ]
     )
     dry_run_result = DryRunResult(
-        node=A_NODE, status=DryRunStatus.SUCCESS, table=model_table, exception=None
+        node=A_NODE,
+        status=DryRunStatus.SUCCESS,
+        table=model_table,
+        total_bytes_processed=0,
+        exception=None,
     )
     target_table = Table(
         fields=[
diff --git a/dbt_dry_run/test/node_runner/test_snapshot_runner.py b/dbt_dry_run/test/node_runner/test_snapshot_runner.py
index 7fb0a73..2affa68 100644
--- a/dbt_dry_run/test/node_runner/test_snapshot_runner.py
+++ b/dbt_dry_run/test/node_runner/test_snapshot_runner.py
@@ -41,7 +41,7 @@ def test_snapshot_with_check_all_strategy_runs_sql_with_id() -> None:
             )
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     node = SimpleNode(
         unique_id="node1",
@@ -76,7 +76,7 @@ def test_snapshot_with_check_all_strategy_fails_without_id() -> None:
             )
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     node = SimpleNode(
         unique_id="node1",
@@ -114,7 +114,7 @@ def test_snapshot_with_check_all_strategy_runs_sql_with_matching_columns() -> No
             ),
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     node = SimpleNode(
         unique_id="node1",
@@ -152,7 +152,7 @@ def test_snapshot_with_check_cols_strategy_fails_with_missing_column() -> None:
             )
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     node = SimpleNode(
         unique_id="node1",
@@ -187,7 +187,7 @@ def test_snapshot_with_timestamp_strategy_with_updated_at_column() -> None:
             TableField(name="last_updated_col", type=BigQueryFieldType.TIMESTAMP),
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     node = SimpleNode(
         unique_id="node1",
@@ -222,7 +222,7 @@ def test_snapshot_with_timestamp_strategy_with_missing_updated_at_column() -> No
             TableField(name="last_updated_col", type=BigQueryFieldType.TIMESTAMP),
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     node = SimpleNode(
         unique_id="node1",
@@ -261,7 +261,7 @@ def test_snapshot_with_list_of_unique_key_columns_raises_error() -> None:
             TableField(name="last_updated_col", type=BigQueryFieldType.TIMESTAMP),
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     node = SimpleNode(
         unique_id="node1",
diff --git a/dbt_dry_run/test/node_runner/test_table_runner.py b/dbt_dry_run/test/node_runner/test_table_runner.py
index 6adfcb5..fef08d9 100644
--- a/dbt_dry_run/test/node_runner/test_table_runner.py
+++ b/dbt_dry_run/test/node_runner/test_table_runner.py
@@ -19,6 +19,8 @@
     ]
 )
 
+A_TOTAL_BYTES_PROCESSED = 1000
+
 
 def test_model_with_no_dependencies_runs_sql() -> None:
     mock_sql_runner = MagicMock()
@@ -30,7 +32,12 @@ def test_model_with_no_dependencies_runs_sql() -> None:
             )
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        expected_table,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     node = SimpleNode(
         unique_id="node1", depends_on=[], resource_type=ManifestScheduler.SEED
@@ -46,11 +53,17 @@ def test_model_with_no_dependencies_runs_sql() -> None:
     assert result.status == DryRunStatus.SUCCESS
     assert result.table
     assert result.table.fields[0].name == expected_table.fields[0].name
+    assert result.total_bytes_processed == A_TOTAL_BYTES_PROCESSED
 
 
 def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     upstream_simple_node = SimpleNode(unique_id="upstream", depends_on=[])
     upstream_node = upstream_simple_node.to_node()
@@ -70,6 +83,7 @@ def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None
             node=upstream_node,
             status=DryRunStatus.FAILURE,
             table=None,
+            total_bytes_processed=0,
             exception=Exception("BOOM"),
         ),
     )
@@ -77,12 +91,18 @@ def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None
     model_runner = TableRunner(mock_sql_runner, results)
     result = model_runner.run(node)
     assert result.status == DryRunStatus.FAILURE
+    assert result.total_bytes_processed == 0
     assert isinstance(result.exception, UpstreamFailedException)
 
 
 def test_model_with_dependency_inserts_sql_literal() -> None:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     upstream_simple_node = SimpleNode(unique_id="upstream", depends_on=[])
     upstream_node = upstream_simple_node.to_node()
@@ -105,6 +125,7 @@ def test_model_with_dependency_inserts_sql_literal() -> None:
             node=upstream_node,
             status=DryRunStatus.SUCCESS,
             table=A_SIMPLE_TABLE,
+            total_bytes_processed=A_TOTAL_BYTES_PROCESSED,
             exception=Exception("BOOM"),
         ),
     )
@@ -119,7 +140,12 @@ def test_model_with_dependency_inserts_sql_literal() -> None:
 
 def test_model_with_sql_header_executes_header_first() -> None:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     pre_header_value = "DECLARE x INT64;"
 
diff --git a/dbt_dry_run/test/node_runner/test_test_runner.py b/dbt_dry_run/test/node_runner/test_test_runner.py
index a507243..10b70ea 100644
--- a/dbt_dry_run/test/node_runner/test_test_runner.py
+++ b/dbt_dry_run/test/node_runner/test_test_runner.py
@@ -38,7 +38,7 @@ def test_test_runs_sql() -> None:
             )
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     test_node = SimpleNode(
         unique_id="test1", depends_on=[], resource_type=ManifestScheduler.TEST
diff --git a/dbt_dry_run/test/node_runner/test_view_runner.py b/dbt_dry_run/test/node_runner/test_view_runner.py
index b9d9359..8b1669c 100644
--- a/dbt_dry_run/test/node_runner/test_view_runner.py
+++ b/dbt_dry_run/test/node_runner/test_view_runner.py
@@ -21,6 +21,8 @@
     ]
 )
 
+A_TOTAL_BYTES_PROCESSED = 1000
+
 
 def sql_with_view_creation(node: Node, sql_statement: str) -> str:
     return f"CREATE OR REPLACE VIEW `{node.database}`.`{node.db_schema}`.`{node.alias}` AS (\n{sql_statement}\n)"
@@ -36,7 +38,7 @@ def test_model_with_no_dependencies_runs_sql() -> None:
             )
         ]
     )
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, None)
+    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, expected_table, 0, None)
 
     node = SimpleNode(
         unique_id="node1", depends_on=[], resource_type=ManifestScheduler.SEED
@@ -52,13 +54,19 @@ def test_model_with_no_dependencies_runs_sql() -> None:
         sql_with_view_creation(node, node.compiled_code)
     )
     assert result.status == DryRunStatus.SUCCESS
+    assert result.total_bytes_processed == 0
     assert result.table
     assert result.table.fields[0].name == expected_table.fields[0].name
 
 
 def test_model_as_view_runs_create_view() -> None:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     node = SimpleNode(
         unique_id="node1", depends_on=[], resource_type=ManifestScheduler.MODEL
@@ -78,7 +86,12 @@ def test_model_as_view_runs_create_view() -> None:
 
 def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     upstream_simple_node = SimpleNode(unique_id="upstream", depends_on=[])
     upstream_node = upstream_simple_node.to_node()
@@ -98,6 +111,7 @@ def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None
             node=upstream_node,
             status=DryRunStatus.FAILURE,
             table=None,
+            total_bytes_processed=0,
             exception=Exception("BOOM"),
         ),
     )
@@ -105,12 +119,18 @@ def test_model_with_failed_dependency_raises_upstream_failed_exception() -> None
     model_runner = ViewRunner(mock_sql_runner, results)
     result = model_runner.run(node)
     assert result.status == DryRunStatus.FAILURE
+    assert result.total_bytes_processed == 0
     assert isinstance(result.exception, UpstreamFailedException)
 
 
 def test_model_with_dependency_inserts_sql_literal() -> None:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     upstream_simple_node = SimpleNode(unique_id="upstream", depends_on=[])
     upstream_node = upstream_simple_node.to_node()
@@ -133,6 +153,7 @@ def test_model_with_dependency_inserts_sql_literal() -> None:
             node=upstream_node,
             status=DryRunStatus.SUCCESS,
             table=A_SIMPLE_TABLE,
+            total_bytes_processed=0,
             exception=Exception("BOOM"),
         ),
     )
@@ -142,6 +163,7 @@ def test_model_with_dependency_inserts_sql_literal() -> None:
 
     executed_sql = get_executed_sql(mock_sql_runner)
     assert result.status == DryRunStatus.SUCCESS
+    assert result.total_bytes_processed == A_TOTAL_BYTES_PROCESSED
     assert executed_sql == sql_with_view_creation(
         node, "SELECT * FROM (SELECT 'foo' as `a`)"
     )
@@ -149,7 +171,12 @@ def test_model_with_dependency_inserts_sql_literal() -> None:
 
 def test_model_with_sql_header_executes_header_first() -> None:
     mock_sql_runner = MagicMock()
-    mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, A_SIMPLE_TABLE, None)
+    mock_sql_runner.query.return_value = (
+        DryRunStatus.SUCCESS,
+        A_SIMPLE_TABLE,
+        A_TOTAL_BYTES_PROCESSED,
+        None,
+    )
 
     pre_header_value = "DECLARE x INT64;"
 
diff --git a/dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py b/dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py
index a9237ac..4aa5914 100644
--- a/dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py
+++ b/dbt_dry_run/test/sql_runner/test_big_query_sql_runner.py
@@ -56,7 +56,7 @@ def test_error_query_does_not_retry() -> None:
     sql_runner = BigQuerySQLRunner(cast(ProjectService, mock_project))
 
     expected_sql = "SELECT * FROM foo"
-    status, _, exc = sql_runner.query(expected_sql)
+    status, _, _, exc = sql_runner.query(expected_sql)
 
     assert status == DryRunStatus.FAILURE
     assert exc is raised_exception
diff --git a/dbt_dry_run/test/test_result_reporter.py b/dbt_dry_run/test/test_result_reporter.py
index 5924e7d..173145f 100644
--- a/dbt_dry_run/test/test_result_reporter.py
+++ b/dbt_dry_run/test/test_result_reporter.py
@@ -21,6 +21,7 @@ def successful_result() -> DryRunResult:
         table=Table(fields=[]),
         status=DryRunStatus.SUCCESS,
         exception=None,
+        total_bytes_processed=1000,
         linting_status=LintingStatus.SKIPPED,
     )
 
@@ -31,6 +32,7 @@ def failed_result() -> DryRunResult:
         node=SimpleNode(unique_id="B", depends_on=[]).to_node(),
         table=Table(fields=[]),
         status=DryRunStatus.FAILURE,
+        total_bytes_processed=0,
         exception=Exception("Oh no!"),
         linting_status=LintingStatus.SKIPPED,
     )
@@ -42,6 +44,7 @@ def failed_linting_result() -> DryRunResult:
         node=SimpleNode(unique_id="B", depends_on=[]).to_node(),
         table=Table(fields=[]),
         status=DryRunStatus.SUCCESS,
+        total_bytes_processed=1000,
         exception=None,
         linting_status=LintingStatus.FAILURE,
         linting_errors=[