diff --git a/.github/workflows/nightly-dev-tests.yml b/.github/workflows/nightly-dev-tests.yml index c397ebe..9504368 100644 --- a/.github/workflows/nightly-dev-tests.yml +++ b/.github/workflows/nightly-dev-tests.yml @@ -14,6 +14,7 @@ jobs: - "3.9" - "3.10" - "3.11" + - "3.12" fail-fast: false steps: - uses: actions/checkout@v3 diff --git a/prefect_duckdb/database.py b/prefect_duckdb/database.py index c7987dc..f04172e 100644 --- a/prefect_duckdb/database.py +++ b/prefect_duckdb/database.py @@ -6,7 +6,6 @@ import duckdb import pandas -import pyarrow as pa from duckdb import DuckDBPyConnection, DuckDBPyRelation from prefect import get_client, task from prefect.artifacts import create_markdown_artifact @@ -548,22 +547,6 @@ async def from_df( await run_sync_in_worker_thread(cursor.register, table_name, table) return cursor - @sync_compatible - async def from_arrow( - self, arrow_object: pa.Table, table_name: Optional[str] = None - ) -> DuckDBPyRelation: - """ - Create a table from an Arrow object. - - Args: - arrow_object: The Arrow object. - """ - cursor = self._connection.cursor() - table = arrow_object - if table_name: - await run_sync_in_worker_thread(cursor.register, table_name, table) - return cursor - @sync_compatible async def from_parquet( self, diff --git a/requirements.txt b/requirements.txt index 3d80f9e..65ea713 100644 --- a/requirements.txt +++ b/requirements.txt @@ -194,7 +194,7 @@ mypy-extensions==1.0.0 # mypy nodeenv==1.9.1 # via pre-commit -numpy==2.2.1 +numpy>=2.0.2 # via pandas oauthlib==3.2.2 # via requests-oauthlib diff --git a/tests/test_database.py b/tests/test_database.py index 1074a39..428c880 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -1,8 +1,12 @@ +import re + import pytest from duckdb import DuckDBPyConnection from prefect import flow -from prefect.server.schemas.actions import ArtifactCreate +from prefect.server import schemas +from prefect.server.schemas.actions import ArtifactCreate +import pydantic from prefect_duckdb.database import DuckDBConnector, duckdb_query qplan = """ @@ -82,6 +86,82 @@ def test_execute(self, duck_connector: DuckDBConnector): cursor = duck_connector.execute("CREATE TABLE test_table (i INTEGER, j STRING)") assert type(cursor) is DuckDBPyConnection + async def test_execute_debug( + self, duck_connector: DuckDBConnector, caplog, client, artifact + ): + with duck_connector.get_connection(): + await duck_connector.execute( + "CREATE TABLE students (name VARCHAR, sid INTEGER);" + ) + await duck_connector.execute( + "CREATE TABLE exams (eid INTEGER, subject VARCHAR, sid INTEGER);" + ) + await duck_connector.execute( + "INSERT INTO students VALUES ('Mark', 1), ('Joe', 2), ('Matthew', 3);" + ) + await duck_connector.execute( + "INSERT INTO exams VALUES \n" + "(10, 'Physics', 1), (20, 'Chemistry', 2), (30, 'Literature', 3);" + ) + + operation = ( + "SELECT name FROM students JOIN exams USING (sid) WHERE name LIKE 'Ma%'" + ) + await duck_connector.execute( + operation, + debug=True, + ) + + artifact_key = ( + re.sub( + "[^A-Za-z0-9 ]+", + "", + operation, + ) + .lower() + .replace(" ", "-") + ) + response = await client.get(f"/artifacts/{artifact_key}/latest") + result = pydantic.parse_obj_as(schemas.core.Artifact, response.json()) + assert "Physical_Plan" in result.data + + async def test_set_debug( + self, duck_connector: DuckDBConnector, caplog, client, artifact + ): + with duck_connector.get_connection(): + await duck_connector.execute( + "CREATE TABLE students (name VARCHAR, sid INTEGER);" + ) + await duck_connector.execute( + "CREATE TABLE exams (eid INTEGER, subject VARCHAR, sid INTEGER);" + ) + await duck_connector.execute( + "INSERT INTO students VALUES ('Mark', 1), ('Joe', 2), ('Matthew', 3);" + ) + await duck_connector.execute( + "INSERT INTO exams VALUES \n" + "(10, 'Physics', 1), (20, 'Chemistry', 2), (30, 'Literature', 3);" + ) + + operation = ( + "SELECT name FROM students JOIN exams USING (sid) WHERE name LIKE 'Ma%'" + ) + duck_connector.set_debug(True) + await duck_connector.execute(operation) + + artifact_key = ( + re.sub( + "[^A-Za-z0-9 ]+", + "", + operation, + ) + .lower() + .replace(" ", "-") + ) + response = await client.get(f"/artifacts/{artifact_key}/latest") + result = pydantic.parse_obj_as(schemas.core.Artifact, response.json()) + assert "Physical_Plan" in result.data + def test_fetch_one(self, duck_connector: DuckDBConnector): duck_connector.get_connection() cursor = duck_connector.execute("CREATE TABLE test_table (i INTEGER, j STRING)") @@ -145,18 +225,6 @@ def test_from_df(self, duck_connector: DuckDBConnector): result = test_df.execute("SELECT * FROM test_table").fetchall() assert result == [(1, "one"), (2, "two"), (3, "three")] - def test_from_arrow(self, duck_connector: DuckDBConnector): - import pyarrow as pa - - connection = duck_connector.get_connection() - test_table = pa.Table.from_pydict( - {"i": [1, 2, 3], "j": ["one", "two", "three"]} - ) - assert test_table.to_pydict() == {"i": [1, 2, 3], "j": ["one", "two", "three"]} - - result = connection.execute("SELECT * FROM test_table").fetchall() - assert result == [(1, "one"), (2, "two"), (3, "three")] - def test_create_function(self, duck_connector: DuckDBConnector): duck_connector.get_connection()