Skip to content

Commit

Permalink
feat(targets): Targets now accept a batch_size_rows setting to conf…
Browse files Browse the repository at this point in the history
…igure how many rows are loaded in each record batch (#2248)

* added batch_size_rows tests to target

* add batch_size_rows test to test target sql

* added batch size row to target capabilities

* add batch_size_rows to target as a builtin config

* added code for batch_size_rows

* Apply suggestions from code review

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>

* applied documentation update from review to max_size

* Update singer_sdk/sinks/core.py

* Update singer_sdk/sinks/core.py

* chore: Link to `batch_size_rows` attribute docs

* Update `batch_size_rows` docs

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
Co-authored-by: Edgar Ramírez-Mondragón <[email protected]>
  • Loading branch information
3 people authored Feb 23, 2024
1 parent d8be4ab commit 3e2c3d3
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 12 deletions.
7 changes: 7 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@
default=True,
),
).to_dict()
# Settings schema for the ``batch_size_rows`` target setting: an integer
# property, declared here without a default, giving the maximum number of rows
# in each batch. Converted to a plain dict via ``to_dict()``.
TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList(
Property(
"batch_size_rows",
IntegerType,
description="Maximum number of rows in each batch.",
),
).to_dict()


class TargetLoadMethods(str, Enum):
Expand Down
43 changes: 33 additions & 10 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def __init__(
self._batch_records_read: int = 0
self._batch_dupe_records_merged: int = 0

# Batch full markers
self._batch_size_rows: int | None = target.config.get(
"batch_size_rows",
)

self._validator: BaseJSONSchemaValidator | None = self.get_validator()

@cached_property
Expand Down Expand Up @@ -249,15 +254,6 @@ def _get_context(self, record: dict) -> dict: # noqa: ARG002

# Size properties

@property
def max_size(self) -> int:
    """Report the largest number of records a batch may hold.

    Returns:
        The record count at which `is_full=True`; this is always
        ``MAX_SIZE_DEFAULT``.
    """
    default_limit: int = self.MAX_SIZE_DEFAULT
    return default_limit

@property
def current_size(self) -> int:
"""Get current batch size.
Expand All @@ -269,13 +265,40 @@ def current_size(self) -> int:

@property
def is_full(self) -> bool:
    """Tell whether the batch has reached its size limit.

    Returns:
        True if the sink needs to be drained, i.e. ``current_size`` is at
        least ``max_size``.
    """
    held, limit = self.current_size, self.max_size
    return held >= limit

@property
def batch_size_rows(self) -> int | None:
"""The maximum number of rows a batch can accumulate before being processed.
Returns:
The max number of rows or None if not set.
"""
return self._batch_size_rows

@property
def max_size(self) -> int:
    """Return the record count at which the batch is considered full.

    Returns:
        Max number of records to batch before `is_full=True`: the value of
        ``batch_size_rows`` when it is set, otherwise ``MAX_SIZE_DEFAULT``.

    .. versionchanged:: 0.36.0
       This property now takes into account the
       :attr:`~singer_sdk.Sink.batch_size_rows` attribute and the corresponding
       ``batch_size_rows`` target setting.
    """
    configured = self.batch_size_rows
    # Only an unset (None) value falls back; an explicit 0 is returned as-is.
    if configured is None:
        return self.MAX_SIZE_DEFAULT
    return configured

# Tally methods

@t.final
Expand Down
5 changes: 4 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from singer_sdk.helpers.capabilities import (
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_BATCH_SIZE_ROWS_CONFIG,
TARGET_HARD_DELETE_CONFIG,
TARGET_LOAD_METHOD_CONFIG,
TARGET_SCHEMA_CONFIG,
Expand Down Expand Up @@ -363,8 +364,9 @@ def _process_record_message(self, message_dict: dict) -> None:

if sink.is_full:
self.logger.info(
"Target sink for '%s' is full. Draining...",
"Target sink for '%s' is full. Current size is '%s'. Draining...",
sink.stream_name,
sink.current_size,
)
self.drain_one(sink)

Expand Down Expand Up @@ -610,6 +612,7 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:

_merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema)
_merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema)
_merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, config_jsonschema)

capabilities = cls.capabilities

Expand Down
6 changes: 5 additions & 1 deletion tests/core/targets/test_target_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,9 @@ class MyTarget(SQLTargetMock, capabilities=capabilities):
pass

about = MyTarget._get_about_info()
default_settings = {"add_record_metadata", "load_method"}
default_settings = {
"add_record_metadata",
"load_method",
"batch_size_rows",
}
assert set(about.settings["properties"]) == expected_settings | default_settings
38 changes: 38 additions & 0 deletions tests/core/test_target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def test_target_about_info():
assert "flattening_max_depth" in about.settings["properties"]
assert "batch_config" in about.settings["properties"]
assert "add_record_metadata" in about.settings["properties"]
assert "batch_size_rows" in about.settings["properties"]


def test_sql_get_sink():
Expand Down Expand Up @@ -142,3 +143,40 @@ def test_add_sqlsink_and_get_sink():
target.get_sink(
"bar",
)


def test_batch_size_rows_and_max_size():
    """Check ``batch_size_rows`` and ``max_size`` with and without the setting."""
    stream_schema = {
        "properties": {
            "id": {"type": ["string", "null"]},
            "col_ts": {"format": "date-time", "type": ["string", "null"]},
        },
    }
    key_properties = []

    def build_sink(target, stream_name):
        # Both sinks share the same schema and key-properties objects.
        return BatchSinkMock(
            target=target,
            stream_name=stream_name,
            schema=stream_schema,
            key_properties=key_properties,
        )

    # Sink whose target has no ``batch_size_rows`` in its config.
    sink_default = build_sink(TargetMock(), "foo")
    # Sink whose target sets ``batch_size_rows`` explicitly.
    sink_set = build_sink(TargetMock(config={"batch_size_rows": 100000}), "bar")

    assert sink_default.stream_name == "foo"
    assert sink_default._batch_size_rows is None
    assert sink_default.batch_size_rows is None
    assert sink_default.max_size == 10000

    assert sink_set.stream_name == "bar"
    assert sink_set._batch_size_rows == 100000
    assert sink_set.batch_size_rows == 100000
    assert sink_set.max_size == 100000

0 comments on commit 3e2c3d3

Please sign in to comment.