diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py
index b48d290a4..f76400c5a 100644
--- a/singer_sdk/helpers/capabilities.py
+++ b/singer_sdk/helpers/capabilities.py
@@ -152,6 +152,13 @@
         default=True,
     ),
 ).to_dict()
+TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList(
+    Property(
+        "batch_size_rows",
+        IntegerType,
+        description="Maximum number of rows in each batch.",
+    ),
+).to_dict()
 
 
 class TargetLoadMethods(str, Enum):
diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py
index e35353789..f41d3d97f 100644
--- a/singer_sdk/sinks/core.py
+++ b/singer_sdk/sinks/core.py
@@ -182,6 +182,11 @@ def __init__(
         self._batch_records_read: int = 0
         self._batch_dupe_records_merged: int = 0
 
+        # Batch full markers
+        self._batch_size_rows: int | None = target.config.get(
+            "batch_size_rows",
+        )
+
         self._validator: BaseJSONSchemaValidator | None = self.get_validator()
 
     @cached_property
@@ -249,15 +254,6 @@ def _get_context(self, record: dict) -> dict:  # noqa: ARG002
 
     # Size properties
 
-    @property
-    def max_size(self) -> int:
-        """Get max batch size.
-
-        Returns:
-            Max number of records to batch before `is_full=True`
-        """
-        return self.MAX_SIZE_DEFAULT
-
     @property
     def current_size(self) -> int:
         """Get current batch size.
@@ -269,13 +265,40 @@ def current_size(self) -> int:
 
     @property
     def is_full(self) -> bool:
-        """Check against size limit.
+        """Check against the batch size limit.
 
         Returns:
             True if the sink needs to be drained.
         """
         return self.current_size >= self.max_size
 
+    @property
+    def batch_size_rows(self) -> int | None:
+        """The maximum number of rows a batch can accumulate before being processed.
+
+        Returns:
+            The max number of rows or None if not set.
+        """
+        return self._batch_size_rows
+
+    @property
+    def max_size(self) -> int:
+        """Get max batch size.
+
+        Returns:
+            Max number of records to batch before `is_full=True`
+
+        .. versionchanged:: 0.36.0
+            This property now takes into account the
+            :attr:`~singer_sdk.Sink.batch_size_rows` attribute and the corresponding
+            ``batch_size_rows`` target setting.
+        """
+        return (
+            self.batch_size_rows
+            if self.batch_size_rows is not None
+            else self.MAX_SIZE_DEFAULT
+        )
+
     # Tally methods
 
     @t.final
diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py
index 5a78bf362..3c0d37234 100644
--- a/singer_sdk/target_base.py
+++ b/singer_sdk/target_base.py
@@ -18,6 +18,7 @@
 from singer_sdk.helpers.capabilities import (
     ADD_RECORD_METADATA_CONFIG,
     BATCH_CONFIG,
+    TARGET_BATCH_SIZE_ROWS_CONFIG,
     TARGET_HARD_DELETE_CONFIG,
     TARGET_LOAD_METHOD_CONFIG,
     TARGET_SCHEMA_CONFIG,
@@ -363,8 +364,9 @@ def _process_record_message(self, message_dict: dict) -> None:
 
         if sink.is_full:
             self.logger.info(
-                "Target sink for '%s' is full. Draining...",
+                "Target sink for '%s' is full. Current size is '%s'. Draining...",
                 sink.stream_name,
+                sink.current_size,
             )
             self.drain_one(sink)
 
@@ -610,6 +612,7 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
 
         _merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema)
         _merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema)
+        _merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, config_jsonschema)
 
         capabilities = cls.capabilities
 
diff --git a/tests/core/targets/test_target_sql.py b/tests/core/targets/test_target_sql.py
index fd71c0aeb..e47845a70 100644
--- a/tests/core/targets/test_target_sql.py
+++ b/tests/core/targets/test_target_sql.py
@@ -46,5 +46,9 @@ class MyTarget(SQLTargetMock, capabilities=capabilities):
         pass
 
     about = MyTarget._get_about_info()
-    default_settings = {"add_record_metadata", "load_method"}
+    default_settings = {
+        "add_record_metadata",
+        "load_method",
+        "batch_size_rows",
+    }
     assert set(about.settings["properties"]) == expected_settings | default_settings
diff --git a/tests/core/test_target_base.py b/tests/core/test_target_base.py
index eaff6d6a1..f9bcdb871 100644
--- a/tests/core/test_target_base.py
+++ b/tests/core/test_target_base.py
@@ -77,6 +77,7 @@ def test_target_about_info():
     assert "flattening_max_depth" in about.settings["properties"]
     assert "batch_config" in about.settings["properties"]
     assert "add_record_metadata" in about.settings["properties"]
+    assert "batch_size_rows" in about.settings["properties"]
 
 
 def test_sql_get_sink():
@@ -142,3 +143,40 @@ def test_add_sqlsink_and_get_sink():
         target.get_sink(
             "bar",
         )
+
+
+def test_batch_size_rows_and_max_size():
+    input_schema_1 = {
+        "properties": {
+            "id": {
+                "type": ["string", "null"],
+            },
+            "col_ts": {
+                "format": "date-time",
+                "type": ["string", "null"],
+            },
+        },
+    }
+    key_properties = []
+    target_default = TargetMock()
+    sink_default = BatchSinkMock(
+        target=target_default,
+        stream_name="foo",
+        schema=input_schema_1,
+        key_properties=key_properties,
+    )
+    target_set = TargetMock(config={"batch_size_rows": 100000})
+    sink_set = BatchSinkMock(
+        target=target_set,
+        stream_name="bar",
+        schema=input_schema_1,
+        key_properties=key_properties,
+    )
+    assert sink_default.stream_name == "foo"
+    assert sink_default._batch_size_rows is None
+    assert sink_default.batch_size_rows is None
+    assert sink_default.max_size == 10000
+    assert sink_set.stream_name == "bar"
+    assert sink_set._batch_size_rows == 100000
+    assert sink_set.batch_size_rows == 100000
+    assert sink_set.max_size == 100000
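
The behavior these hunks introduce: `Sink.max_size` now prefers the optional `batch_size_rows` target setting and falls back to `MAX_SIZE_DEFAULT` only when the setting is absent, and `is_full` (and therefore draining) keys off that resolved value. Below is a minimal standalone sketch of the resolution logic, assuming only what the diff shows; the function name and sample values are illustrative, and the 10000 fallback mirrors the `sink_default.max_size == 10000` assertion in the new test.

    # Illustrative sketch of the max_size resolution added in sinks/core.py.
    MAX_SIZE_DEFAULT = 10000  # fallback, matching the value asserted in the new test


    def resolve_max_size(target_config: dict) -> int:
        """Prefer the user-facing batch_size_rows setting, else the default."""
        batch_size_rows = target_config.get("batch_size_rows")  # None if unset
        return batch_size_rows if batch_size_rows is not None else MAX_SIZE_DEFAULT


    assert resolve_max_size({}) == 10000
    assert resolve_max_size({"batch_size_rows": 100000}) == 100000

Because `TARGET_BATCH_SIZE_ROWS_CONFIG` is merged into every target's config JSON schema via `append_builtin_config`, users can set `batch_size_rows` in any target's configuration without the target developer declaring it.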