Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(targets): Targets now accept a batch_size_rows setting to configure how many rows are loaded in each record batch #2248

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
acaf26f
added batch_size_rows tests to target
BuzzCutNorman Feb 15, 2024
49759b4
add batch_size_rows test to test target sql
BuzzCutNorman Feb 15, 2024
9dbed19
added batch size row to target capabilities
BuzzCutNorman Feb 15, 2024
f8caa5a
add batch_size_rows to target as a bulitin config
BuzzCutNorman Feb 15, 2024
93dcdb1
added code for batch_size_rows
BuzzCutNorman Feb 15, 2024
1267a7c
Merge branch 'main' into 2155-target-set-batch-size-rows
BuzzCutNorman Feb 15, 2024
016a7bd
Merge branch 'main' into 2155-target-set-batch-size-rows
BuzzCutNorman Feb 19, 2024
c36c9f4
Merge branch 'main' into 2155-target-set-batch-size-rows
edgarrmondragon Feb 19, 2024
51f2e57
Apply suggestions from code review
BuzzCutNorman Feb 19, 2024
84793a9
Merge branch 'main' into 2155-target-set-batch-size-rows
edgarrmondragon Feb 19, 2024
89f6846
applied documentaion update from review to max_size
BuzzCutNorman Feb 19, 2024
8b278b4
Update singer_sdk/sinks/core.py
edgarrmondragon Feb 19, 2024
1a06d0c
Merge branch 'main' into 2155-target-set-batch-size-rows
edgarrmondragon Feb 19, 2024
678e17f
Update singer_sdk/sinks/core.py
edgarrmondragon Feb 19, 2024
c247f82
chore: Link to `batch_size_rows` attribute docs
edgarrmondragon Feb 19, 2024
e0b4425
Update `batch_size_rows` docs
edgarrmondragon Feb 19, 2024
32bbe6e
Merge branch 'main' into 2155-target-set-batch-size-rows
edgarrmondragon Feb 20, 2024
787e260
Merge branch 'main' into 2155-target-set-batch-size-rows
edgarrmondragon Feb 20, 2024
ea048a5
Merge branch 'main' into 2155-target-set-batch-size-rows
edgarrmondragon Feb 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@
default=True,
),
).to_dict()
TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList(
Property(
"batch_size_rows",
IntegerType,
description="Maximum number of rows in each batch.",
),
).to_dict()


class TargetLoadMethods(str, Enum):
Expand Down
43 changes: 33 additions & 10 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def __init__(
self._batch_records_read: int = 0
self._batch_dupe_records_merged: int = 0

# Batch full markers
self._batch_size_rows: int | None = target.config.get(
"batch_size_rows",
)

self._validator: BaseJSONSchemaValidator | None = self.get_validator()

@cached_property
Expand Down Expand Up @@ -249,15 +254,6 @@ def _get_context(self, record: dict) -> dict: # noqa: ARG002

# Size properties

@property
def max_size(self) -> int:
"""Get max batch size.

Returns:
Max number of records to batch before `is_full=True`
"""
return self.MAX_SIZE_DEFAULT

@property
def current_size(self) -> int:
"""Get current batch size.
Expand All @@ -269,13 +265,40 @@ def current_size(self) -> int:

@property
def is_full(self) -> bool:
"""Check against size limit.
"""Check against the batch size limit.

Returns:
True if the sink needs to be drained.
"""
return self.current_size >= self.max_size

@property
def batch_size_rows(self) -> int | None:
"""The maximum number of rows a batch can accumulate before being processed.

Returns:
The max number of rows or None if not set.
"""
return self._batch_size_rows

@property
def max_size(self) -> int:
"""Get max batch size.

Returns:
Max number of records to batch before `is_full=True`
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

.. versionchanged:: 0.36.0
This property now takes into account the
:attr:`~singer_sdk.Sink.batch_size_rows` attribute and the corresponding
``batch_size_rows`` target setting.
"""
return (
self.batch_size_rows
if self.batch_size_rows is not None
else self.MAX_SIZE_DEFAULT
)

# Tally methods

@t.final
Expand Down
5 changes: 4 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from singer_sdk.helpers.capabilities import (
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_BATCH_SIZE_ROWS_CONFIG,
TARGET_HARD_DELETE_CONFIG,
TARGET_LOAD_METHOD_CONFIG,
TARGET_SCHEMA_CONFIG,
Expand Down Expand Up @@ -363,8 +364,9 @@ def _process_record_message(self, message_dict: dict) -> None:

if sink.is_full:
self.logger.info(
"Target sink for '%s' is full. Draining...",
"Target sink for '%s' is full. Current size is '%s'. Draining...",
sink.stream_name,
sink.current_size,
)
self.drain_one(sink)

Expand Down Expand Up @@ -610,6 +612,7 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:

_merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema)
_merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema)
_merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, config_jsonschema)

capabilities = cls.capabilities

Expand Down
6 changes: 5 additions & 1 deletion tests/core/targets/test_target_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,9 @@ class MyTarget(SQLTargetMock, capabilities=capabilities):
pass

about = MyTarget._get_about_info()
default_settings = {"add_record_metadata", "load_method"}
default_settings = {
"add_record_metadata",
"load_method",
"batch_size_rows",
}
assert set(about.settings["properties"]) == expected_settings | default_settings
38 changes: 38 additions & 0 deletions tests/core/test_target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def test_target_about_info():
assert "flattening_max_depth" in about.settings["properties"]
assert "batch_config" in about.settings["properties"]
assert "add_record_metadata" in about.settings["properties"]
assert "batch_size_rows" in about.settings["properties"]


def test_sql_get_sink():
Expand Down Expand Up @@ -142,3 +143,40 @@ def test_add_sqlsink_and_get_sink():
target.get_sink(
"bar",
)


def test_batch_size_rows_and_max_size():
input_schema_1 = {
"properties": {
"id": {
"type": ["string", "null"],
},
"col_ts": {
"format": "date-time",
"type": ["string", "null"],
},
},
}
key_properties = []
target_default = TargetMock()
sink_default = BatchSinkMock(
target=target_default,
stream_name="foo",
schema=input_schema_1,
key_properties=key_properties,
)
target_set = TargetMock(config={"batch_size_rows": 100000})
sink_set = BatchSinkMock(
target=target_set,
stream_name="bar",
schema=input_schema_1,
key_properties=key_properties,
)
assert sink_default.stream_name == "foo"
assert sink_default._batch_size_rows is None
assert sink_default.batch_size_rows is None
assert sink_default.max_size == 10000
assert sink_set.stream_name == "bar"
assert sink_set._batch_size_rows == 100000
assert sink_set.batch_size_rows == 100000
assert sink_set.max_size == 100000