Skip to content

Commit

Permalink
feat: Support stream aliasing of BATCH messages via stream maps (#2667
Browse files Browse the repository at this point in the history
)

* Support stream aliasing of `BATCH` messages via stream maps

* Ensure support at target

* Satisfy pre-commit
  • Loading branch information
ReubenFrankel authored Sep 13, 2024
1 parent 170f5bc commit d4a27a3
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 14 deletions.
31 changes: 24 additions & 7 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,27 @@ def _generate_record_messages(
time_extracted=utc_now(),
)

def _generate_batch_messages(
self,
encoding: BaseBatchFileEncoding,
manifest: list[str],
) -> t.Generator[SDKBatchMessage, None, None]:
"""Write out a BATCH message.
Args:
encoding: The encoding to use for the batch.
manifest: A list of filenames for the batch.
Yields:
Batch message objects.
"""
for stream_map in self.stream_maps:
yield SDKBatchMessage(
stream=stream_map.stream_alias,
encoding=encoding,
manifest=manifest,
)

def _write_record_message(self, record: types.Record) -> None:
"""Write out a RECORD message.
Expand All @@ -902,13 +923,9 @@ def _write_batch_message(
encoding: The encoding to use for the batch.
manifest: A list of filenames for the batch.
"""
self._tap.write_message(
SDKBatchMessage(
stream=self.name,
encoding=encoding,
manifest=manifest,
),
)
for batch_message in self._generate_batch_messages(encoding, manifest):
self._tap.write_message(batch_message)

self._is_state_flushed = False

def _log_metric(self, point: metrics.Point) -> None:
Expand Down
16 changes: 9 additions & 7 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,14 +462,16 @@ def _process_batch_message(self, message_dict: dict) -> None:
Args:
message_dict: TODO
"""
sink = self.get_sink(message_dict["stream"])

stream_name = message_dict["stream"]
encoding = BaseBatchFileEncoding.from_dict(message_dict["encoding"])
sink.process_batch_files(
encoding,
message_dict["manifest"],
)
self._handle_max_record_age()

for stream_map in self.mapper.stream_maps[stream_name]:
sink = self.get_sink(stream_map.stream_alias)
sink.process_batch_files(
encoding,
message_dict["manifest"],
)
self._handle_max_record_age()

# Sink drain methods

Expand Down
16 changes: 16 additions & 0 deletions tests/core/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,9 @@ def get_records(self, context): # noqa: ARG002
},
}

def get_batches(self, batch_config, context): # noqa: ARG002
yield batch_config.encoding, ["file:///tmp/stream.json.gz"]


class MappedTap(Tap):
"""A tap with mapped streams."""
Expand Down Expand Up @@ -752,6 +755,19 @@ def discover_streams(self):
"aliased_stream.jsonl",
id="aliased_stream",
),
pytest.param(
{"mystream": {"__alias__": "aliased_stream"}},
{
"flattening_enabled": False,
"flattening_max_depth": 0,
"batch_config": {
"encoding": {"format": "jsonl", "compression": "gzip"},
"storage": {"root": "file:///tmp"},
},
},
"aliased_stream_batch.jsonl",
id="aliased_stream_batch",
),
pytest.param(
{},
{"flattening_enabled": True, "flattening_max_depth": 0},
Expand Down
5 changes: 5 additions & 0 deletions tests/snapshots/mapped_stream/aliased_stream_batch.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"type":"STATE","value":{}}
{"type":"SCHEMA","stream":"aliased_stream","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]}
{"type":"BATCH","stream":"aliased_stream","encoding":{"format":"jsonl","compression":"gzip"},"manifest":["file:///tmp/stream.json.gz"]}
{"type":"STATE","value":{}}
{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}

0 comments on commit d4a27a3

Please sign in to comment.