diff --git a/src/databricks/sql/backend/databricks_client.py b/src/databricks/sql/backend/databricks_client.py
index 973c2932..276954b7 100644
--- a/src/databricks/sql/backend/databricks_client.py
+++ b/src/databricks/sql/backend/databricks_client.py
@@ -85,6 +85,7 @@ def execute_command(
         parameters: List,
         async_op: bool,
         enforce_embedded_schema_correctness: bool,
+        row_limit: Optional[int] = None,
     ) -> Union["ResultSet", None]:
         """
         Executes a SQL command or query within the specified session.
@@ -103,6 +104,7 @@ def execute_command(
             parameters: List of parameters to bind to the query
             async_op: Whether to execute the command asynchronously
             enforce_embedded_schema_correctness: Whether to enforce schema correctness
+            row_limit: Maximum number of rows in the operation result.
 
         Returns:
             If async_op is False, returns a ResultSet object containing the
diff --git a/src/databricks/sql/backend/sea/backend.py b/src/databricks/sql/backend/sea/backend.py
index 0c0400ae..2a5f097c 100644
--- a/src/databricks/sql/backend/sea/backend.py
+++ b/src/databricks/sql/backend/sea/backend.py
@@ -408,6 +408,7 @@ def execute_command(
         parameters: List[Dict[str, Any]],
         async_op: bool,
         enforce_embedded_schema_correctness: bool,
+        row_limit: Optional[int] = None,
     ) -> Union[SeaResultSet, None]:
         """
         Execute a SQL command using the SEA backend.
@@ -465,7 +466,7 @@ def execute_command(
             format=format,
             wait_timeout=(WaitTimeout.ASYNC if async_op else WaitTimeout.SYNC).value,
             on_wait_timeout="CONTINUE",
-            row_limit=max_rows,
+            row_limit=row_limit,
             parameters=sea_parameters if sea_parameters else None,
             result_compression=result_compression,
         )
diff --git a/src/databricks/sql/backend/thrift_backend.py b/src/databricks/sql/backend/thrift_backend.py
index e824de1c..3b38fcd5 100644
--- a/src/databricks/sql/backend/thrift_backend.py
+++ b/src/databricks/sql/backend/thrift_backend.py
@@ -4,7 +4,7 @@
 import math
 import time
 import threading
-from typing import List, Union, Any, TYPE_CHECKING
+from typing import List, Optional, Union, Any, TYPE_CHECKING
 
 if TYPE_CHECKING:
     from databricks.sql.client import Cursor
@@ -929,6 +929,7 @@ def execute_command(
         parameters=[],
         async_op=False,
         enforce_embedded_schema_correctness=False,
+        row_limit: Optional[int] = None,
     ) -> Union["ResultSet", None]:
         thrift_handle = session_id.to_thrift_handle()
         if not thrift_handle:
@@ -969,6 +970,7 @@ def execute_command(
             useArrowNativeTypes=spark_arrow_types,
             parameters=parameters,
             enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness,
+            resultRowLimit=row_limit,
         )
 
         resp = self.make_request(self._client.ExecuteStatement, req)
diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 0eaebfe3..61e88a35 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -335,6 +335,7 @@ def cursor(
         self,
         arraysize: int = DEFAULT_ARRAY_SIZE,
         buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
+        row_limit: Optional[int] = None,
     ) -> "Cursor":
         """
         Return a new Cursor object using the connection.
@@ -349,6 +350,7 @@ def cursor(
             self.session.backend,
             arraysize=arraysize,
             result_buffer_size_bytes=buffer_size_bytes,
+            row_limit=row_limit,
         )
         self._cursors.append(cursor)
         return cursor
@@ -382,6 +384,7 @@ def __init__(
         backend: DatabricksClient,
         result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
         arraysize: int = DEFAULT_ARRAY_SIZE,
+        row_limit: Optional[int] = None,
     ) -> None:
         """
         These objects represent a database cursor, which is used to manage the context of a fetch
@@ -391,16 +394,18 @@ def __init__(
         visible by other cursors or connections.
         """
 
-        self.connection = connection
-        self.rowcount = -1  # Return -1 as this is not supported
-        self.buffer_size_bytes = result_buffer_size_bytes
+        self.connection: Connection = connection
+
+        self.rowcount: int = -1  # Return -1 as this is not supported
+        self.buffer_size_bytes: int = result_buffer_size_bytes
         self.active_result_set: Union[ResultSet, None] = None
-        self.arraysize = arraysize
+        self.arraysize: int = arraysize
+        self.row_limit: Optional[int] = row_limit
         # Note that Cursor closed => active result set closed, but not vice versa
-        self.open = True
-        self.executing_command_id = None
-        self.backend = backend
-        self.active_command_id = None
+        self.open: bool = True
+        self.executing_command_id: Optional[CommandId] = None
+        self.backend: DatabricksClient = backend
+        self.active_command_id: Optional[CommandId] = None
         self.escaper = ParamEscaper()
         self.lastrowid = None
 
@@ -779,6 +784,7 @@ def execute(
             parameters=prepared_params,
             async_op=False,
             enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
+            row_limit=self.row_limit,
         )
 
         if self.active_result_set and self.active_result_set.is_staging_operation:
@@ -835,6 +841,7 @@ def execute_async(
             parameters=prepared_params,
             async_op=True,
             enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
+            row_limit=self.row_limit,
         )
 
         return self
diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index a3f9b1af..8f15bccc 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -113,10 +113,12 @@ def connection(self, extra_params=()):
             conn.close()
 
     @contextmanager
-    def cursor(self, extra_params=()):
+    def cursor(self, extra_params=(), extra_cursor_params=()):
         with self.connection(extra_params) as conn:
             cursor = conn.cursor(
-                arraysize=self.arraysize, buffer_size_bytes=self.buffer_size_bytes
+                arraysize=self.arraysize,
+                buffer_size_bytes=self.buffer_size_bytes,
+                **dict(extra_cursor_params),
             )
             try:
                 yield cursor
@@ -809,6 +811,60 @@ def test_catalogs_returns_arrow_table(self):
             results = cursor.fetchall_arrow()
             assert isinstance(results, pyarrow.Table)
 
+    def test_row_limit_with_larger_result(self):
+        """Test that row_limit properly constrains results when the query would return more rows"""
+        row_limit = 1000
+        with self.cursor(extra_cursor_params={"row_limit": row_limit}) as cursor:
+            # Execute a query that returns more than row_limit rows
+            cursor.execute("SELECT * FROM range(2000)")
+            rows = cursor.fetchall()
+
+            # Check that the number of rows is limited to row_limit
+            assert len(rows) == row_limit, f"Expected {row_limit} rows, got {len(rows)}"
+
+    def test_row_limit_with_smaller_result(self):
+        """Test that row_limit doesn't affect results when the query returns fewer rows than the limit"""
+        row_limit = 100
+        expected_rows = 50
+        with self.cursor(extra_cursor_params={"row_limit": row_limit}) as cursor:
+            # Execute a query that returns fewer than row_limit rows
+            cursor.execute(f"SELECT * FROM range({expected_rows})")
+            rows = cursor.fetchall()
+
+            # Check that all rows are returned (not limited by row_limit)
+            assert (
+                len(rows) == expected_rows
+            ), f"Expected {expected_rows} rows, got {len(rows)}"
+
+    @skipUnless(pysql_supports_arrow(), "arrow test needs arrow support")
+    def test_row_limit_with_arrow_larger_result(self):
+        """Test that row_limit properly constrains arrow results when the query would return more rows"""
+        row_limit = 800
+        with self.cursor(extra_cursor_params={"row_limit": row_limit}) as cursor:
+            # Execute a query that returns more than row_limit rows
+            cursor.execute("SELECT * FROM range(1500)")
+            arrow_table = cursor.fetchall_arrow()
+
+            # Check that the number of rows in the arrow table is limited to row_limit
+            assert (
+                arrow_table.num_rows == row_limit
+            ), f"Expected {row_limit} rows, got {arrow_table.num_rows}"
+
+    @skipUnless(pysql_supports_arrow(), "arrow test needs arrow support")
+    def test_row_limit_with_arrow_smaller_result(self):
+        """Test that row_limit doesn't affect arrow results when the query returns fewer rows than the limit"""
+        row_limit = 200
+        expected_rows = 100
+        with self.cursor(extra_cursor_params={"row_limit": row_limit}) as cursor:
+            # Execute a query that returns fewer than row_limit rows
+            cursor.execute(f"SELECT * FROM range({expected_rows})")
+            arrow_table = cursor.fetchall_arrow()
+
+            # Check that all rows are returned (not limited by row_limit)
+            assert (
+                arrow_table.num_rows == expected_rows
+            ), f"Expected {expected_rows} rows, got {arrow_table.num_rows}"
+
 
 # use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
 # the 429/503 subsuites separate since they execute under different circumstances.
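For reviewers, here is a minimal usage sketch of the new per-cursor limit. It is not part of the diff itself; the connection arguments (`server_hostname`, `http_path`, `access_token`) are placeholders you would supply for your own workspace.

```python
# Hypothetical end-to-end usage of the new row_limit cursor parameter.
# The connection arguments below are placeholder values, not part of this PR.
from databricks import sql

with sql.connect(
    server_hostname="<workspace-host>",
    http_path="<warehouse-http-path>",
    access_token="<personal-access-token>",
) as connection:
    # row_limit caps every query executed on this cursor; the default of
    # None leaves results unconstrained, preserving existing behavior.
    with connection.cursor(row_limit=1000) as cursor:
        cursor.execute("SELECT * FROM range(2000)")
        rows = cursor.fetchall()
        assert len(rows) == 1000  # result truncated at row_limit
```

Note the design choice: the limit is applied server-side (`resultRowLimit` on the Thrift `ExecuteStatement` request, `row_limit` on the SEA statement request) rather than by trimming rows in the client, so truncated rows are never transferred over the wire.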