Skip to content

Commit

Permalink
removed from_arrow
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano committed Dec 27, 2024
1 parent 5b00d93 commit 210991b
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 31 deletions.
1 change: 1 addition & 0 deletions .github/workflows/nightly-dev-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jobs:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
fail-fast: false
steps:
- uses: actions/checkout@v3
Expand Down
17 changes: 0 additions & 17 deletions prefect_duckdb/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import duckdb
import pandas
import pyarrow as pa
from duckdb import DuckDBPyConnection, DuckDBPyRelation
from prefect import get_client, task
from prefect.artifacts import create_markdown_artifact
Expand Down Expand Up @@ -548,22 +547,6 @@ async def from_df(
await run_sync_in_worker_thread(cursor.register, table_name, table)
return cursor

@sync_compatible
async def from_arrow(
    self, arrow_object: pa.Table, table_name: Optional[str] = None
) -> DuckDBPyConnection:
    """
    Register an Arrow table on a new cursor.

    Args:
        arrow_object: The Arrow table to expose to DuckDB.
        table_name: Name to register `arrow_object` under. When omitted
            (or empty), nothing is registered and the new cursor is
            returned as-is.

    Returns:
        The cursor created from the connector's connection; if
        `table_name` was given, `arrow_object` is registered on it
        under that name.
    """
    cursor = self._connection.cursor()
    if table_name:
        # Registration is dispatched through run_sync_in_worker_thread
        # rather than being called inline from the coroutine.
        await run_sync_in_worker_thread(
            cursor.register, table_name, arrow_object
        )
    return cursor

@sync_compatible
async def from_parquet(
self,
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ mypy-extensions==1.0.0
# mypy
nodeenv==1.9.1
# via pre-commit
numpy==2.2.1
numpy>=2.0.2
# via pandas
oauthlib==3.2.2
# via requests-oauthlib
Expand Down
94 changes: 81 additions & 13 deletions tests/test_database.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import re

import pytest
from duckdb import DuckDBPyConnection
from prefect import flow
from prefect.server.schemas.actions import ArtifactCreate

from prefect.server import schemas
from prefect.server.schemas.actions import ArtifactCreate
import pydantic
from prefect_duckdb.database import DuckDBConnector, duckdb_query

qplan = """
Expand Down Expand Up @@ -82,6 +86,82 @@ def test_execute(self, duck_connector: DuckDBConnector):
cursor = duck_connector.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
assert type(cursor) is DuckDBPyConnection

async def test_execute_debug(
self, duck_connector: DuckDBConnector, caplog, client, artifact
):
"""
Run a query with ``debug=True`` and check the resulting artifact.

Creates and fills the ``students`` and ``exams`` tables, executes a
join with ``debug=True``, then fetches the latest artifact stored under
a key derived from the query text and asserts that its data contains
"Physical_Plan".

The ``caplog`` and ``artifact`` fixtures are requested but not
referenced in the body.
"""
with duck_connector.get_connection():
# Schema and rows for the two tables the debug query joins.
await duck_connector.execute(
"CREATE TABLE students (name VARCHAR, sid INTEGER);"
)
await duck_connector.execute(
"CREATE TABLE exams (eid INTEGER, subject VARCHAR, sid INTEGER);"
)
await duck_connector.execute(
"INSERT INTO students VALUES ('Mark', 1), ('Joe', 2), ('Matthew', 3);"
)
await duck_connector.execute(
"INSERT INTO exams VALUES \n"
"(10, 'Physics', 1), (20, 'Chemistry', 2), (30, 'Literature', 3);"
)

operation = (
"SELECT name FROM students JOIN exams USING (sid) WHERE name LIKE 'Ma%'"
)
# debug is enabled for this single call only.
await duck_connector.execute(
operation,
debug=True,
)

# Key = query text with everything except ASCII letters, digits and
# spaces removed, lowercased, and spaces replaced by hyphens.
# Presumably mirrors how the connector names the artifact -- confirm
# against the connector implementation.
artifact_key = (
re.sub(
"[^A-Za-z0-9 ]+",
"",
operation,
)
.lower()
.replace(" ", "-")
)
response = await client.get(f"/artifacts/{artifact_key}/latest")
result = pydantic.parse_obj_as(schemas.core.Artifact, response.json())
assert "Physical_Plan" in result.data

async def test_set_debug(
self, duck_connector: DuckDBConnector, caplog, client, artifact
):
"""
Enable debug via ``set_debug(True)`` and check the resulting artifact.

Creates and fills the ``students`` and ``exams`` tables, calls
``set_debug(True)`` on the connector, executes a join without passing
``debug``, then fetches the latest artifact stored under a key derived
from the query text and asserts that its data contains "Physical_Plan".

The ``caplog`` and ``artifact`` fixtures are requested but not
referenced in the body.
"""
with duck_connector.get_connection():
# Schema and rows for the two tables the debug query joins.
await duck_connector.execute(
"CREATE TABLE students (name VARCHAR, sid INTEGER);"
)
await duck_connector.execute(
"CREATE TABLE exams (eid INTEGER, subject VARCHAR, sid INTEGER);"
)
await duck_connector.execute(
"INSERT INTO students VALUES ('Mark', 1), ('Joe', 2), ('Matthew', 3);"
)
await duck_connector.execute(
"INSERT INTO exams VALUES \n"
"(10, 'Physics', 1), (20, 'Chemistry', 2), (30, 'Literature', 3);"
)

operation = (
"SELECT name FROM students JOIN exams USING (sid) WHERE name LIKE 'Ma%'"
)
# Debug is switched on at the connector level; execute() is then called
# without a debug argument.
duck_connector.set_debug(True)
await duck_connector.execute(operation)

# Key = query text with everything except ASCII letters, digits and
# spaces removed, lowercased, and spaces replaced by hyphens.
# Presumably mirrors how the connector names the artifact -- confirm
# against the connector implementation.
artifact_key = (
re.sub(
"[^A-Za-z0-9 ]+",
"",
operation,
)
.lower()
.replace(" ", "-")
)
response = await client.get(f"/artifacts/{artifact_key}/latest")
result = pydantic.parse_obj_as(schemas.core.Artifact, response.json())
assert "Physical_Plan" in result.data

def test_fetch_one(self, duck_connector: DuckDBConnector):
duck_connector.get_connection()
cursor = duck_connector.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
Expand Down Expand Up @@ -145,18 +225,6 @@ def test_from_df(self, duck_connector: DuckDBConnector):
result = test_df.execute("SELECT * FROM test_table").fetchall()
assert result == [(1, "one"), (2, "two"), (3, "three")]

def test_from_arrow(self, duck_connector: DuckDBConnector):
    """
    Register an Arrow table through ``from_arrow`` and read it back.

    The Arrow table is bound to a local name that differs from the
    registered table name, so the SELECT can only succeed if
    ``from_arrow`` actually registered it on the returned cursor.
    """
    import pyarrow as pa

    expected_columns = {"i": [1, 2, 3], "j": ["one", "two", "three"]}

    duck_connector.get_connection()
    arrow_table = pa.Table.from_pydict(expected_columns)
    assert arrow_table.to_pydict() == expected_columns

    # Query via the cursor returned by from_arrow, not the raw connection.
    cursor = duck_connector.from_arrow(arrow_table, table_name="test_table")
    result = cursor.execute("SELECT * FROM test_table").fetchall()
    assert result == [(1, "one"), (2, "two"), (3, "three")]

def test_create_function(self, duck_connector: DuckDBConnector):
duck_connector.get_connection()

Expand Down

0 comments on commit 210991b

Please sign in to comment.