Feat: Iceberg cache #355

Open · aaronsteers wants to merge 31 commits into main from feat/iceberg-hack-day

Changes from all commits — 31 commits
73f9289  add back iceberg file (aaronsteers, Jul 21, 2024)
d26f29e  updates! (aaronsteers, Jul 21, 2024)
ca37196  add polars schema utils (aaronsteers, Aug 2, 2024)
07ac6a5  `poetry add polars` (aaronsteers, Aug 2, 2024)
80e0fdb  add polars type mapping (aaronsteers, Aug 2, 2024)
8d9ef08  add as_filelike() for iterator (aaronsteers, Aug 2, 2024)
015faf0  lint fix (aaronsteers, Aug 2, 2024)
4fb9b3b  wip: iceberg updates, w example script (aaronsteers, Aug 2, 2024)
bddbc59  custom pydantic classes with str data (aaronsteers, Aug 3, 2024)
9af597f  refactor to parse stdout in separate method (aaronsteers, Aug 3, 2024)
46ba395  make flush_active_batch public (aaronsteers, Aug 4, 2024)
e99e72f  cleanup (aaronsteers, Aug 4, 2024)
4d163a0  add icerberg processor methods (aaronsteers, Aug 4, 2024)
6337bd2  update parquet writer (aaronsteers, Aug 4, 2024)
852a4b8  remove dupe implementation (aaronsteers, Aug 4, 2024)
049b38a  refactor: remove unnecessary RecordProcessor class (aaronsteers, Aug 4, 2024)
a671c97  refactor: add AirbyteWritersInterface and WriteMethod, plus related r… (aaronsteers, Aug 5, 2024)
b06485a  Merge branch 'refactor/writers-write-not-sources' into feat/iceberg-h… (aaronsteers, Aug 5, 2024)
34acdda  fix circular import loop (aaronsteers, Aug 5, 2024)
41f14e1  Merge branch 'refactor/writers-write-not-sources' into feat/iceberg-h… (aaronsteers, Aug 5, 2024)
ed8edec  fix parquet writer import (aaronsteers, Aug 5, 2024)
a5efbd1  updated polars test script (aaronsteers, Aug 7, 2024)
c3d4e9e  poetry add --dev boto3 (aaronsteers, Aug 7, 2024)
c6969de  gitignore test-artifact files (aaronsteers, Aug 7, 2024)
63b18ef  update transforms (aaronsteers, Aug 7, 2024)
8f82437  Merge remote-tracking branch 'origin/main' into feat/iceberg-hack-day (aaronsteers, Sep 2, 2024)
de810e8  Merge remote-tracking branch 'origin/main' into feat/iceberg-hack-day (aaronsteers, Sep 2, 2024)
9abcea4  remove dupe file (aaronsteers, Sep 2, 2024)
80b7340  remove dupe import (aaronsteers, Sep 2, 2024)
30d1afb  remove import (aaronsteers, Sep 2, 2024)
54c9752  remove extra imports (aaronsteers, Sep 2, 2024)
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,3 +1,6 @@
# perf-test
*test-artifact*

# temp files
temp
.temp
61 changes: 61 additions & 0 deletions airbyte/_airbyte_message_overrides.py
@@ -0,0 +1,61 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Custom Airbyte message classes.

These classes override the default handling, in order to ensure that the data field is always a
jsonified string, rather than a dict.

To use these classes, import them from this module, and use them in place of the default classes.

Example:
```python
import sys

from airbyte._airbyte_message_overrides import AirbyteMessageWithStrData

for line in sys.stdin:
    message = AirbyteMessageWithStrData.model_validate_json(line)
```
"""

from __future__ import annotations

import copy
import json
from typing import Any

from pydantic import BaseModel, Field, model_validator

from airbyte_protocol.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
)


AirbyteRecordMessageWithStrData = copy.deepcopy(AirbyteRecordMessage)
AirbyteStateMessageWithStrData = copy.deepcopy(AirbyteStateMessage)
AirbyteMessageWithStrData = copy.deepcopy(AirbyteMessage)

# Modify the data field in the copied class
AirbyteRecordMessageWithStrData.__annotations__["data"] = str
AirbyteStateMessageWithStrData.__annotations__["data"] = str

AirbyteRecordMessageWithStrData.data = Field(..., description="jsonified record data as a str")
AirbyteStateMessageWithStrData.data = Field(..., description="jsonified state data as a str")
Review comment on lines +33 to +42:
Consider using inheritance for custom classes

The current implementation using deep copies works well, but have you considered using inheritance instead? It might lead to a more maintainable and cleaner code structure. Something like:

class AirbyteRecordMessageWithStrData(AirbyteRecordMessage):
    data: str = Field(..., description="jsonified record data as a str")

class AirbyteStateMessageWithStrData(AirbyteStateMessage):
    data: str = Field(..., description="jsonified state data as a str")

class AirbyteMessageWithStrData(AirbyteMessage):
    record: AirbyteRecordMessageWithStrData | None
    state: AirbyteStateMessageWithStrData | None

This approach could potentially simplify the code and make it easier to manage in the future. What do you think? Does this align with your design goals?



# Add a validator to ensure data is a JSON string
@model_validator(mode="before")
def ensure_data_is_string(
    cls: BaseModel,  # type: ignore # noqa: ARG001, PGH003
    values: dict[str, Any],
) -> dict[str, Any]:
    # Serialize dict payloads up front; anything else must already be a JSON string.
    if "data" in values and isinstance(values["data"], dict):
        values["data"] = json.dumps(values["data"])
    if "data" in values and not isinstance(values["data"], str):
        raise ValueError("`data` must be a dict or a JSON string.")
    return values


AirbyteRecordMessageWithStrData.ensure_data_is_string = classmethod(ensure_data_is_string)  # type: ignore [arg-type]
AirbyteStateMessageWithStrData.ensure_data_is_string = classmethod(ensure_data_is_string)  # type: ignore [arg-type]

AirbyteMessageWithStrData.__annotations__["record"] = AirbyteRecordMessageWithStrData | None
AirbyteMessageWithStrData.__annotations__["state"] = AirbyteStateMessageWithStrData | None
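For readers trying the pattern outside this diff, here is a minimal, self-contained sketch of the same str-data idea using a plain pydantic v2 model. The model name and fields are hypothetical stand-ins, not the real `AirbyteRecordMessage`:

```python
import json

from pydantic import BaseModel, field_validator


class RecordWithStrData(BaseModel):
    """Hypothetical stand-in for AirbyteRecordMessageWithStrData."""

    stream: str
    data: str  # always a jsonified string, never a dict

    @field_validator("data", mode="before")
    @classmethod
    def ensure_str(cls, value: object) -> object:
        # Serialize dict payloads; strings pass through unchanged.
        return json.dumps(value) if isinstance(value, dict) else value


msg = RecordWithStrData(stream="users", data={"id": 1})
print(msg.data)  # '{"id": 1}'
```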
35 changes: 31 additions & 4 deletions airbyte/_connector_base.py
@@ -169,7 +169,7 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """
        if force_refresh or self._spec is None:
            try:
-                for msg in self._execute(["spec"]):
+                for msg in self._execute_and_parse(["spec"]):
                    if msg.type == Type.SPEC and msg.spec:
                        self._spec = msg.spec
                        break
@@ -275,7 +275,7 @@ def check(self) -> None:
        """
        with as_temp_files([self._config]) as [config_file]:
            try:
-                for msg in self._execute(["check", "--config", config_file]):
+                for msg in self._execute_and_parse(["check", "--config", config_file]):
                    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
                        if msg.connectionStatus.status != Status.FAILED:
                            rich.print(f"Connection check succeeded for `{self.name}`.")
@@ -349,7 +349,7 @@ def _peek_airbyte_message(
        )
        return

-    def _execute(
+    def _execute_and_parse(
        self,
        args: list[str],
        stdin: IO[str] | AirbyteMessageIterator | None = None,
@@ -371,7 +371,7 @@ def _execute(
        self.executor.ensure_installation(auto_fix=False)

        try:
-            for line in self.executor.execute(args, stdin=stdin):
+            for line in self._execute(args, stdin=stdin):
                try:
                    message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
                    if progress_tracker and message.record:
@@ -403,6 +403,33 @@
                original_exception=e,
            ) from None

    def _execute(
        self,
        args: list[str],
        stdin: IO[str] | AirbyteMessageIterator | None = None,
    ) -> Generator[str, None, None]:
        """Execute the connector with the given arguments.

        This involves the following steps:
        * Locate the right venv. It is called ".venv-<connector_name>".
        * Spawn a subprocess with .venv-<connector_name>/bin/<connector-name> <args>.
        * Read the subprocess output line by line and yield the (unparsed) strings.

        Raises:
            AirbyteConnectorFailedError: If the process returns a failure status (non-zero).
        """
        # Fail early if the connector is not installed.
        self.executor.ensure_installation(auto_fix=False)

        try:
            yield from self.executor.execute(args, stdin=stdin)

        except Exception as e:
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from e


__all__ = [
"ConnectorBase",
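The net effect of this refactor is a two-layer pipeline: `_execute` yields raw JSONL lines from the connector subprocess, and `_execute_and_parse` validates each line into an `AirbyteMessage`. A rough, self-contained sketch of that layering, with simplified names and plain dicts standing in for the real protocol models:

```python
import json
from collections.abc import Iterator


def execute(args: list[str]) -> Iterator[str]:
    """Stand-in for Connector._execute: yields raw JSONL lines."""
    # A real implementation would spawn the connector subprocess here.
    yield '{"type": "SPEC", "spec": {}}'


def execute_and_parse(args: list[str]) -> Iterator[dict]:
    """Stand-in for Connector._execute_and_parse: parses each raw line."""
    for line in execute(args):
        # The PR uses AirbyteMessage.model_validate_json(line) instead.
        yield json.loads(line)


for msg in execute_and_parse(["spec"]):
    print(msg["type"])  # SPEC
```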
27 changes: 26 additions & 1 deletion airbyte/_message_iterators.py
@@ -4,12 +4,13 @@
from __future__ import annotations

import datetime
import io
import sys
from collections.abc import Iterator
from typing import IO, TYPE_CHECKING, cast

import pydantic
-from typing_extensions import final
+from typing_extensions import Literal, final

from airbyte_protocol.models import (
    AirbyteMessage,
@@ -57,6 +58,30 @@ def read(self) -> str:
        """Read the next message from the iterator."""
        return next(self).model_dump_json()

    def as_filelike(self) -> io.BytesIO:
        """Return a file-like object that reads from the iterator."""

        class FileLikeReader(io.RawIOBase):
            def __init__(self, iterator: Iterator[AirbyteMessage]) -> None:
                self.iterator = (msg.model_dump_json() for msg in iterator)
                self.buffer = ""

            def readable(self) -> Literal[True]:
                return True

            def readinto(self, b: Any) -> int:
                try:
                    chunk = next(self.iterator)
                except StopIteration:
                    return 0  # EOF

                data = chunk.encode()
                n = len(data)
                b[:n] = data
                return n

        return cast(io.BytesIO, FileLikeReader(self._iterator))
Review comment on lines +61 to +83:
Interesting implementation of as_filelike. A few thoughts:

  1. The FileLikeReader class is a nice implementation of a file-like object. Good job!
  2. In the readinto method, you're using Any for the type of b. Would it be more precise to use bytearray or memoryview? This would also address the Ruff F821 warning.
  3. Have you considered adding a docstring to the FileLikeReader class to explain its purpose?

What are your thoughts on these suggestions? Do you think they would improve the code?

Here's a potential improvement for the readinto method:

-            def readinto(self, b: Any) -> int:
+            def readinto(self, b: bytearray) -> int:

And maybe add a docstring like this:

class FileLikeReader(io.RawIOBase):
    """A file-like object that reads AirbyteMessages from an iterator."""

wdyt?

Ruff F821 (line 72): Undefined name `Any`.
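To see why `readinto` alone is enough here, below is a stripped-down, runnable re-creation of the pattern. A generic iterator of JSON strings stands in for the `AirbyteMessage` iterator, the `bytearray` annotation follows the review suggestion above, and the sketch assumes each encoded chunk fits in the buffer `io` hands us:

```python
import io
from collections.abc import Iterator


class IteratorReader(io.RawIOBase):
    """File-like wrapper over an iterator of JSON strings (sketch)."""

    def __init__(self, it: Iterator[str]) -> None:
        self._it = it

    def readable(self) -> bool:
        return True

    def readinto(self, b: bytearray) -> int:
        try:
            chunk = next(self._it).encode()
        except StopIteration:
            return 0  # EOF
        # Assumes len(chunk) <= len(b); a production version would need
        # to buffer the remainder of oversized chunks.
        b[: len(chunk)] = chunk
        return len(chunk)


raw = IteratorReader(iter(['{"type": "RECORD"}\n', '{"type": "STATE"}\n']))
stream = io.BufferedReader(raw)  # turns the raw reader into a normal binary stream
print(stream.read())  # b'{"type": "RECORD"}\n{"type": "STATE"}\n'
```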


    @classmethod
    def from_read_result(cls, read_result: ReadResult) -> AirbyteMessageIterator:
        """Create an iterator from a `ReadResult` object."""
7 changes: 1 addition & 6 deletions airbyte/_processors/sql/duckdb.py
@@ -87,12 +87,7 @@ def get_sql_engine(self) -> Engine:


class DuckDBSqlProcessor(SqlProcessorBase):
-    """A DuckDB implementation of the cache.
-
-    Jsonl is used for local file storage before bulk loading.
-    Unlike the Snowflake implementation, we can't use the COPY command to load data
-    so we insert as values instead.
-    """
+    """A DuckDB implementation of the cache."""

    supports_merge_insert = False
    file_writer_class = JsonlWriter
138 changes: 138 additions & 0 deletions airbyte/_processors/sql/iceberg.py
@@ -0,0 +1,138 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from textwrap import dedent, indent
from typing import TYPE_CHECKING

from airbyte_protocol.models import (
    AirbyteRecordMessage,
    AirbyteStateMessage,
    AirbyteStateType,
)

from airbyte import exceptions as exc
from airbyte._future_cdk.sql_processor import SqlConfig, SqlProcessorBase
from airbyte._future_cdk.state_writers import StateWriterBase
from airbyte._processors.file.parquet import ParquetWriter


if TYPE_CHECKING:
    from pathlib import Path


class IcebergConfig(SqlConfig):
    """An Iceberg configuration."""

    def __init__(self, db_path: str, schema_name: str) -> None:
        """Initialize the Iceberg configuration."""
        self.db_path = db_path
        self.schema_name = schema_name


class IcebergSqlProcessor(SqlProcessorBase):
    """An Iceberg SQL processor."""

    supports_merge_insert = False
    file_writer_class = ParquetWriter
    sql_config: IcebergConfig

    class IcebergStateWriter(StateWriterBase):
        """A state writer for the Iceberg cache."""

        def __init__(self, iceberg_processor: IcebergSqlProcessor) -> None:
            self._iceberg_processor = iceberg_processor
            super().__init__()

        def _write_state(self, state: AirbyteStateMessage) -> None:
            """Write the state to the cache."""
            self._iceberg_processor.write_state(state)
Review comment on lines +39 to +48:
Suggestion for IcebergStateWriter and private method access

The IcebergStateWriter class looks good overall! However, I noticed that in the write_state method of IcebergSqlProcessor, we're accessing the private method _write_state_to_file of the file writer (line 81).

To improve encapsulation, what do you think about adding a public method to the file writer class that internally calls _write_state_to_file? This way, we can avoid directly accessing private methods from outside the class.

For example:

# In the file writer class
def write_state(self, state):
    self._write_state_to_file(state)

# In IcebergSqlProcessor.write_state
self.file_writer.write_state(state)

Does this approach make sense to you?

Also applies to: 81-81


    @property
    def get_state_writer(self) -> StateWriterBase:
        if self._state_writer is None:
            self._state_writer = self.IcebergStateWriter(self)

        return self._state_writer

    def write_state(self, state: AirbyteStateMessage) -> None:
        """Write the state to the cache.

        Args:
            state (AirbyteStateMessage): The state to write.

        Implementation:
        - State messages are written to a separate file.
        - Any pending records are written to the cache file and the cache file is closed.
        - For stream state messages, the matching stream batches are flushed and closed.
        - For global state, all batches are flushed and closed.
        """
        stream_names: list[str] = []
        if state.type == AirbyteStateType.STREAM:
            stream_names = [state.record.stream]
        if state.type == AirbyteStateType.GLOBAL:
            stream_names = list(self._buffered_records.keys())
        else:
            msg = f"Unexpected state type: {state.type}"
            raise exc.PyAirbyteInternalError(msg)

        for stream_name in stream_names:
            state_file_name = self.file_writer.get_active_batch(stream_name)
Review comment on line 79:

Unused variable state_file_name

It looks like the state_file_name variable on line 79 is assigned but never used. Should we remove this assignment, or is it intended for future use? If it's for future use, we might want to add a TODO comment explaining its purpose.

Ruff F841 (line 79): Local variable `state_file_name` is assigned to but never used.

            self.file_writer.flush_active_batch(stream_name)
            self.file_writer._write_state_to_file(state)
            return
Review comment on lines +57 to +82:
Suggestion for write_state method

The implementation looks good, but I have a couple of suggestions:

  1. The if condition for AirbyteStateType.GLOBAL (line 72) should probably be an elif to match the structure of the other conditions.
  2. The else block (lines 74-76) seems to be misplaced. It will only be reached if the state type is neither STREAM nor GLOBAL. Should this be an elif for a specific state type?

What do you think about these changes? They might make the logic a bit clearer.

Also, the return statement on line 82 will exit the function after processing only the first stream. Is this intentional, or should we process all streams in stream_names?

Ruff F841 (line 79): Local variable `state_file_name` is assigned to but never used.

Ruff SLF001 (line 81): Private member accessed: `_write_state_to_file`.
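As a follow-up to the branching question above, here is a small, self-contained sketch of the if/elif/else shape the comment is pointing at. The names are simplified stand-ins for the protocol enums, not the real `AirbyteStateType`:

```python
from enum import Enum


class StateType(Enum):
    """Hypothetical stand-in for AirbyteStateType."""

    STREAM = "STREAM"
    GLOBAL = "GLOBAL"
    LEGACY = "LEGACY"


def streams_to_flush(
    state_type: StateType,
    stream_name: str | None,
    buffered: dict[str, list],
) -> list[str]:
    """Resolve which stream batches a state message should flush."""
    if state_type is StateType.STREAM:
        return [stream_name] if stream_name else []
    elif state_type is StateType.GLOBAL:
        return list(buffered)
    else:
        # Reached only for genuinely unexpected types — unlike the
        # original if/if/else, where STREAM states fall into the else.
        raise ValueError(f"Unexpected state type: {state_type}")


print(streams_to_flush(StateType.GLOBAL, None, {"users": [], "orders": []}))
```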


    def _write_files_to_new_table(
        self,
        files: list[Path],
        stream_name: str,
        batch_id: str,
    ) -> str:
        """Write file(s) to a new table.

        This involves registering the table in the Iceberg catalog, creating a manifest file,
        and registering the manifest file in the catalog.
        """
        temp_table_name = self._create_table_for_loading(
            stream_name=stream_name,
            batch_id=batch_id,
        )
        columns_list = list(self._get_sql_column_definitions(stream_name=stream_name).keys())
        columns_list_str = indent(
            "\n, ".join([self._quote_identifier(col) for col in columns_list]),
            "    ",
        )
        files_list = ", ".join([f"'{f!s}'" for f in files])
        columns_type_map = indent(
            "\n, ".join(
                [
                    self._quote_identifier(self.normalizer.normalize(prop_name))
                    + ': "'
                    + str(
                        self._get_sql_column_definitions(stream_name)[
                            self.normalizer.normalize(prop_name)
                        ]
                    )
                    + '"'
                    for prop_name in columns_list
                ]
            ),
            "    ",
        )
        insert_statement = dedent(
            f"""
            INSERT INTO {self.sql_config.schema_name}.{temp_table_name}
            (
                {columns_list_str}
            )
            SELECT
                {columns_list_str}
            FROM read_json_auto(
                [{files_list}],
                format = 'newline_delimited',
                union_by_name = true,
                columns = {{ { columns_type_map } }}
            )
            """
        )
        self._execute_sql(insert_statement)
Review comment on lines +84 to +137:
Thoughts on _write_files_to_new_table method

The implementation looks solid! A couple of questions/suggestions:

  1. Have you considered using f-strings for the SQL query construction? It might make the code a bit more readable, especially for the complex parts like columns_type_map.

  2. The method seems to be doing quite a lot. Would it make sense to break it down into smaller, more focused methods? For example, we could have separate methods for creating the table, preparing the column definitions, and constructing the INSERT statement.

What are your thoughts on these suggestions?

        return temp_table_name
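To make the string assembly above concrete, this is roughly what the generated statement looks like for a hypothetical two-column `users` stream (the schema, table, and file names are invented for illustration; `read_json_auto` is DuckDB's JSON reader, which this processor appears to build on):

```python
from textwrap import dedent

# Hypothetical inputs, standing in for what the method computes:
schema_name = "airbyte_raw"
temp_table_name = "users_batch_0001"
files_list = "'/tmp/users_0001.jsonl', '/tmp/users_0002.jsonl'"

print(dedent(f"""
    INSERT INTO {schema_name}.{temp_table_name}
    (
        "id"
    , "name"
    )
    SELECT
        "id"
    , "name"
    FROM read_json_auto(
        [{files_list}],
        format = 'newline_delimited',
        union_by_name = true,
        columns = {{ "id": "BIGINT", "name": "VARCHAR" }}
    )
    """))
```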