[WIP] enables python 3.13 #2047

Draft · wants to merge 4 commits into base: devel
4 changes: 4 additions & 0 deletions .github/workflows/test_common.yml
@@ -45,6 +45,10 @@ jobs:
os: "ubuntu-latest"
- python-version: "3.12.x"
os: "ubuntu-latest"
- python-version: "3.13.x"
os: "ubuntu-latest"
- python-version: "3.12.x"
os: "windows-latest"

defaults:
run:
3 changes: 3 additions & 0 deletions Makefile
@@ -46,6 +46,9 @@ has-poetry:
dev: has-poetry
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk,airflow

dev-common: has-poetry
poetry install -E duckdb -E cli -E parquet -E deltalake -E sql_database --with sentry-sdk,pipeline,sources

lint:
./tools/check-package.sh
poetry run python ./tools/check-lockfile.py
13 changes: 6 additions & 7 deletions dlt/cli/deploy_command_helpers.py
@@ -5,7 +5,6 @@
from yaml import Dumper
from itertools import chain
from typing import List, Optional, Sequence, Tuple, Any, Dict
from astunparse import unparse

# optional dependencies
import pipdeptree
@@ -23,7 +22,7 @@
from dlt.common.git import get_origin, get_repo, Repo
from dlt.common.configuration.specs.runtime_configuration import get_default_pipeline_name
from dlt.common.typing import StrAny
from dlt.common.reflection.utils import evaluate_node_literal
from dlt.common.reflection.utils import evaluate_node_literal, ast_unparse
from dlt.common.pipeline import LoadInfo, TPipelineState, get_dlt_repos_dir
from dlt.common.storages import FileStorage
from dlt.common.utils import set_working_dir
@@ -313,7 +312,7 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
if f_r_value is None:
fmt.warning(
"The value of `dev_mode` in call to `dlt.pipeline` cannot be"
f" determined from {unparse(f_r_node).strip()}. We assume that you know"
f" determined from {ast_unparse(f_r_node).strip()}. We assume that you know"
" what you are doing :)"
)
if f_r_value is True:
@@ -331,8 +330,8 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
raise CliCommandInnerException(
"deploy",
"The value of 'pipelines_dir' argument in call to `dlt_pipeline` cannot be"
f" determined from {unparse(p_d_node).strip()}. Pipeline working dir will"
" be found. Pass it directly with --pipelines-dir option.",
f" determined from {ast_unparse(p_d_node).strip()}. Pipeline working dir"
" will be found. Pass it directly with --pipelines-dir option.",
)

p_n_node = call_args.arguments.get("pipeline_name")
@@ -342,8 +341,8 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
raise CliCommandInnerException(
"deploy",
"The value of 'pipeline_name' argument in call to `dlt_pipeline` cannot be"
f" determined from {unparse(p_d_node).strip()}. Pipeline working dir will"
" be found. Pass it directly with --pipeline-name option.",
f" determined from {ast_unparse(p_d_node).strip()}. Pipeline working dir"
" will be found. Pass it directly with --pipeline-name option.",
)
pipelines.append((pipeline_name, pipelines_dir))

5 changes: 2 additions & 3 deletions dlt/cli/source_detection.py
@@ -1,11 +1,10 @@
import ast
import inspect
from astunparse import unparse
from typing import Dict, Tuple, Set, List

from dlt.common.configuration import is_secret_hint
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.reflection.utils import creates_func_def_name_node
from dlt.common.reflection.utils import creates_func_def_name_node, ast_unparse
from dlt.common.typing import is_optional_type

from dlt.sources import SourceReference
@@ -65,7 +64,7 @@ def find_source_calls_to_replace(
for calls in visitor.known_sources_resources_calls.values():
for call in calls:
transformed_nodes.append(
(call.func, ast.Name(id=pipeline_name + "_" + unparse(call.func)))
(call.func, ast.Name(id=pipeline_name + "_" + ast_unparse(call.func)))
)

return transformed_nodes
15 changes: 15 additions & 0 deletions dlt/common/libs/pyarrow.py
@@ -135,6 +135,21 @@ def get_pyarrow_int(precision: Optional[int]) -> Any:
return pyarrow.int64()


# def minimal_integer_type(max_value: int, signed: bool = True) -> pyarrow.DataType:
# num_bits = max_value.bit_length() or 1 # Ensure at least 1 bit
# if signed:
# num_bits += 1 # Add 1 bit for the sign

# # Standard bit widths for integer types
# bit_widths = [8, 16, 32, 64]

# # Find the minimal bit width that can accommodate num_bits
# bit_width = next((bw for bw in bit_widths if num_bits <= bw), 64)

# # Create the integer type using pa.int_()
# return pyarrow.int_(bit_width, signed=signed)


def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
"""Returns (data_type, precision, scale) tuple from pyarrow.DataType"""
if pyarrow.types.is_string(dtype) or pyarrow.types.is_large_string(dtype):
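Note on the commented-out `minimal_integer_type` helper above: it calls `pyarrow.int_()`, which pyarrow does not expose, so it could not be enabled as written. A rough, runnable sketch of the same idea, mapping the required bit width onto pyarrow's concrete integer factories (hypothetical helper, not active in this PR):

```python
import pyarrow as pa


def minimal_integer_type(max_value: int, signed: bool = True) -> pa.DataType:
    """Pick the narrowest pyarrow integer type that can hold max_value (hypothetical helper)."""
    num_bits = max_value.bit_length() or 1  # ensure at least 1 bit
    if signed:
        num_bits += 1  # reserve one bit for the sign
    widths = {
        8: pa.int8() if signed else pa.uint8(),
        16: pa.int16() if signed else pa.uint16(),
        32: pa.int32() if signed else pa.uint32(),
        64: pa.int64() if signed else pa.uint64(),
    }
    # smallest standard width that fits, defaulting to 64 bits
    return next((dtype for width, dtype in widths.items() if num_bits <= width), widths[64])


print(minimal_integer_type(200))                # int16 (needs 9 bits including the sign)
print(minimal_integer_type(200, signed=False))  # uint8
```
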
15 changes: 11 additions & 4 deletions dlt/common/reflection/utils.py
@@ -1,7 +1,14 @@
import ast
import inspect
import astunparse
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, Callable

try:
import astunparse

ast_unparse: Callable[[ast.AST], str] = astunparse.unparse
except ImportError:
ast_unparse = ast.unparse # type: ignore[attr-defined, unused-ignore]


from dlt.common.typing import AnyFun

Expand All @@ -25,7 +32,7 @@ def get_literal_defaults(node: Union[ast.FunctionDef, ast.AsyncFunctionDef]) ->
literal_defaults: Dict[str, str] = {}
for arg, default in zip(reversed(args), reversed(defaults)):
if default:
literal_defaults[str(arg.arg)] = astunparse.unparse(default).strip()
literal_defaults[str(arg.arg)] = ast_unparse(default).strip()

return literal_defaults

@@ -99,7 +106,7 @@ def rewrite_python_script(
script_lines.append(source_script_lines[last_line][last_offset : node.col_offset])

# replace node value
script_lines.append(astunparse.unparse(t_value).strip())
script_lines.append(ast_unparse(t_value).strip())
last_line = node.end_lineno - 1
last_offset = node.end_col_offset

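The `ast_unparse` shim introduced above prefers `astunparse` when it is installed and otherwise falls back to the standard library's `ast.unparse` (available since Python 3.9), so `astunparse` stops being a hard requirement. A minimal standalone sketch of the same pattern:

```python
import ast
from typing import Callable

try:
    import astunparse

    ast_unparse: Callable[[ast.AST], str] = astunparse.unparse
except ImportError:
    # ast.unparse is part of the standard library since Python 3.9
    ast_unparse = ast.unparse

# example: round-trip a call expression back to source text
node = ast.parse("dlt.pipeline(pipeline_name='demo')").body[0]
print(ast_unparse(node).strip())  # dlt.pipeline(pipeline_name='demo')
```

The remaining hunks in the CLI, reflection, and visitor modules switch their former `astunparse.unparse(...)` call sites over to this shim.
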
2 changes: 1 addition & 1 deletion dlt/common/typing.py
@@ -325,7 +325,7 @@ def is_typeddict(t: Type[Any]) -> bool:

def is_annotated(ann_type: Any) -> bool:
try:
return issubclass(get_origin(ann_type), Annotated) # type: ignore[arg-type]
return get_origin(ann_type) is Annotated
except TypeError:
return False

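The `is_annotated` change replaces the `issubclass` call with an identity comparison: `get_origin` returns the `Annotated` special form itself for annotated types, so no subclass check is needed. A minimal illustration of the new behavior:

```python
from typing import Annotated, get_origin


def is_annotated(ann_type) -> bool:
    try:
        return get_origin(ann_type) is Annotated
    except TypeError:
        return False


print(is_annotated(Annotated[int, "primary_key"]))  # True
print(is_annotated(int))                            # False (origin is None)
print(is_annotated(list[str]))                      # False (origin is list, not Annotated)
```
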
7 changes: 3 additions & 4 deletions dlt/destinations/impl/databricks/databricks.py
@@ -23,11 +23,10 @@
from dlt.common.schema.typing import TColumnType
from dlt.common.storages import FilesystemConfiguration, fsspec_from_config


from dlt.destinations.insert_job_client import InsertValuesJobClient
from dlt.destinations.exceptions import LoadJobTerminalException
from dlt.destinations.impl.databricks.configuration import DatabricksClientConfiguration
from dlt.destinations.impl.databricks.sql_client import DatabricksSqlClient
from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset
from dlt.destinations.sql_jobs import SqlMergeFollowupJob
from dlt.destinations.job_impl import ReferenceFollowupJobRequest
from dlt.destinations.utils import is_compression_disabled
@@ -209,7 +208,7 @@ def gen_delete_from_sql(
"""


class DatabricksClient(InsertValuesJobClient, SupportsStagingDestination):
class DatabricksClient(SqlJobClientWithStagingDataset, SupportsStagingDestination):
def __init__(
self,
schema: Schema,
@@ -224,7 +223,7 @@ def __init__(
)
super().__init__(schema, config, sql_client)
self.config: DatabricksClientConfiguration = config
self.sql_client: DatabricksSqlClient = sql_client
self.sql_client: DatabricksSqlClient = sql_client # type: ignore[assignment, unused-ignore]
self.type_mapper = self.capabilities.get_type_mapper()

def create_load_job(
8 changes: 4 additions & 4 deletions dlt/destinations/impl/databricks/sql_client.py
@@ -41,7 +41,7 @@
class DatabricksCursorImpl(DBApiCursorImpl):
"""Use native data frame support if available"""

native_cursor: DatabricksSqlCursor
native_cursor: DatabricksSqlCursor # type: ignore[assignment, unused-ignore]
vector_size: ClassVar[int] = 2048 # vector size is 2048

def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
@@ -124,7 +124,7 @@ def execute_sql(
@contextmanager
@raise_database_error
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
curr: DBApiCursor
# curr: DBApiCursor
# TODO: Inline param support will be dropped in future databricks driver, switch to :named paramstyle
# This will drop support for cluster runtime v13.x
# db_args: Optional[Dict[str, Any]]
@@ -142,11 +142,11 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
# db_args[key] = db_arg
# else:
# db_args = kwargs or None

assert isinstance(query, str)
db_args = args or kwargs or None
with self._conn.cursor() as curr:
curr.execute(query, db_args)
yield DatabricksCursorImpl(curr) # type: ignore[abstract]
yield DatabricksCursorImpl(curr) # type: ignore[arg-type, abstract, unused-ignore]

def catalog_name(self, escape: bool = True) -> Optional[str]:
catalog = self.capabilities.casefold_identifier(self.credentials.catalog)
9 changes: 7 additions & 2 deletions dlt/extract/incremental/transform.py
@@ -20,12 +20,15 @@

try:
from dlt.common.libs import pyarrow
from dlt.common.libs.numpy import numpy
from dlt.common.libs.pyarrow import pyarrow as pa, TAnyArrowItem
from dlt.common.libs.pyarrow import from_arrow_scalar, to_arrow_scalar
except MissingDependencyException:
pa = None
pyarrow = None

try:
from dlt.common.libs.numpy import numpy
except MissingDependencyException:
numpy = None

# NOTE: always import pandas independently from pyarrow
@@ -285,7 +288,7 @@ def _add_unique_index(self, tbl: "pa.Table") -> "pa.Table":
"""Creates unique index if necessary."""
# create unique index if necessary
if self._dlt_index not in tbl.schema.names:
tbl = pyarrow.append_column(tbl, self._dlt_index, pa.array(numpy.arange(tbl.num_rows)))
# indices = pa.compute.sequence(start=0, step=1, length=tbl.num_rows, dtype=pyarrow.minimal_integer_type(tbl.num_rows-1))
indices = pa.array(range(tbl.num_rows))
tbl = pyarrow.append_column(tbl, self._dlt_index, indices)
return tbl

def __call__(
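In `_add_unique_index`, the sequential index column is now built with `pa.array(range(...))` rather than `numpy.arange`, which is what allows the numpy import above to become optional. A standalone sketch of the equivalent operation, using pyarrow's own `Table.append_column` instead of dlt's helper:

```python
import pyarrow as pa

tbl = pa.table({"value": ["a", "b", "c"]})

# pa.array infers int64 from the Python range, no numpy needed
indices = pa.array(range(tbl.num_rows))
tbl = tbl.append_column("_dlt_index", indices)

print(tbl.column("_dlt_index").to_pylist())  # [0, 1, 2]
```
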
9 changes: 4 additions & 5 deletions dlt/reflection/script_visitor.py
@@ -1,10 +1,9 @@
import inspect
import ast
import astunparse
from ast import NodeVisitor
from typing import Any, Dict, List
from dlt.common.reflection.utils import find_outer_func_def

from dlt.common.reflection.utils import find_outer_func_def, ast_unparse

import dlt.reflection.names as n

@@ -68,9 +67,9 @@ def visit_FunctionDef(self, node: ast.FunctionDef) -> Any:
for deco in node.decorator_list:
# decorators can be function calls, attributes or names
if isinstance(deco, (ast.Name, ast.Attribute)):
alias_name = astunparse.unparse(deco).strip()
alias_name = ast_unparse(deco).strip()
elif isinstance(deco, ast.Call):
alias_name = astunparse.unparse(deco.func).strip()
alias_name = ast_unparse(deco.func).strip()
else:
raise ValueError(
self.source_segment(deco), type(deco), "Unknown decorator form"
@@ -87,7 +86,7 @@ def visit_Call(self, node: ast.Call) -> Any:
def visit_Call(self, node: ast.Call) -> Any:
if self._curr_pass == 2:
# check if this is a call to any of known functions
alias_name = astunparse.unparse(node.func).strip()
alias_name = ast_unparse(node.func).strip()
fn = self.func_aliases.get(alias_name)
if not fn:
# try a fallback to "run" function that may be called on pipeline or source
14 changes: 13 additions & 1 deletion mypy.ini
@@ -89,6 +89,9 @@ ignore_missing_imports=true
[mypy-pandas.*]
ignore_missing_imports=true

[mypy-numpy.*]
ignore_missing_imports=true

[mypy-apiclient.*]
ignore_missing_imports=true

@@ -119,4 +122,13 @@ ignore_missing_imports = True
ignore_missing_imports = True

[mypy-pytz.*]
ignore_missing_imports = True
ignore_missing_imports = True

[mypy-sentry_sdk.*]
ignore_missing_imports = True

[mypy-backports.*]
ignore_missing_imports = True

[mypy-pendulum.*]
ignore_missing_imports = True