Feat: Iceberg cache #355
base: main
@@ -1,3 +1,6 @@
# perf-test
*test-artifact*

# temp files
temp
.temp
@@ -0,0 +1,61 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Custom Airbyte message classes.

These classes override the default handling, in order to ensure that the data field is always a
jsonified string, rather than a dict.

To use these classes, import them from this module, and use them in place of the default classes.

Example:
```python
from airbyte._airbyte_message_overrides import AirbyteMessageWithStrData

for line in sys.stdin:
    message = AirbyteMessageWithStrData.model_validate_json(line)
```
"""

from __future__ import annotations

import copy
import json
from typing import Any

from pydantic import BaseModel, Field, model_validator

from airbyte_protocol.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
)


AirbyteRecordMessageWithStrData = copy.deepcopy(AirbyteRecordMessage)
AirbyteStateMessageWithStrData = copy.deepcopy(AirbyteStateMessage)
AirbyteMessageWithStrData = copy.deepcopy(AirbyteMessage)

# Modify the data field in the copied class
AirbyteRecordMessageWithStrData.__annotations__["data"] = str
AirbyteStateMessageWithStrData.__annotations__["data"] = str

AirbyteRecordMessageWithStrData.data = Field(..., description="jsonified record data as a str")
AirbyteStateMessageWithStrData.data = Field(..., description="jsonified state data as a str")


# Add a validator to ensure data is a JSON string
@model_validator(mode="before")
def ensure_data_is_string(
    cls: BaseModel,  # type: ignore # noqa: ARG001, PGH003
    values: dict[str, Any],
) -> None:
    if "data" in values and not isinstance(values["data"], dict):
        values["data"] = json.dumps(values["data"])
    if "data" in values and not isinstance(values["data"], str):
        raise ValueError


AirbyteRecordMessageWithStrData.ensure_data_is_string = classmethod(ensure_data_is_string)  # type: ignore [arg-type]
AirbyteStateMessageWithStrData.ensure_data_is_string = classmethod(ensure_data_is_string)  # type: ignore [arg-type]

AirbyteMessageWithStrData.__annotations__["record"] = AirbyteRecordMessageWithStrData | None
AirbyteMessageWithStrData.__annotations__["state"] = AirbyteStateMessageWithStrData | None
@@ -4,12 +4,13 @@
from __future__ import annotations

import datetime
import io
import sys
from collections.abc import Iterator
from typing import IO, TYPE_CHECKING, cast

import pydantic
from typing_extensions import final
from typing_extensions import Literal, final

from airbyte_protocol.models import (
    AirbyteMessage,
@@ -57,6 +58,30 @@ def read(self) -> str:
        """Read the next message from the iterator."""
        return next(self).model_dump_json()

    def as_filelike(self) -> io.BytesIO:
        """Return a file-like object that reads from the iterator."""

        class FileLikeReader(io.RawIOBase):
            def __init__(self, iterator: Iterator[AirbyteMessage]) -> None:
                self.iterator = (msg.model_dump_json() for msg in iterator)
                self.buffer = ""

            def readable(self) -> Literal[True]:
                return True

            def readinto(self, b: Any) -> int:
                try:
                    chunk = next(self.iterator)
                except StopIteration:
                    return 0  # EOF

                data = chunk.encode()
                n = len(data)
                b[:n] = data
                return n

        return cast(io.BytesIO, FileLikeReader(self._iterator))
Comment on lines +61 to +83

Interesting implementation of `as_filelike`! Here's a potential improvement for the `readinto` signature:

- def readinto(self, b: Any) -> int:
+ def readinto(self, b: bytearray) -> int:

And maybe add a docstring like this:

class FileLikeReader(io.RawIOBase):
    """A file-like object that reads AirbyteMessages from an iterator."""

What are your thoughts on these suggestions? Do you think they would improve the code? wdyt?
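For context, a quick sketch of how such a file-like reader might be consumed. This is hypothetical usage, not part of the diff, and `messages` stands in for an `AirbyteMessageIterator` instance:

```python
import sys

# `messages` is assumed to be an AirbyteMessageIterator (hypothetical setup).
reader = messages.as_filelike()

# RawIOBase.read() builds on readinto() and returns b"" once the iterator is exhausted.
while chunk := reader.read(65536):
    sys.stdout.buffer.write(chunk)
```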
    @classmethod
    def from_read_result(cls, read_result: ReadResult) -> AirbyteMessageIterator:
        """Create an iterator from a `ReadResult` object."""
@@ -0,0 +1,138 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from textwrap import dedent, indent
from typing import TYPE_CHECKING

from airbyte_protocol.models import (
    AirbyteRecordMessage,
    AirbyteStateMessage,
    AirbyteStateType,
)

from airbyte import exceptions as exc
from airbyte._future_cdk.sql_processor import SqlConfig, SqlProcessorBase
from airbyte._future_cdk.state_writers import StateWriterBase
from airbyte._processors.file.parquet import ParquetWriter


if TYPE_CHECKING:
    from pathlib import Path


class IcebergConfig(SqlConfig):
    """An Iceberg configuration."""

    def __init__(self, db_path: str, schema_name: str) -> None:
        """Initialize the Iceberg configuration."""
        self.db_path = db_path
        self.schema_name = schema_name


class IcebergSqlProcessor(SqlProcessorBase):
    """An Iceberg SQL processor."""

    supports_merge_insert = False
    file_writer_class = ParquetWriter
    sql_config: IcebergConfig

    class IcebergStateWriter(StateWriterBase):
        """A state writer for the Parquet cache."""

        def __init__(self, iceberg_processor: IcebergSqlProcessor) -> None:
            self._iceberg_processor = iceberg_processor
            super().__init__()

        def _write_state(self, state: AirbyteRecordMessage) -> None:
            """Write the state to the cache."""
            self._iceberg_processor.write_state(state)
Comment on lines +39 to +48

Suggestion for IcebergStateWriter and private method access

The IcebergStateWriter class looks good overall! However, I noticed that the state ultimately gets written by calling the file writer's private `_write_state_to_file` method directly. To improve encapsulation, what do you think about adding a public method to the file writer class that internally calls `_write_state_to_file`?

For example:

# In the file writer class
def write_state(self, state):
    self._write_state_to_file(state)

# In IcebergSqlProcessor.write_state
self.file_writer.write_state(state)

Does this approach make sense to you? Also applies to: 81-81
    @property
    def get_state_writer(self) -> StateWriterBase:
        if self._state_writer is None:
            self._state_writer = self.IcebergStateWriter(self)

        return self._state_writer

    def write_state(self, state: AirbyteStateMessage) -> None:
        """Write the state to the cache.

        Args:
            state (AirbyteStateMessage): The state to write.

        Implementation:
        - State messages are written to a separate file.
        - Any pending records are written to the cache file and the cache file is closed.
        - For stream state messages, the matching stream batches are flushed and closed.
        - For global state, all batches are flushed and closed.
        """
        stream_names: list[str] = []
        if state.type == AirbyteStateType.STREAM:
            stream_names = [state.record.stream]
        if state.type == AirbyteStateType.GLOBAL:
            stream_names = list(self._buffered_records.keys())
        else:
            msg = f"Unexpected state type: {state.type}"
            raise exc.PyAirbyteInternalError(msg)

        for stream_name in stream_names:
            state_file_name = self.file_writer.get_active_batch(stream_name)
Unused variable

It looks like the `state_file_name` variable is assigned but never used (Ruff flags it as well).
            self.file_writer.flush_active_batch(stream_name)
            self.file_writer._write_state_to_file(state)
        return
Comment on lines +57 to +82

Suggestion for `write_state`

The implementation looks good, but I have a couple of suggestions. What do you think about these changes? They might make the logic a bit clearer.
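One detail worth noting in the branch above: because the `GLOBAL` check is a plain `if` rather than an `elif`, a `STREAM` state also falls through into the final `else` and raises. A short sketch of how that branch could read instead (illustrative only, a drop-in for the lines inside `write_state`, and not necessarily what the reviewer had in mind):

```python
# Illustrative rework of the branch inside write_state() -- not the PR's code.
if state.type == AirbyteStateType.STREAM:
    stream_names = [state.record.stream]
elif state.type == AirbyteStateType.GLOBAL:
    stream_names = list(self._buffered_records.keys())
else:
    msg = f"Unexpected state type: {state.type}"
    raise exc.PyAirbyteInternalError(msg)
```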
    def _write_files_to_new_table(
        self,
        files: list[Path],
        stream_name: str,
        batch_id: str,
    ) -> str:
        """Write file(s) to a new table.

        This involves registering the table in the Iceberg catalog, creating a manifest file,
        and registering the manifest file in the catalog.
        """
        temp_table_name = self._create_table_for_loading(
            stream_name=stream_name,
            batch_id=batch_id,
        )
        columns_list = list(self._get_sql_column_definitions(stream_name=stream_name).keys())
        columns_list_str = indent(
            "\n, ".join([self._quote_identifier(col) for col in columns_list]),
            "    ",
        )
        files_list = ", ".join([f"'{f!s}'" for f in files])
        columns_type_map = indent(
            "\n, ".join(
                [
                    self._quote_identifier(self.normalizer.normalize(prop_name))
                    + ': "'
                    + str(
                        self._get_sql_column_definitions(stream_name)[
                            self.normalizer.normalize(prop_name)
                        ]
                    )
                    + '"'
                    for prop_name in columns_list
                ]
            ),
            "    ",
        )
        insert_statement = dedent(
            f"""
            INSERT INTO {self.sql_config.schema_name}.{temp_table_name}
            (
                {columns_list_str}
            )
            SELECT
                {columns_list_str}
            FROM read_json_auto(
                [{files_list}],
                format = 'newline_delimited',
                union_by_name = true,
                columns = {{ { columns_type_map } }}
            )
            """
        )
        self._execute_sql(insert_statement)
Comment on lines +84 to +137

Thoughts on `_write_files_to_new_table`

The implementation looks solid! A couple of questions/suggestions. What are your thoughts on these suggestions?
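To make the string-building in the hunk above concrete, the generated statement comes out roughly like this for a hypothetical two-column stream (schema, table, file path, column names, and types are all illustrative; `read_json_auto` is DuckDB's JSON table function):

```sql
-- Roughly what the f-string above renders to; all identifiers here are made up.
INSERT INTO airbyte_raw.users_loading_batch001
(
    "id"
, "name"
)
SELECT
    "id"
, "name"
FROM read_json_auto(
    ['/tmp/cache/users_batch001.jsonl'],
    format = 'newline_delimited',
    union_by_name = true,
    columns = { "id": "BIGINT", "name": "VARCHAR" }
)
```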
        return temp_table_name
Consider using inheritance for custom classes
The current implementation using deep copies works well, but have you considered using inheritance instead? It might lead to a more maintainable and cleaner code structure. Something like:
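A rough sketch of that direction, subclassing the protocol models instead of deep-copying them. This is illustrative only; it assumes the `airbyte_protocol` models are Pydantic v2 classes that can be subclassed, which matches the `model_validator`/`model_validate_json` usage elsewhere in the diff:

```python
# Illustrative sketch only -- assumes the airbyte_protocol models are Pydantic v2.
from __future__ import annotations

import json
from typing import Any

from pydantic import field_validator

from airbyte_protocol.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
)


class AirbyteRecordMessageWithStrData(AirbyteRecordMessage):
    data: str  # jsonified record data as a str

    @field_validator("data", mode="before")
    @classmethod
    def ensure_data_is_string(cls, value: Any) -> str:
        # Serialize dicts (or anything else) to a JSON string before validation.
        return value if isinstance(value, str) else json.dumps(value)


class AirbyteStateMessageWithStrData(AirbyteStateMessage):
    data: str  # jsonified state data as a str

    @field_validator("data", mode="before")
    @classmethod
    def ensure_data_is_string(cls, value: Any) -> str:
        return value if isinstance(value, str) else json.dumps(value)


class AirbyteMessageWithStrData(AirbyteMessage):
    record: AirbyteRecordMessageWithStrData | None = None
    state: AirbyteStateMessageWithStrData | None = None
```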
This approach could potentially simplify the code and make it easier to manage in the future. What do you think? Does this align with your design goals?