From 74e6dc60b9f9224a38b3f0c09223e48a8a7c954a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Wed, 10 Jan 2024 17:52:07 -0600 Subject: [PATCH] feat: Support fanning out parent record into multiple child syncs --- docs/parent_streams.md | 11 ++-- singer_sdk/streams/core.py | 29 ++++++++-- tests/core/test_parent_child.py | 97 +++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 10 deletions(-) diff --git a/docs/parent_streams.md b/docs/parent_streams.md index 235dbfac8..fec6d8fb9 100644 --- a/docs/parent_streams.md +++ b/docs/parent_streams.md @@ -8,18 +8,21 @@ from a parent record each time the child stream is invoked. 1. Set `parent_stream_type` in the child-stream's class to the class of the parent. 2. Implement one of the below methods to pass context from the parent to the child: - 1. If using `get_records(context)` you can simply return a tuple instead of a `record` + 1. If using [`get_records`](singer_sdk.Stream.get_child_context) you can simply return a tuple instead of a `record` dictionary. A tuple return value will be interpreted by the SDK as `(record: dict, child_context: dict)`. - 1. Override `get_child_context(record, context: Dict) -> dict` to return a new + 2. Override [`get_child_context`](singer_sdk.Stream.get_child_context) to return a new child context object based on records and any existing context from the parent stream. + 3. If you need to sync more than one child stream per parent record, you can override + [`generate_child_contexts`](singer_sdk.Stream.generate_child_contexts) to yield as many + contexts as you need. 3. If the parent stream's replication key won't get updated when child items are changed, indicate this by adding `ignore_parent_replication_key = True` in the child stream class declaration. 4. If the number of _parent_ items is very large (thousands or tens of thousands), you can - optionally set `state_partitioning_keys` on the child stream to specify a subset of context keys to use + optionally set [`state_partitioning_keys`](singer_sdk.Stream.state_partitioning_keys) on the child stream to specify a subset of context keys to use in state bookmarks. (When not set, the number of bookmarks will be equal to the number - of parent items.) If you do not wish to store any state bookmarks for the child stream, set `state_partitioning_keys` to `[]`. + of parent items.) If you do not wish to store any state bookmarks for the child stream, set[`state_partitioning_keys`](singer_sdk.Stream.state_partitioning_keys) to `[]`. ## Example parent-child implementation diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 459b9e761..567d7284d 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -1026,17 +1026,18 @@ def _process_record( partition_context: The partition context. """ partition_context = partition_context or {} - child_context = copy.copy( - self.get_child_context(record=record, context=child_context), - ) for key, val in partition_context.items(): # Add state context to records if not already present if key not in record: record[key] = val - # Sync children, except when primary mapper filters out the record - if self.stream_maps[0].get_filter_result(record): - self._sync_children(child_context) + for context in self.generate_child_contexts( + record=record, + context=child_context, + ): + # Sync children, except when primary mapper filters out the record + if self.stream_maps[0].get_filter_result(record): + self._sync_children(copy.copy(context)) def _sync_records( # noqa: C901 self, @@ -1289,6 +1290,22 @@ def get_child_context(self, record: dict, context: dict | None) -> dict | None: return context or record + def generate_child_contexts( + self, + record: dict, + context: dict | None, + ) -> t.Iterable[dict | None]: + """Generate child contexts. + + Args: + record: Individual record in the stream. + context: Stream partition or context dictionary. + + Yields: + A child context for each child stream. + """ + yield self.get_child_context(record=record, context=context) + # Abstract Methods @abc.abstractmethod diff --git a/tests/core/test_parent_child.py b/tests/core/test_parent_child.py index 7fd01a153..8b9ad2a16 100644 --- a/tests/core/test_parent_child.py +++ b/tests/core/test_parent_child.py @@ -167,3 +167,100 @@ def test_child_deselected_parent(tap_with_deselected_parent: MyTap): assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages) assert all(msg["stream"] == child_stream.name for msg in child_record_messages) assert all("pid" in msg["record"] for msg in child_record_messages) + + +def test_one_parent_many_children(tap: MyTap): + """Test tap output with parent stream deselected.""" + + class ParentMany(Stream): + """A parent stream.""" + + name = "parent_many" + schema: t.ClassVar[dict] = { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "children": {"type": "array", "items": {"type": "integer"}}, + }, + } + + def get_records( + self, + context: dict | None, # noqa: ARG002 + ) -> t.Iterable[dict | tuple[dict, dict | None]]: + yield {"id": "1", "children": [1, 2, 3]} + + def generate_child_contexts( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> t.Iterable[dict | None]: + for child_id in record["children"]: + yield {"child_id": child_id, "pid": record["id"]} + + class ChildMany(Stream): + """A child stream.""" + + name = "child_many" + schema: t.ClassVar[dict] = { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "pid": {"type": "integer"}, + }, + } + parent_stream_type = ParentMany + + def get_records(self, context: dict | None): + """Get dummy records.""" + yield { + "id": context["child_id"], + "composite_id": f"{context['pid']}-{context['child_id']}", + } + + class MyTapMany(Tap): + """A tap with streams having a parent-child relationship.""" + + name = "my-tap-many" + + def discover_streams(self): + """Discover streams.""" + return [ + ParentMany(self), + ChildMany(self), + ] + + tap = MyTapMany() + parent_stream = tap.streams["parent_many"] + child_stream = tap.streams["child_many"] + + messages = _get_messages(tap) + + # Parent schema is emitted + assert messages[1] + assert messages[1]["type"] == SingerMessageType.SCHEMA + assert messages[1]["stream"] == parent_stream.name + assert messages[1]["schema"] == parent_stream.schema + + # Child schemas are emitted + schema_messages = messages[2:9:3] + assert schema_messages + assert all(msg["type"] == SingerMessageType.SCHEMA for msg in schema_messages) + assert all(msg["stream"] == child_stream.name for msg in schema_messages) + assert all(msg["schema"] == child_stream.schema for msg in schema_messages) + + # Child records are emitted + child_record_messages = messages[3:10:3] + assert child_record_messages + assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages) + assert all(msg["stream"] == child_stream.name for msg in child_record_messages) + assert all("pid" in msg["record"] for msg in child_record_messages) + + # State messages are emitted + state_messages = messages[4:11:3] + assert state_messages + assert all(msg["type"] == SingerMessageType.STATE for msg in state_messages) + + # Parent record is emitted + assert messages[11] + assert messages[11]["type"] == SingerMessageType.RECORD