Implement SeaDatabricksClient (Complete Execution Spec) #590

Merged
merged 96 commits into from Jun 18, 2025
Changes from 86 commits

96 commits
138c2ae
[squash from exec-sea] bring over execution phase changes
varun-edachali-dbx Jun 9, 2025
3e3ab94
remove excess test
varun-edachali-dbx Jun 9, 2025
4a78165
add docstring
varun-edachali-dbx Jun 9, 2025
0dac4aa
remove exec func in sea backend
varun-edachali-dbx Jun 9, 2025
1b794c7
remove excess files
varun-edachali-dbx Jun 9, 2025
da5a6fe
remove excess models
varun-edachali-dbx Jun 9, 2025
686ade4
remove excess sea backend tests
varun-edachali-dbx Jun 9, 2025
31e6c83
cleanup
varun-edachali-dbx Jun 9, 2025
69ea238
re-introduce get_schema_desc
varun-edachali-dbx Jun 9, 2025
66d7517
remove SeaResultSet
varun-edachali-dbx Jun 9, 2025
71feef9
clean imports and attributes
varun-edachali-dbx Jun 9, 2025
ae9862f
pass CommandId to ExecResp
varun-edachali-dbx Jun 9, 2025
d8aa69e
remove changes in types
varun-edachali-dbx Jun 9, 2025
db139bc
add back essential types (ExecResponse, from_sea_state)
varun-edachali-dbx Jun 9, 2025
b977b12
fix fetch types
varun-edachali-dbx Jun 9, 2025
da615c0
excess imports
varun-edachali-dbx Jun 9, 2025
0da04a6
reduce diff by maintaining logs
varun-edachali-dbx Jun 9, 2025
ea9d456
fix int test types
varun-edachali-dbx Jun 9, 2025
8985c62
[squashed from exec-sea] init execution func
varun-edachali-dbx Jun 9, 2025
d9bcdbe
remove irrelevant changes
varun-edachali-dbx Jun 9, 2025
ee9fa1c
remove ResultSetFilter functionality
varun-edachali-dbx Jun 9, 2025
24c6152
remove more irrelevant changes
varun-edachali-dbx Jun 9, 2025
67fd101
remove more irrelevant changes
varun-edachali-dbx Jun 9, 2025
271fcaf
even more irrelevant changes
varun-edachali-dbx Jun 9, 2025
bf26ea3
remove sea response as init option
varun-edachali-dbx Jun 9, 2025
ed7cf91
exec test example scripts
varun-edachali-dbx Jun 9, 2025
dae15e3
formatting (black)
varun-edachali-dbx Jun 9, 2025
db5bbea
[squashed from sea-exec] merge sea stuffs
varun-edachali-dbx Jun 9, 2025
d5d3699
remove excess changes
varun-edachali-dbx Jun 9, 2025
6137a3d
remove excess removed docstring
varun-edachali-dbx Jun 9, 2025
75b0773
remove excess changes in backend
varun-edachali-dbx Jun 9, 2025
4494dcd
remove excess imports
varun-edachali-dbx Jun 9, 2025
4d0aeca
remove accidentally removed _get_schema_desc
varun-edachali-dbx Jun 9, 2025
7cece5e
remove unnecessary init with sea_response tests
varun-edachali-dbx Jun 9, 2025
8977c06
remove unnecessary changes
varun-edachali-dbx Jun 9, 2025
0216d7a
formatting (black)
varun-edachali-dbx Jun 9, 2025
4cb15fd
improved models and filters from cloudfetch-sea branch
varun-edachali-dbx Jun 9, 2025
dee47f7
filters stuff (align with JDBC)
varun-edachali-dbx Jun 10, 2025
e385d5b
backend from cloudfetch-sea
varun-edachali-dbx Jun 11, 2025
484064e
remove filtering, metadata ops
varun-edachali-dbx Jun 11, 2025
030edf8
raise NotImplementedError for metadata ops
varun-edachali-dbx Jun 11, 2025
3e22c6c
change to valid table name
varun-edachali-dbx Jun 11, 2025
787f1f7
Merge branch 'sea-migration' into sea-test-scripts
varun-edachali-dbx Jun 11, 2025
165c4f3
remove un-necessary changes
varun-edachali-dbx Jun 11, 2025
a6e40d0
simplify test module
varun-edachali-dbx Jun 11, 2025
52e3088
logging -> debug level
varun-edachali-dbx Jun 11, 2025
641c09b
change table name in log
varun-edachali-dbx Jun 11, 2025
8bd12d8
Merge branch 'sea-migration' into exec-models-sea
varun-edachali-dbx Jun 11, 2025
ffded6e
remove un-necessary changes
varun-edachali-dbx Jun 11, 2025
227f6b3
remove un-necessary backend changes
varun-edachali-dbx Jun 11, 2025
68657a3
remove un-needed GetChunksResponse
varun-edachali-dbx Jun 11, 2025
3940eec
remove un-needed GetChunksResponse
varun-edachali-dbx Jun 11, 2025
37813ba
reduce code duplication in response parsing
varun-edachali-dbx Jun 11, 2025
267c9f4
reduce code duplication
varun-edachali-dbx Jun 11, 2025
2967119
more clear docstrings
varun-edachali-dbx Jun 11, 2025
47fd60d
introduce strongly typed ChunkInfo
varun-edachali-dbx Jun 11, 2025
982fdf2
remove is_volume_operation from response
varun-edachali-dbx Jun 12, 2025
9e14d48
add is_volume_op and more ResultData fields
varun-edachali-dbx Jun 12, 2025
be1997e
Merge branch 'exec-models-sea' into exec-phase-sea
varun-edachali-dbx Jun 12, 2025
e8e8ee7
Merge branch 'sea-test-scripts' into exec-phase-sea
varun-edachali-dbx Jun 12, 2025
05ee4e7
add test scripts
varun-edachali-dbx Jun 12, 2025
cbace3f
Revert "Merge branch 'exec-models-sea' into exec-phase-sea"
varun-edachali-dbx Jun 12, 2025
c075b07
change logging level
varun-edachali-dbx Jun 12, 2025
c62f76d
remove un-necessary changes
varun-edachali-dbx Jun 12, 2025
199402e
remove excess changes
varun-edachali-dbx Jun 12, 2025
8ac574b
remove excess changes
varun-edachali-dbx Jun 12, 2025
398ca70
Merge branch 'sea-migration' into exec-phase-sea
varun-edachali-dbx Jun 12, 2025
b1acc5b
remove _get_schema_bytes (for now)
varun-edachali-dbx Jun 12, 2025
ef2a7ee
redundant comments
varun-edachali-dbx Jun 12, 2025
699942d
Merge branch 'sea-migration' into exec-phase-sea
varun-edachali-dbx Jun 12, 2025
af8f74e
remove fetch phase methods
varun-edachali-dbx Jun 12, 2025
5540c5c
reduce code repetition + introduce gaps after multi line pydocs
varun-edachali-dbx Jun 12, 2025
efe3881
remove unused imports
varun-edachali-dbx Jun 12, 2025
36ab59b
move description extraction to helper func
varun-edachali-dbx Jun 12, 2025
1d57c99
formatting (black)
varun-edachali-dbx Jun 12, 2025
df6dac2
add more unit tests
varun-edachali-dbx Jun 12, 2025
ad0e527
streamline unit tests
varun-edachali-dbx Jun 12, 2025
ed446a0
test getting the list of allowed configurations
varun-edachali-dbx Jun 12, 2025
38e4b5c
reduce diff
varun-edachali-dbx Jun 12, 2025
94879c0
reduce diff
varun-edachali-dbx Jun 12, 2025
1809956
house constants in enums for readability and immutability
varun-edachali-dbx Jun 13, 2025
da5260c
add note on hybrid disposition
varun-edachali-dbx Jun 13, 2025
0385ffb
remove redundant note on arrow_schema_bytes
varun-edachali-dbx Jun 16, 2025
90bb09c
Merge branch 'sea-migration' into exec-phase-sea
varun-edachali-dbx Jun 17, 2025
cd22389
remove invalid import
varun-edachali-dbx Jun 17, 2025
82e0f8b
Merge branch 'sea-migration' into exec-phase-sea
varun-edachali-dbx Jun 17, 2025
059657e
add strong typing for manifest in _extract_description
varun-edachali-dbx Jun 17, 2025
68d6276
remove un-necessary column skipping
varun-edachali-dbx Jun 17, 2025
91d28b2
remove parsing in backend
varun-edachali-dbx Jun 17, 2025
c038f22
fix: convert sea statement id to CommandId type
varun-edachali-dbx Jun 17, 2025
398909c
make polling interval a separate constant
varun-edachali-dbx Jun 17, 2025
7ec43e1
align state checking with Thrift implementation
varun-edachali-dbx Jun 17, 2025
ec95c76
update unit tests according to changes
varun-edachali-dbx Jun 17, 2025
5c1166a
add unit tests for added methods
varun-edachali-dbx Jun 17, 2025
df9f849
add spec to description extraction docstring, add strong typing to pa…
varun-edachali-dbx Jun 18, 2025
3eb582f
add strong typing for backend parameters arg
varun-edachali-dbx Jun 18, 2025
348 changes: 322 additions & 26 deletions src/databricks/sql/backend/sea/backend.py
@@ -1,25 +1,49 @@
import logging
import time
import re
-from typing import Dict, Tuple, List, Optional, TYPE_CHECKING, Set
from typing import Dict, Tuple, List, Optional, Union, TYPE_CHECKING, Set

from databricks.sql.backend.sea.utils.constants import (
ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP,
ResultFormat,
ResultDisposition,
ResultCompression,
WaitTimeout,
)

if TYPE_CHECKING:
from databricks.sql.client import Cursor
from databricks.sql.result_set import ResultSet

from databricks.sql.backend.databricks_client import DatabricksClient
-from databricks.sql.backend.types import SessionId, CommandId, CommandState, BackendType
from databricks.sql.backend.types import (
SessionId,
CommandId,
CommandState,
BackendType,
ExecuteResponse,
)
from databricks.sql.exc import ServerOperationError
from databricks.sql.backend.sea.utils.http_client import SeaHttpClient
-from databricks.sql.backend.sea.utils.constants import (
-    ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP,
-)
from databricks.sql.thrift_api.TCLIService import ttypes
from databricks.sql.types import SSLOptions

from databricks.sql.backend.sea.models import (
ExecuteStatementRequest,
GetStatementRequest,
CancelStatementRequest,
CloseStatementRequest,
CreateSessionRequest,
DeleteSessionRequest,
StatementParameter,
ExecuteStatementResponse,
GetStatementResponse,
CreateSessionResponse,
)
from databricks.sql.backend.sea.models.responses import (
parse_status,
parse_manifest,
parse_result,
)

logger = logging.getLogger(__name__)

@@ -262,8 +286,79 @@ def get_allowed_session_configurations() -> List[str]:
"""
return list(ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP.keys())

-    # == Not Implemented Operations ==
-    # These methods will be implemented in future iterations
def _extract_description_from_manifest(self, manifest_obj) -> Optional[List]:
"""
Extract column description from a manifest object.

Args:
manifest_obj: The ResultManifest object containing schema information

Returns:
Optional[List]: A list of column tuples or None if no columns are found
"""

schema_data = manifest_obj.schema
columns_data = schema_data.get("columns", [])

if not columns_data:
return None

columns = []
for col_data in columns_data:
if not isinstance(col_data, dict):
continue

# Format: (name, type_code, display_size, internal_size, precision, scale, null_ok)
columns.append(
(
col_data.get("name", ""), # name
col_data.get("type_name", ""), # type_code
None, # display_size (not provided by SEA)
None, # internal_size (not provided by SEA)
col_data.get("precision"), # precision
col_data.get("scale"), # scale
col_data.get("nullable", True), # null_ok
)
)

return columns if columns else None
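
# Illustrative example (editor's sketch, not part of the diff): assuming a
# manifest whose schema dict is
#     {"columns": [{"name": "id", "type_name": "INT", "nullable": True}]}
# the helper above would return the PEP 249 style description
#     [("id", "INT", None, None, None, None, True)]
# with precision and scale None because the column data omits them.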

def _results_message_to_execute_response(self, sea_response, command_id):
"""
Convert a SEA response to an ExecuteResponse and extract result data.

Args:
sea_response: The response from the SEA API
command_id: The command ID

Returns:
tuple: (ExecuteResponse, ResultData, ResultManifest) - The normalized execute response,
result data object, and manifest object
"""

# Parse the response
status = parse_status(sea_response)
manifest_obj = parse_manifest(sea_response)
result_data_obj = parse_result(sea_response)

# Extract description from manifest schema
description = self._extract_description_from_manifest(manifest_obj)

# Check for compression
lz4_compressed = manifest_obj.result_compression == "LZ4_FRAME"

execute_response = ExecuteResponse(
command_id=command_id,
status=status.state,
description=description,
has_been_closed_server_side=False,
lz4_compressed=lz4_compressed,
is_staging_operation=False,
arrow_schema_bytes=None,
result_format=manifest_obj.format,
)

return execute_response, result_data_obj, manifest_obj
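
# Illustrative example (editor's sketch, not part of the diff): the sea_response
# consumed here is the raw JSON dict returned by the SEA statement API, shaped
# roughly like
#     {"statement_id": "...",
#      "status": {"state": "SUCCEEDED"},
#      "manifest": {"format": "ARROW_STREAM", "schema": {"columns": [...]},
#                   "result_compression": "LZ4_FRAME", ...},
#      "result": {"external_links": [...]}}
# parse_status, parse_manifest, and parse_result each pick off one of these keys.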

def execute_command(
self,
@@ -274,41 +369,242 @@ def execute_command(
lz4_compression: bool,
cursor: "Cursor",
use_cloud_fetch: bool,
-        parameters: List[ttypes.TSparkParameter],
parameters: List,
async_op: bool,
enforce_embedded_schema_correctness: bool,
-    ):
-        """Not implemented yet."""
-        raise NotImplementedError(
-            "execute_command is not yet implemented for SEA backend"
-        )
) -> Union["ResultSet", None]:
"""
Execute a SQL command using the SEA backend.

Args:
operation: SQL command to execute
session_id: Session identifier
max_rows: Maximum number of rows to fetch
max_bytes: Maximum number of bytes to fetch
lz4_compression: Whether to use LZ4 compression
cursor: Cursor executing the command
use_cloud_fetch: Whether to use cloud fetch
parameters: SQL parameters
async_op: Whether to execute asynchronously
enforce_embedded_schema_correctness: Whether to enforce schema correctness

Returns:
ResultSet: A SeaResultSet instance for the executed command
"""

if session_id.backend_type != BackendType.SEA:
raise ValueError("Not a valid SEA session ID")

sea_session_id = session_id.to_sea_session_id()

# Convert parameters to StatementParameter objects
sea_parameters = []
if parameters:
for param in parameters:
sea_parameters.append(
StatementParameter(
name=param.name,
value=param.value,
type=param.type if hasattr(param, "type") else None,
)
)

format = (
ResultFormat.ARROW_STREAM if use_cloud_fetch else ResultFormat.JSON_ARRAY
).value
disposition = (
ResultDisposition.EXTERNAL_LINKS
if use_cloud_fetch
else ResultDisposition.INLINE
).value
result_compression = (
ResultCompression.LZ4_FRAME if lz4_compression else ResultCompression.NONE
).value

request = ExecuteStatementRequest(
warehouse_id=self.warehouse_id,
session_id=sea_session_id,
statement=operation,
disposition=disposition,
format=format,
wait_timeout=(WaitTimeout.ASYNC if async_op else WaitTimeout.SYNC).value,
on_wait_timeout="CONTINUE",
row_limit=max_rows,
parameters=sea_parameters if sea_parameters else None,
result_compression=result_compression,
)
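
# Illustrative example (editor's sketch, not part of the diff): for a synchronous
# cloud-fetch call with LZ4 enabled, request.to_dict() would serialize to roughly
#     {"warehouse_id": "...", "session_id": "...", "statement": operation,
#      "disposition": "EXTERNAL_LINKS", "format": "ARROW_STREAM",
#      "wait_timeout": <WaitTimeout.SYNC value>, "on_wait_timeout": "CONTINUE",
#      "row_limit": max_rows, "result_compression": "LZ4_FRAME"}
# (the exact wait_timeout strings live in the WaitTimeout enum, which is not
# shown in this diff).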

response_data = self.http_client._make_request(
method="POST", path=self.STATEMENT_PATH, data=request.to_dict()
)
response = ExecuteStatementResponse.from_dict(response_data)
statement_id = response.statement_id
if not statement_id:
raise ServerOperationError(
"Failed to execute command: No statement ID returned",
{
"operation-id": None,
"diagnostic-info": None,
},
)

command_id = CommandId.from_sea_statement_id(statement_id)

# Store the command ID in the cursor
cursor.active_command_id = command_id

# If async operation, return and let the client poll for results
if async_op:
return None

# For synchronous operation, wait for the statement to complete
status = response.status
state = status.state

# Keep polling until we reach a terminal state
while state in [CommandState.PENDING, CommandState.RUNNING]:
time.sleep(0.5) # add a small delay to avoid excessive API calls
Review comment (Contributor):
Let's make this delay a constant, and later on maybe a connection prop.

Reply (varun-edachali-dbx, Author):
I removed it entirely for now because the Thrift implementation does not seem to have a polling interval. Should I add it back? I will add it as a constant if it should be present.
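
For illustration, a minimal sketch of the constant-based delay this thread converges on (the commit log above includes "make polling interval a separate constant"); the constant name and default value below are assumptions, not the merged code:

    import time

    from databricks.sql.backend.types import CommandState

    SEA_STATEMENT_POLL_INTERVAL_SECONDS = 0.5  # assumed name/value; a candidate connection property later

    def wait_for_terminal_state(backend, command_id):
        # Poll get_query_state, sleeping between calls to avoid excessive API requests.
        state = backend.get_query_state(command_id)
        while state in (CommandState.PENDING, CommandState.RUNNING):
            time.sleep(SEA_STATEMENT_POLL_INTERVAL_SECONDS)
            state = backend.get_query_state(command_id)
        return state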

state = self.get_query_state(command_id)

if state != CommandState.SUCCEEDED:
Review comment (Contributor):
Do we need to handle any other states gracefully? Is it possible to reuse the state handling of Thrift, since I think CommandState is used in the Thrift client as well?

Reply (varun-edachali-dbx, Author, Jun 17, 2025):
I gave the code another read, and it looks like the Thrift implementation explicitly checks for command closure and failure. I'll add this to the code.

I do not think we should reuse the state handling of Thrift just yet: the current Python connector implementation checks for specific states that are not uniquely accounted for by CommandState. For example, the Python connector explicitly checks for the ERROR state, while in the CommandState enum ERROR, TIMEDOUT and UNKNOWN are all normalised to FAILED. Note that we took this characterisation from the JDBC repo.

We should normalise the Exceptions raised before we abstract the state handling. Should we do this?

For now, a quick solution would be to create our own characterisation by making ERROR_STATE separate from TIMEDOUT and UNKNOWN, which would likely allow us to build some common state handling while keeping the exact behaviour of the existing Exceptions. The only issue is that this characterisation would have no precedent, since we would be abandoning the JDBC one.
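
For illustration, the separate-ERROR characterisation proposed above might look like the sketch below; the enum members are hypothetical and not what the connector ships, which normalises ERROR, TIMEDOUT and UNKNOWN to FAILED per the JDBC precedent:

    from enum import Enum

    class ProposedCommandState(Enum):
        # Hypothetical split: ERROR kept distinct from TIMEDOUT/UNKNOWN so the
        # exception mapping can stay identical to existing Thrift behaviour.
        PENDING = "PENDING"
        RUNNING = "RUNNING"
        SUCCEEDED = "SUCCEEDED"
        ERROR = "ERROR"
        TIMEDOUT = "TIMEDOUT"
        UNKNOWN = "UNKNOWN"
        CANCELLED = "CANCELLED"
        CLOSED = "CLOSED"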

raise ServerOperationError(
f"Statement execution did not succeed: {status.error.message if status.error else 'Unknown error'}",
{
"operation-id": command_id.to_sea_statement_id(),
"diagnostic-info": None,
},
)

return self.get_execution_result(command_id, cursor)

def cancel_command(self, command_id: CommandId) -> None:
"""Not implemented yet."""
raise NotImplementedError(
"cancel_command is not yet implemented for SEA backend"
"""
Cancel a running command.

Args:
command_id: Command identifier to cancel

Raises:
ValueError: If the command ID is invalid
"""

if command_id.backend_type != BackendType.SEA:
raise ValueError("Not a valid SEA command ID")

sea_statement_id = command_id.to_sea_statement_id()

request = CancelStatementRequest(statement_id=sea_statement_id)
self.http_client._make_request(
method="POST",
path=self.CANCEL_STATEMENT_PATH_WITH_ID.format(sea_statement_id),
data=request.to_dict(),
)

def close_command(self, command_id: CommandId) -> None:
"""Not implemented yet."""
raise NotImplementedError(
"close_command is not yet implemented for SEA backend"
"""
Close a command and release resources.

Args:
command_id: Command identifier to close

Raises:
ValueError: If the command ID is invalid
"""

if command_id.backend_type != BackendType.SEA:
raise ValueError("Not a valid SEA command ID")

sea_statement_id = command_id.to_sea_statement_id()

request = CloseStatementRequest(statement_id=sea_statement_id)
self.http_client._make_request(
method="DELETE",
path=self.STATEMENT_PATH_WITH_ID.format(sea_statement_id),
data=request.to_dict(),
)

def get_query_state(self, command_id: CommandId) -> CommandState:
"""Not implemented yet."""
raise NotImplementedError(
"get_query_state is not yet implemented for SEA backend"
"""
Get the state of a running query.

Args:
command_id: Command identifier

Returns:
CommandState: The current state of the command

Raises:
ValueError: If the command ID is invalid
"""

if command_id.backend_type != BackendType.SEA:
raise ValueError("Not a valid SEA command ID")

sea_statement_id = command_id.to_sea_statement_id()

request = GetStatementRequest(statement_id=sea_statement_id)
response_data = self.http_client._make_request(
method="GET",
path=self.STATEMENT_PATH_WITH_ID.format(sea_statement_id),
data=request.to_dict(),
)

# Parse the response
response = GetStatementResponse.from_dict(response_data)
return response.status.state

def get_execution_result(
self,
command_id: CommandId,
cursor: "Cursor",
-    ):
-        """Not implemented yet."""
-        raise NotImplementedError(
-            "get_execution_result is not yet implemented for SEA backend"
-        )
) -> "ResultSet":
"""
Get the result of a command execution.

Args:
command_id: Command identifier
cursor: Cursor executing the command

Returns:
ResultSet: A SeaResultSet instance with the execution results

Raises:
ValueError: If the command ID is invalid
"""

if command_id.backend_type != BackendType.SEA:
raise ValueError("Not a valid SEA command ID")

sea_statement_id = command_id.to_sea_statement_id()

# Create the request model
request = GetStatementRequest(statement_id=sea_statement_id)

# Get the statement result
response_data = self.http_client._make_request(
method="GET",
path=self.STATEMENT_PATH_WITH_ID.format(sea_statement_id),
data=request.to_dict(),
)

# Create and return a SeaResultSet
from databricks.sql.result_set import SeaResultSet

# Convert the response to an ExecuteResponse and extract result data
(
execute_response,
result_data,
manifest,
) = self._results_message_to_execute_response(response_data, command_id)

return SeaResultSet(
connection=cursor.connection,
execute_response=execute_response,
sea_client=self,
buffer_size_bytes=cursor.buffer_size_bytes,
arraysize=cursor.arraysize,
result_data=result_data,
manifest=manifest,
)

# == Metadata Operations ==
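
For context, a hedged sketch of how the synchronous path added here is driven from the DB-API surface (placeholder credentials; how the connection selects the SEA backend is an assumption and not part of this diff):

    from databricks import sql

    connection = sql.connect(
        server_hostname="<workspace-host>",   # placeholder
        http_path="<warehouse-http-path>",    # placeholder
        access_token="<token>",               # placeholder
    )
    cursor = connection.cursor()
    cursor.execute("SELECT 1")  # execute_command -> poll get_query_state -> get_execution_result
    print(cursor.fetchall())
    cursor.close()
    connection.close()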