-
Notifications
You must be signed in to change notification settings - Fork 112
SeaDatabricksClient: Add Metadata Commands #593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
138c2ae
3e3ab94
4a78165
0dac4aa
1b794c7
da5a6fe
686ade4
31e6c83
69ea238
66d7517
71feef9
ae9862f
d8aa69e
db139bc
b977b12
da615c0
0da04a6
ea9d456
8985c62
d9bcdbe
ee9fa1c
24c6152
67fd101
271fcaf
bf26ea3
ed7cf91
dae15e3
db5bbea
d5d3699
6137a3d
75b0773
4494dcd
4d0aeca
7cece5e
8977c06
0216d7a
4cb15fd
dee47f7
e385d5b
484064e
030edf8
30f8266
033ae73
33821f4
3e22c6c
787f1f7
165c4f3
a6e40d0
52e3088
641c09b
8bd12d8
ffded6e
227f6b3
68657a3
3940eec
37813ba
267c9f4
2967119
47fd60d
982fdf2
9e14d48
be1997e
e8e8ee7
05ee4e7
3ffa898
2952d8d
89e2aa0
cbace3f
c075b07
c62f76d
199402e
8ac574b
398ca70
b1acc5b
ef2a7ee
699942d
af8f74e
5540c5c
efe3881
36ab59b
1d57c99
df6dac2
ad0e527
ed446a0
38e4b5c
94879c0
1809956
da5260c
0385ffb
349c021
6229848
fd52356
64e58b0
0a2cdfd
90bb09c
cd22389
82e0f8b
e64b81b
5ab9bbe
1ab6e87
f469c24
68ec65f
ffd478e
f6d873d
28675f5
3578659
8713023
22dc252
390f592
35f1ef0
a515d26
59b1330
293e356
dd40beb
14057ac
a4d5bdb
e9b1314
8ede414
09a1b11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
import time | ||
import re | ||
|
@@ -10,11 +12,12 @@ | |
ResultDisposition, | ||
ResultCompression, | ||
WaitTimeout, | ||
MetadataCommands, | ||
) | ||
|
||
if TYPE_CHECKING: | ||
from databricks.sql.client import Cursor | ||
from databricks.sql.result_set import ResultSet | ||
from databricks.sql.result_set import SeaResultSet | ||
|
||
from databricks.sql.backend.databricks_client import DatabricksClient | ||
from databricks.sql.backend.types import ( | ||
|
@@ -24,7 +27,7 @@ | |
BackendType, | ||
ExecuteResponse, | ||
) | ||
from databricks.sql.exc import DatabaseError, ServerOperationError | ||
from databricks.sql.exc import DatabaseError, ProgrammingError, ServerOperationError | ||
from databricks.sql.backend.sea.utils.http_client import SeaHttpClient | ||
from databricks.sql.types import SSLOptions | ||
|
||
|
@@ -169,7 +172,7 @@ def _extract_warehouse_id(self, http_path: str) -> str: | |
f"Note: SEA only works for warehouses." | ||
) | ||
logger.error(error_message) | ||
raise ValueError(error_message) | ||
raise ProgrammingError(error_message) | ||
|
||
@property | ||
def max_download_threads(self) -> int: | ||
|
@@ -241,14 +244,14 @@ def close_session(self, session_id: SessionId) -> None: | |
session_id: The session identifier returned by open_session() | ||
|
||
Raises: | ||
ValueError: If the session ID is invalid | ||
ProgrammingError: If the session ID is invalid | ||
OperationalError: If there's an error closing the session | ||
""" | ||
|
||
logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id) | ||
|
||
if session_id.backend_type != BackendType.SEA: | ||
raise ValueError("Not a valid SEA session ID") | ||
raise ProgrammingError("Not a valid SEA session ID") | ||
sea_session_id = session_id.to_sea_session_id() | ||
|
||
request_data = DeleteSessionRequest( | ||
|
@@ -400,12 +403,12 @@ def execute_command( | |
max_rows: int, | ||
max_bytes: int, | ||
lz4_compression: bool, | ||
cursor: "Cursor", | ||
cursor: Cursor, | ||
use_cloud_fetch: bool, | ||
parameters: List[Dict[str, Any]], | ||
async_op: bool, | ||
enforce_embedded_schema_correctness: bool, | ||
) -> Union["ResultSet", None]: | ||
) -> Union[SeaResultSet, None]: | ||
""" | ||
Execute a SQL command using the SEA backend. | ||
|
||
|
@@ -426,7 +429,7 @@ def execute_command( | |
""" | ||
|
||
if session_id.backend_type != BackendType.SEA: | ||
raise ValueError("Not a valid SEA session ID") | ||
raise ProgrammingError("Not a valid SEA session ID") | ||
|
||
sea_session_id = session_id.to_sea_session_id() | ||
|
||
|
@@ -501,11 +504,11 @@ def cancel_command(self, command_id: CommandId) -> None: | |
command_id: Command identifier to cancel | ||
|
||
Raises: | ||
ValueError: If the command ID is invalid | ||
ProgrammingError: If the command ID is invalid | ||
""" | ||
|
||
if command_id.backend_type != BackendType.SEA: | ||
raise ValueError("Not a valid SEA command ID") | ||
raise ProgrammingError("Not a valid SEA command ID") | ||
|
||
sea_statement_id = command_id.to_sea_statement_id() | ||
|
||
|
@@ -524,11 +527,11 @@ def close_command(self, command_id: CommandId) -> None: | |
command_id: Command identifier to close | ||
|
||
Raises: | ||
ValueError: If the command ID is invalid | ||
ProgrammingError: If the command ID is invalid | ||
""" | ||
|
||
if command_id.backend_type != BackendType.SEA: | ||
raise ValueError("Not a valid SEA command ID") | ||
raise ProgrammingError("Not a valid SEA command ID") | ||
|
||
sea_statement_id = command_id.to_sea_statement_id() | ||
|
||
|
@@ -550,7 +553,7 @@ def get_query_state(self, command_id: CommandId) -> CommandState: | |
CommandState: The current state of the command | ||
|
||
Raises: | ||
ValueError: If the command ID is invalid | ||
ProgrammingError: If the command ID is invalid | ||
""" | ||
|
||
if command_id.backend_type != BackendType.SEA: | ||
|
@@ -572,8 +575,8 @@ def get_query_state(self, command_id: CommandId) -> CommandState: | |
def get_execution_result( | ||
self, | ||
command_id: CommandId, | ||
cursor: "Cursor", | ||
) -> "ResultSet": | ||
cursor: Cursor, | ||
) -> SeaResultSet: | ||
""" | ||
Get the result of a command execution. | ||
|
||
|
@@ -582,14 +585,14 @@ def get_execution_result( | |
cursor: Cursor executing the command | ||
|
||
Returns: | ||
ResultSet: A SeaResultSet instance with the execution results | ||
SeaResultSet: A SeaResultSet instance with the execution results | ||
|
||
Raises: | ||
ValueError: If the command ID is invalid | ||
""" | ||
|
||
if command_id.backend_type != BackendType.SEA: | ||
raise ValueError("Not a valid SEA command ID") | ||
raise ProgrammingError("Not a valid SEA command ID") | ||
|
||
sea_statement_id = command_id.to_sea_statement_id() | ||
|
||
|
@@ -626,47 +629,141 @@ def get_catalogs( | |
session_id: SessionId, | ||
max_rows: int, | ||
max_bytes: int, | ||
cursor: "Cursor", | ||
): | ||
"""Not implemented yet.""" | ||
raise NotImplementedError("get_catalogs is not yet implemented for SEA backend") | ||
cursor: Cursor, | ||
) -> SeaResultSet: | ||
"""Get available catalogs by executing 'SHOW CATALOGS'.""" | ||
result = self.execute_command( | ||
operation=MetadataCommands.SHOW_CATALOGS.value, | ||
session_id=session_id, | ||
max_rows=max_rows, | ||
max_bytes=max_bytes, | ||
lz4_compression=False, | ||
cursor=cursor, | ||
use_cloud_fetch=False, | ||
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this is a thrift-specific param? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, but it is a param passed to |
||
) | ||
assert result is not None, "execute_command returned None in synchronous mode" | ||
return result | ||
|
||
def get_schemas( | ||
self, | ||
session_id: SessionId, | ||
max_rows: int, | ||
max_bytes: int, | ||
cursor: "Cursor", | ||
cursor: Cursor, | ||
catalog_name: Optional[str] = None, | ||
schema_name: Optional[str] = None, | ||
): | ||
"""Not implemented yet.""" | ||
raise NotImplementedError("get_schemas is not yet implemented for SEA backend") | ||
) -> SeaResultSet: | ||
"""Get schemas by executing 'SHOW SCHEMAS IN catalog [LIKE pattern]'.""" | ||
if not catalog_name: | ||
raise DatabaseError("Catalog name is required for get_schemas") | ||
|
||
operation = MetadataCommands.SHOW_SCHEMAS.value.format(catalog_name) | ||
|
||
if schema_name: | ||
operation += MetadataCommands.LIKE_PATTERN.value.format(schema_name) | ||
|
||
result = self.execute_command( | ||
operation=operation, | ||
session_id=session_id, | ||
max_rows=max_rows, | ||
max_bytes=max_bytes, | ||
lz4_compression=False, | ||
cursor=cursor, | ||
use_cloud_fetch=False, | ||
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
) | ||
assert result is not None, "execute_command returned None in synchronous mode" | ||
return result | ||
|
||
def get_tables( | ||
self, | ||
session_id: SessionId, | ||
max_rows: int, | ||
max_bytes: int, | ||
cursor: "Cursor", | ||
cursor: Cursor, | ||
catalog_name: Optional[str] = None, | ||
schema_name: Optional[str] = None, | ||
table_name: Optional[str] = None, | ||
table_types: Optional[List[str]] = None, | ||
): | ||
"""Not implemented yet.""" | ||
raise NotImplementedError("get_tables is not yet implemented for SEA backend") | ||
) -> SeaResultSet: | ||
"""Get tables by executing 'SHOW TABLES IN catalog [SCHEMA LIKE pattern] [LIKE pattern]'.""" | ||
operation = ( | ||
MetadataCommands.SHOW_TABLES_ALL_CATALOGS.value | ||
if catalog_name in [None, "*", "%"] | ||
else MetadataCommands.SHOW_TABLES.value.format( | ||
MetadataCommands.CATALOG_SPECIFIC.value.format(catalog_name) | ||
) | ||
) | ||
|
||
if schema_name: | ||
operation += MetadataCommands.SCHEMA_LIKE_PATTERN.value.format(schema_name) | ||
|
||
if table_name: | ||
operation += MetadataCommands.LIKE_PATTERN.value.format(table_name) | ||
|
||
result = self.execute_command( | ||
operation=operation, | ||
session_id=session_id, | ||
max_rows=max_rows, | ||
max_bytes=max_bytes, | ||
lz4_compression=False, | ||
cursor=cursor, | ||
use_cloud_fetch=False, | ||
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
) | ||
assert result is not None, "execute_command returned None in synchronous mode" | ||
|
||
# Apply client-side filtering by table_types | ||
from databricks.sql.backend.sea.utils.filters import ResultSetFilter | ||
|
||
result = ResultSetFilter.filter_tables_by_type(result, table_types) | ||
|
||
return result | ||
|
||
def get_columns( | ||
self, | ||
session_id: SessionId, | ||
max_rows: int, | ||
max_bytes: int, | ||
cursor: "Cursor", | ||
cursor: Cursor, | ||
catalog_name: Optional[str] = None, | ||
schema_name: Optional[str] = None, | ||
table_name: Optional[str] = None, | ||
column_name: Optional[str] = None, | ||
): | ||
"""Not implemented yet.""" | ||
raise NotImplementedError("get_columns is not yet implemented for SEA backend") | ||
) -> SeaResultSet: | ||
"""Get columns by executing 'SHOW COLUMNS IN CATALOG catalog [SCHEMA LIKE pattern] [TABLE LIKE pattern] [LIKE pattern]'.""" | ||
if not catalog_name: | ||
raise DatabaseError("Catalog name is required for get_columns") | ||
|
||
operation = MetadataCommands.SHOW_COLUMNS.value.format(catalog_name) | ||
|
||
if schema_name: | ||
operation += MetadataCommands.SCHEMA_LIKE_PATTERN.value.format(schema_name) | ||
|
||
if table_name: | ||
operation += MetadataCommands.TABLE_LIKE_PATTERN.value.format(table_name) | ||
|
||
if column_name: | ||
operation += MetadataCommands.LIKE_PATTERN.value.format(column_name) | ||
|
||
result = self.execute_command( | ||
operation=operation, | ||
session_id=session_id, | ||
max_rows=max_rows, | ||
max_bytes=max_bytes, | ||
lz4_compression=False, | ||
cursor=cursor, | ||
use_cloud_fetch=False, | ||
parameters=[], | ||
async_op=False, | ||
enforce_embedded_schema_correctness=False, | ||
) | ||
assert result is not None, "execute_command returned None in synchronous mode" | ||
return result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not using compression for metadata?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a side effect of setting
use_cloud_fetch=False
: compression is not supported for INLINE + JSON in SEA.