iceberg table format support for filesystem destination #2067

Merged 79 commits on Dec 11, 2024
Changes from 12 commits

Commits (79)
79c018c
add pyiceberg dependency and upgrade mypy
jorritsandbrink Nov 14, 2024
5014f88
extend pyiceberg dependencies
jorritsandbrink Nov 15, 2024
c632dd7
remove redundant delta annotation
jorritsandbrink Nov 15, 2024
a3f6587
add basic local filesystem iceberg support
jorritsandbrink Nov 15, 2024
87553a6
add active table format setting
jorritsandbrink Nov 15, 2024
513662e
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 15, 2024
10121be
disable merge tests for iceberg table format
jorritsandbrink Nov 15, 2024
23c4db3
restore non-redundant extra info
jorritsandbrink Nov 15, 2024
195ee4c
refactor to in-memory iceberg catalog
jorritsandbrink Nov 15, 2024
ee6e22e
add s3 support for iceberg table format
jorritsandbrink Nov 15, 2024
bc51008
add schema evolution support for iceberg table format
jorritsandbrink Nov 16, 2024
2de58a2
extract _register_table function
jorritsandbrink Nov 16, 2024
dd4ad0f
add partition support for iceberg table format
jorritsandbrink Nov 20, 2024
04be59b
update docstring
jorritsandbrink Nov 20, 2024
42f59c7
enable child table test for iceberg table format
jorritsandbrink Nov 21, 2024
a540135
enable empty source test for iceberg table format
jorritsandbrink Nov 21, 2024
3d1dc63
make iceberg catalog namespace configurable and default to dataset name
jorritsandbrink Nov 22, 2024
59e6d08
add optional typing
jorritsandbrink Nov 22, 2024
71e436d
fix typo
jorritsandbrink Nov 24, 2024
2effa8f
improve typing
jorritsandbrink Nov 24, 2024
8979ee1
extract logic into dedicated function
jorritsandbrink Nov 24, 2024
e956b09
add iceberg read support to filesystem sql client
jorritsandbrink Nov 24, 2024
571bf0c
remove unused import
jorritsandbrink Nov 24, 2024
0ec5fcb
add todo
jorritsandbrink Nov 24, 2024
ab0b9a0
extract logic into separate functions
jorritsandbrink Nov 24, 2024
e149ba6
add azure support for iceberg table format
jorritsandbrink Nov 24, 2024
27b8659
generalize delta table format tests
jorritsandbrink Nov 24, 2024
d39d58d
enable get tables function test for iceberg table format
jorritsandbrink Nov 24, 2024
2f910c2
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 24, 2024
547a37a
remove ignores
jorritsandbrink Nov 25, 2024
53b2d56
undo table directory management change
jorritsandbrink Nov 25, 2024
3ff9fb7
enable test_read_interfaces tests for iceberg
jorritsandbrink Nov 25, 2024
5b0cd17
fix active table format filter
jorritsandbrink Nov 25, 2024
8798d79
use mixin for object store rs credentials
jorritsandbrink Nov 25, 2024
c1cc068
generalize catalog typing
jorritsandbrink Nov 25, 2024
35f590b
extract pyiceberg scheme mapping into separate function
jorritsandbrink Nov 26, 2024
e0d6a1b
generalize credentials mixin test setup
jorritsandbrink Nov 26, 2024
85a10a2
remove unused import
jorritsandbrink Nov 26, 2024
54cd0bc
add centralized fallback to append when merge is not supported
jorritsandbrink Nov 26, 2024
4e979f0
Revert "add centralized fallback to append when merge is not supported"
jorritsandbrink Nov 27, 2024
54f1353
fall back to append if merge is not supported on filesystem
jorritsandbrink Nov 27, 2024
28d0fd2
fix test for s3-compatible storage
jorritsandbrink Nov 27, 2024
90b1729
remove obsolete code path
jorritsandbrink Nov 27, 2024
d0f7c88
exclude gcs read interface tests for iceberg
jorritsandbrink Nov 27, 2024
050bea7
add gcs support for iceberg table format
jorritsandbrink Nov 28, 2024
ff48ca9
switch to UnsupportedAuthenticationMethodException
jorritsandbrink Nov 28, 2024
01e8d26
add iceberg table format docs
jorritsandbrink Nov 28, 2024
ef29aa7
use shorter pipeline name to prevent too long sql identifiers
jorritsandbrink Nov 29, 2024
f463d06
add iceberg catalog note to docs
jorritsandbrink Nov 29, 2024
fcc05ee
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 29, 2024
d50aaa1
black format
jorritsandbrink Nov 29, 2024
6cce03b
use shorter pipeline name to prevent too long sql identifiers
jorritsandbrink Nov 29, 2024
fc61663
correct max id length for sqlalchemy mysql dialect
jorritsandbrink Nov 29, 2024
b011907
Revert "use shorter pipeline name to prevent too long sql identifiers"
jorritsandbrink Nov 29, 2024
e748dcf
Revert "use shorter pipeline name to prevent too long sql identifiers"
jorritsandbrink Nov 29, 2024
1b47893
replace show with execute to prevent useless print output
jorritsandbrink Nov 29, 2024
133f1ce
add abfss scheme to test
jorritsandbrink Nov 30, 2024
eceb19f
remove az support for iceberg table format
jorritsandbrink Nov 30, 2024
e75114d
remove iceberg bucket test exclusion
jorritsandbrink Nov 30, 2024
049c008
add note to docs on azure scheme support for iceberg table format
jorritsandbrink Nov 30, 2024
a0fc017
exclude iceberg from duckdb s3-compatibility test
jorritsandbrink Dec 1, 2024
ba75445
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 1, 2024
de0086e
disable pyiceberg info logs for tests
jorritsandbrink Dec 1, 2024
ca7f655
extend table format docs and move into own page
jorritsandbrink Dec 1, 2024
2ba8fcb
upgrade adlfs to enable account_host attribute
jorritsandbrink Dec 1, 2024
0517a95
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 2, 2024
1c2b9b4
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 2, 2024
872432e
fix lint errors
jorritsandbrink Dec 3, 2024
9c44290
re-add pyiceberg dependency
jorritsandbrink Dec 3, 2024
c129b9e
enabled iceberg in dbt-duckdb
rudolfix Dec 6, 2024
6992d56
upgrade pyiceberg version
jorritsandbrink Dec 10, 2024
156d518
Merge branch 'feat/1996-iceberg-filesystem' of https://github.com/dlt…
jorritsandbrink Dec 10, 2024
aa19f13
remove pyiceberg mypy errors across python version
jorritsandbrink Dec 10, 2024
c07c9f6
Merge branch 'devel' into feat/1996-iceberg-filesystem
rudolfix Dec 10, 2024
b7f6dbf
does not install airflow group for dev
rudolfix Dec 10, 2024
9cad3ec
fixes gcp oauth iceberg credentials handling
rudolfix Dec 10, 2024
346b270
fixes ca cert bundle duckdb azure on ci
rudolfix Dec 10, 2024
08f8ee3
Merge branch 'devel' into feat/1996-iceberg-filesystem
rudolfix Dec 11, 2024
accb62d
allow for airflow dep to be present during type check
rudolfix Dec 11, 2024
5 changes: 4 additions & 1 deletion .github/workflows/test_destinations.yml
@@ -78,7 +78,10 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake -E pyiceberg

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
5 changes: 4 additions & 1 deletion .github/workflows/test_local_destinations.yml
@@ -95,7 +95,10 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake -E pyiceberg

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`

- name: Start SFTP server
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
2 changes: 1 addition & 1 deletion dlt/cli/source_detection.py
@@ -30,7 +30,7 @@ def find_call_arguments_to_replace(
if not isinstance(dn_node, ast.Constant) or not isinstance(dn_node.value, str):
raise CliCommandInnerException(
"init",
f"The pipeline script {init_script_name} must pass the {t_arg_name} as"
f"The pipeline script {init_script_name} must pass the {t_arg_name} as" # type: ignore[attr-defined]
f" string to '{arg_name}' function in line {dn_node.lineno}",
)
else:
13 changes: 12 additions & 1 deletion dlt/common/configuration/specs/aws_credentials.py
@@ -8,6 +8,7 @@
CredentialsWithDefault,
configspec,
)
from dlt.common.configuration.specs.mixins import WithPyicebergConfig
from dlt.common.configuration.specs.exceptions import (
InvalidBoto3Session,
ObjectStoreRsCredentialsException,
@@ -16,7 +17,7 @@


@configspec
class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
class AwsCredentialsWithoutDefaults(CredentialsConfiguration, WithPyicebergConfig):
# credentials without boto implementation
aws_access_key_id: str = None
aws_secret_access_key: TSecretStrValue = None
@@ -77,6 +78,16 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:

return creds

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
return {
"s3.access-key-id": self.aws_access_key_id,
"s3.secret-access-key": self.aws_secret_access_key,
"s3.session-token": self.aws_session_token,
"s3.region": self.region_name,
"s3.endpoint": self.endpoint_url,
"s3.connect-timeout": 300,
}


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
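
For context, a minimal sketch (not part of this diff) of how an explicitly configured AWS credentials spec maps onto pyiceberg FileIO properties through the new `to_pyiceberg_fileio_config` method; the key and region values are placeholders:

from dlt.common.configuration.specs.aws_credentials import AwsCredentialsWithoutDefaults

creds = AwsCredentialsWithoutDefaults()
creds.aws_access_key_id = "AKIA..."            # placeholder
creds.aws_secret_access_key = "dummy-secret"   # placeholder
creds.region_name = "eu-central-1"             # placeholder

# maps onto the "s3.*" FileIO properties pyiceberg expects,
# e.g. "s3.access-key-id", "s3.secret-access-key", "s3.region"
fileio_props = creds.to_pyiceberg_fileio_config()
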
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/base_configuration.py
@@ -359,7 +359,7 @@ def _get_resolvable_dataclass_fields(cls) -> Iterator[TDtcField]:
def get_resolvable_fields(cls) -> Dict[str, type]:
"""Returns a mapping of fields to their type hints. Dunders should not be resolved and are not returned"""
return {
f.name: eval(f.type) if isinstance(f.type, str) else f.type # type: ignore[arg-type]
f.name: eval(f.type) if isinstance(f.type, str) else f.type
for f in cls._get_resolvable_dataclass_fields()
}

12 changes: 12 additions & 0 deletions dlt/common/configuration/specs/mixins.py
@@ -0,0 +1,12 @@
from typing import Dict, Any
from abc import abstractmethod, ABC


class WithPyicebergConfig(ABC):
@abstractmethod
def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
"""Returns `pyiceberg` FileIO configuration dictionary.

https://py.iceberg.apache.org/configuration/#fileio
"""
pass
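
As an illustration of the contract (not part of this diff), any credentials spec can opt into the `iceberg` table format by implementing this mixin. The class below is hypothetical; the property names follow the pyiceberg FileIO configuration page referenced in the docstring:

from typing import Any, Dict

from dlt.common.configuration.specs.mixins import WithPyicebergConfig


class ExampleAzureCredentials(WithPyicebergConfig):
    """Hypothetical spec used only to illustrate the mixin contract."""

    azure_storage_account_name: str = None
    azure_storage_account_key: str = None

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        # property names per https://py.iceberg.apache.org/configuration/#fileio
        return {
            "adls.account-name": self.azure_storage_account_name,
            "adls.account-key": self.azure_storage_account_key,
        }
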
2 changes: 1 addition & 1 deletion dlt/common/data_writers/buffered.py
@@ -242,7 +242,7 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
if self.writer_spec.is_binary_format:
self._file = self.open(self._file_name, "wb") # type: ignore
else:
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="") # type: ignore
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="") # type: ignore[unused-ignore]
self._writer = self.writer_cls(self._file, caps=self._caps) # type: ignore[assignment]
self._writer.write_header(self._current_columns)
# write buffer
2 changes: 1 addition & 1 deletion dlt/common/destination/utils.py
@@ -38,7 +38,7 @@ def verify_schema_capabilities(
exception_log: List[Exception] = []
# combined casing function
case_identifier = lambda ident: capabilities.casefold_identifier(
(str if capabilities.has_case_sensitive_identifiers else str.casefold)(ident) # type: ignore
(str if capabilities.has_case_sensitive_identifiers else str.casefold)(ident) # type: ignore[unused-ignore]
)
table_name_lookup: DictStrStr = {}
# name collision explanation
145 changes: 145 additions & 0 deletions dlt/common/libs/pyiceberg.py
@@ -0,0 +1,145 @@
from typing import Dict, Any
import os

from dlt import version, Pipeline
from dlt.common.libs.pyarrow import cast_arrow_schema_types, columns_to_arrow
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.utils import assert_min_pkg_version
from dlt.common.exceptions import MissingDependencyException
from dlt.common.configuration.specs import CredentialsConfiguration
from dlt.common.configuration.specs.mixins import WithPyicebergConfig
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

assert_min_pkg_version(
pkg_name="sqlalchemy",
version="2.0.18",
msg="`sqlalchemy>=2.0.18` is needed for `iceberg` table format on `filesystem` destination.",
)

try:
from pyiceberg.table import Table as IcebergTable
from pyiceberg.catalog.sql import SqlCatalog
import pyarrow as pa
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt pyiceberg helpers",
[f"{version.DLT_PKG_NAME}[pyiceberg]"],
"Install `pyiceberg` so dlt can create Iceberg tables in the `filesystem` destination.",
)


DLT_ICEBERG_NAMESPACE = "dlt"


def ensure_iceberg_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema:
ARROW_TO_ICEBERG_COMPATIBLE_ARROW_TYPE_MAP = {
pa.types.is_time: pa.string(),
pa.types.is_decimal256: pa.string(), # pyarrow does not allow downcasting to decimal128
}
return cast_arrow_schema_types(schema, ARROW_TO_ICEBERG_COMPATIBLE_ARROW_TYPE_MAP)


def ensure_iceberg_compatible_arrow_data(data: pa.Table) -> pa.Table:
schema = ensure_iceberg_compatible_arrow_schema(data.schema)
return data.cast(schema)


def write_iceberg_table(
table: IcebergTable,
data: pa.Table,
write_disposition: TWriteDisposition,
) -> None:
if write_disposition == "append":
table.append(ensure_iceberg_compatible_arrow_data(data))
elif write_disposition == "replace":
table.overwrite(ensure_iceberg_compatible_arrow_data(data))


def get_catalog(
client: FilesystemClient,
table_name: str,
schema: pa.Schema = None,
) -> SqlCatalog:
"""Returns single-table, ephemeral, in-memory Iceberg catalog."""

# create in-memory catalog
catalog = SqlCatalog(
[Collaborator review comment] NOTE: how we get a catalog should be some kind of plugin. so we can easily plug glue or rest to filesystem
"default",
uri="sqlite:///:memory:",
**_get_fileio_config(client.config.credentials),
)
catalog.create_namespace(DLT_ICEBERG_NAMESPACE)

# add table to catalog
table_id = f"{DLT_ICEBERG_NAMESPACE}.{table_name}"
table_path = f"{client.dataset_path}/{table_name}"
metadata_path = f"{table_path}/metadata"
if client.fs_client.exists(metadata_path):
# found metadata; register existing table
table = _register_table(table_id, metadata_path, catalog, client)

# evolve schema
if schema is not None:
with table.update_schema() as update:
update.union_by_name(ensure_iceberg_compatible_arrow_schema(schema))
else:
# found no metadata; create new table
assert schema is not None
catalog.create_table(
table_id,
schema=ensure_iceberg_compatible_arrow_schema(schema),
location=client.make_remote_url(table_path),
)

return catalog


def get_iceberg_tables(
pipeline: Pipeline, *tables: str, schema_name: str = None
) -> Dict[str, IcebergTable]:
from dlt.common.schema.utils import get_table_format

with pipeline.destination_client(schema_name=schema_name) as client:
assert isinstance(
client, FilesystemClient
), "The `get_iceberg_tables` function requires a `filesystem` destination."

schema_iceberg_tables = [
t["name"]
for t in client.schema.tables.values()
if get_table_format(client.schema.tables, t["name"]) == "iceberg"
]
if len(tables) > 0:
invalid_tables = set(tables) - set(schema_iceberg_tables)
if len(invalid_tables) > 0:
available_schemas = ""
if len(pipeline.schema_names) > 1:
available_schemas = f" Available schemas are {pipeline.schema_names}"
raise ValueError(
f"Schema {client.schema.name} does not contain Iceberg tables with these names:"
f" {', '.join(invalid_tables)}.{available_schemas}"
)
schema_iceberg_tables = [t for t in schema_iceberg_tables if t in tables]

return {
name: get_catalog(client, name).load_table(f"{DLT_ICEBERG_NAMESPACE}.{name}")
for name in schema_iceberg_tables
}


def _get_fileio_config(credentials: CredentialsConfiguration) -> Dict[str, Any]:
if isinstance(credentials, WithPyicebergConfig):
return credentials.to_pyiceberg_fileio_config()
return {}


def _register_table(
identifier: str,
metadata_path: str,
catalog: SqlCatalog,
client: FilesystemClient,
) -> IcebergTable:
# TODO: implement faster way to obtain `last_metadata_file` (listing is slow)
metadata_files = [f for f in client.fs_client.ls(metadata_path) if f.endswith(".json")]
last_metadata_file = client.make_remote_url(sorted(metadata_files)[-1])
return catalog.register_table(identifier, last_metadata_file)
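
A usage sketch for the helpers above, assuming a pipeline that has already loaded an `events` table in the `iceberg` table format to a `filesystem` destination (pipeline, dataset, and table names are illustrative):

import dlt
from dlt.common.libs.pyiceberg import get_iceberg_tables

pipeline = dlt.pipeline("iceberg_demo", destination="filesystem", dataset_name="my_data")

# loads the table through the single-table, ephemeral, in-memory catalog
tables = get_iceberg_tables(pipeline, "events")
arrow_table = tables["events"].scan().to_arrow()  # regular pyiceberg read path
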
2 changes: 1 addition & 1 deletion dlt/common/logger.py
@@ -47,7 +47,7 @@ def is_logging() -> bool:
def log_level() -> str:
if not LOGGER:
raise RuntimeError("Logger not initialized")
return logging.getLevelName(LOGGER.level) # type: ignore
return logging.getLevelName(LOGGER.level)


def is_json_logging(log_format: str) -> bool:
2 changes: 1 addition & 1 deletion dlt/common/metrics.py
@@ -9,7 +9,7 @@ class DataWriterMetrics(NamedTuple):
created: float
last_modified: float

def __add__(self, other: Tuple[object, ...], /) -> Tuple[object, ...]:
def __add__(self, other: Tuple[object, ...], /) -> Tuple[object, ...]: # type: ignore[override]
if isinstance(other, DataWriterMetrics):
return DataWriterMetrics(
self.file_path if self.file_path == other.file_path else "",
14 changes: 7 additions & 7 deletions dlt/common/reflection/utils.py
@@ -84,24 +84,24 @@ def rewrite_python_script(
last_line = -1
last_offset = -1
# sort transformed nodes by line and offset
for node, t_value in sorted(transformed_nodes, key=lambda n: (n[0].lineno, n[0].col_offset)):
for node, t_value in sorted(transformed_nodes, key=lambda n: (n[0].lineno, n[0].col_offset)): # type: ignore[attr-defined]
# do we have a line changed
if last_line != node.lineno - 1:
if last_line != node.lineno - 1: # type: ignore[attr-defined]
# add remainder from the previous line
if last_offset >= 0:
script_lines.append(source_script_lines[last_line][last_offset:])
# add all new lines from previous line to current
script_lines.extend(source_script_lines[last_line + 1 : node.lineno - 1])
script_lines.extend(source_script_lines[last_line + 1 : node.lineno - 1]) # type: ignore[attr-defined]
# add trailing characters until node in current line starts
script_lines.append(source_script_lines[node.lineno - 1][: node.col_offset])
script_lines.append(source_script_lines[node.lineno - 1][: node.col_offset]) # type: ignore[attr-defined]
elif last_offset >= 0:
# no line change, add the characters from the end of previous node to the current
script_lines.append(source_script_lines[last_line][last_offset : node.col_offset])
script_lines.append(source_script_lines[last_line][last_offset : node.col_offset]) # type: ignore[attr-defined]

# replace node value
script_lines.append(astunparse.unparse(t_value).strip())
last_line = node.end_lineno - 1
last_offset = node.end_col_offset
last_line = node.end_lineno - 1 # type: ignore[attr-defined]
last_offset = node.end_col_offset # type: ignore[attr-defined]

# add all that was missing
if last_offset >= 0:
2 changes: 1 addition & 1 deletion dlt/common/schema/schema.py
@@ -524,7 +524,7 @@ def get_new_table_columns(
Typically they come from the destination schema. Columns that are in `existing_columns` and not in `table_name` columns are ignored.

Optionally includes incomplete columns (without data type)"""
casefold_f: Callable[[str], str] = str.casefold if not case_sensitive else str # type: ignore[assignment]
casefold_f: Callable[[str], str] = str.casefold if not case_sensitive else str
casefold_existing = {
casefold_f(col_name): col for col_name, col in existing_columns.items()
}
2 changes: 1 addition & 1 deletion dlt/common/typing.py
@@ -439,7 +439,7 @@ def get_generic_type_argument_from_instance(
if cls_:
orig_param_type = get_args(cls_)[0]
if orig_param_type in (Any, CallableAny) and sample_value is not None:
orig_param_type = type(sample_value)
orig_param_type = type(sample_value) # type: ignore[assignment]
return orig_param_type # type: ignore


4 changes: 2 additions & 2 deletions dlt/destinations/impl/filesystem/factory.py
@@ -19,7 +19,7 @@ def filesystem_loader_file_format_selector(
*,
table_schema: TTableSchema,
) -> t.Tuple[TLoaderFileFormat, t.Sequence[TLoaderFileFormat]]:
if table_schema.get("table_format") == "delta":
if table_schema.get("table_format") in ("delta", "iceberg"):
return ("parquet", ["parquet"])
return (preferred_loader_file_format, supported_loader_file_formats)

@@ -43,7 +43,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext.generic_capabilities(
preferred_loader_file_format="jsonl",
loader_file_format_selector=filesystem_loader_file_format_selector,
supported_table_formats=["delta"],
supported_table_formats=["delta", "iceberg"],
supported_merge_strategies=["upsert"],
merge_strategies_selector=filesystem_merge_strategies_selector,
)
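
To exercise the new table format end to end, a resource can request `table_format="iceberg"`; the selector above then forces parquet files, which are committed to an Iceberg table on load. A minimal sketch, with illustrative bucket, pipeline, and dataset names (credentials are expected to come from config/secrets):

import dlt


@dlt.resource(table_format="iceberg")
def events():
    yield [{"id": 1, "ts": "2024-12-01"}, {"id": 2, "ts": "2024-12-02"}]


pipeline = dlt.pipeline(
    "iceberg_demo",
    destination=dlt.destinations.filesystem("s3://my-bucket/data"),
    dataset_name="my_data",
)
pipeline.run(events())  # parquet files are written, then committed as an Iceberg table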