Skip to content

Commit

Permalink
feat: Support fanning out parent record into multiple child contexts/…
Browse files Browse the repository at this point in the history
…syncs (#2144)

feat: Support fanning out parent record into multiple child syncs
  • Loading branch information
edgarrmondragon committed Jan 11, 2024
1 parent 0675322 commit 5f1a68b
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 10 deletions.
11 changes: 7 additions & 4 deletions docs/parent_streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ from a parent record each time the child stream is invoked.

1. Set `parent_stream_type` in the child-stream's class to the class of the parent.
2. Implement one of the below methods to pass context from the parent to the child:
1. If using `get_records(context)` you can simply return a tuple instead of a `record`
1. If using [`get_records`](singer_sdk.Stream.get_child_context) you can simply return a tuple instead of a `record`
dictionary. A tuple return value will be interpreted by the SDK as
`(record: dict, child_context: dict)`.
1. Override `get_child_context(record, context: Dict) -> dict` to return a new
2. Override [`get_child_context`](singer_sdk.Stream.get_child_context) to return a new
child context object based on records and any existing context from the parent stream.
3. If you need to sync more than one child stream per parent record, you can override
[`generate_child_contexts`](singer_sdk.Stream.generate_child_contexts) to yield as many
contexts as you need.
3. If the parent stream's replication key won't get updated when child items are changed,
indicate this by adding `ignore_parent_replication_key = True` in the child stream
class declaration.
4. If the number of _parent_ items is very large (thousands or tens of thousands), you can
optionally set `state_partitioning_keys` on the child stream to specify a subset of context keys to use
optionally set [`state_partitioning_keys`](singer_sdk.Stream.state_partitioning_keys) on the child stream to specify a subset of context keys to use
in state bookmarks. (When not set, the number of bookmarks will be equal to the number
of parent items.) If you do not wish to store any state bookmarks for the child stream, set `state_partitioning_keys` to `[]`.
of parent items.) If you do not wish to store any state bookmarks for the child stream, set[`state_partitioning_keys`](singer_sdk.Stream.state_partitioning_keys) to `[]`.

## Example parent-child implementation

Expand Down
29 changes: 23 additions & 6 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,17 +1026,18 @@ def _process_record(
partition_context: The partition context.
"""
partition_context = partition_context or {}
child_context = copy.copy(
self.get_child_context(record=record, context=child_context),
)
for key, val in partition_context.items():
# Add state context to records if not already present
if key not in record:
record[key] = val

# Sync children, except when primary mapper filters out the record
if self.stream_maps[0].get_filter_result(record):
self._sync_children(child_context)
for context in self.generate_child_contexts(
record=record,
context=child_context,
):
# Sync children, except when primary mapper filters out the record
if self.stream_maps[0].get_filter_result(record):
self._sync_children(copy.copy(context))

def _sync_records( # noqa: C901
self,
Expand Down Expand Up @@ -1289,6 +1290,22 @@ def get_child_context(self, record: dict, context: dict | None) -> dict | None:

return context or record

def generate_child_contexts(
self,
record: dict,
context: dict | None,
) -> t.Iterable[dict | None]:
"""Generate child contexts.
Args:
record: Individual record in the stream.
context: Stream partition or context dictionary.
Yields:
A child context for each child stream.
"""
yield self.get_child_context(record=record, context=context)

# Abstract Methods

@abc.abstractmethod
Expand Down
97 changes: 97 additions & 0 deletions tests/core/test_parent_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,100 @@ def test_child_deselected_parent(tap_with_deselected_parent: MyTap):
assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages)
assert all(msg["stream"] == child_stream.name for msg in child_record_messages)
assert all("pid" in msg["record"] for msg in child_record_messages)


def test_one_parent_many_children(tap: MyTap):
"""Test tap output with parent stream deselected."""

class ParentMany(Stream):
"""A parent stream."""

name = "parent_many"
schema: t.ClassVar[dict] = {
"type": "object",
"properties": {
"id": {"type": "integer"},
"children": {"type": "array", "items": {"type": "integer"}},
},
}

def get_records(
self,
context: dict | None, # noqa: ARG002
) -> t.Iterable[dict | tuple[dict, dict | None]]:
yield {"id": "1", "children": [1, 2, 3]}

def generate_child_contexts(
self,
record: dict,
context: dict | None, # noqa: ARG002
) -> t.Iterable[dict | None]:
for child_id in record["children"]:
yield {"child_id": child_id, "pid": record["id"]}

class ChildMany(Stream):
"""A child stream."""

name = "child_many"
schema: t.ClassVar[dict] = {
"type": "object",
"properties": {
"id": {"type": "integer"},
"pid": {"type": "integer"},
},
}
parent_stream_type = ParentMany

def get_records(self, context: dict | None):
"""Get dummy records."""
yield {
"id": context["child_id"],
"composite_id": f"{context['pid']}-{context['child_id']}",
}

class MyTapMany(Tap):
"""A tap with streams having a parent-child relationship."""

name = "my-tap-many"

def discover_streams(self):
"""Discover streams."""
return [
ParentMany(self),
ChildMany(self),
]

tap = MyTapMany()
parent_stream = tap.streams["parent_many"]
child_stream = tap.streams["child_many"]

messages = _get_messages(tap)

# Parent schema is emitted
assert messages[1]
assert messages[1]["type"] == SingerMessageType.SCHEMA
assert messages[1]["stream"] == parent_stream.name
assert messages[1]["schema"] == parent_stream.schema

# Child schemas are emitted
schema_messages = messages[2:9:3]
assert schema_messages
assert all(msg["type"] == SingerMessageType.SCHEMA for msg in schema_messages)
assert all(msg["stream"] == child_stream.name for msg in schema_messages)
assert all(msg["schema"] == child_stream.schema for msg in schema_messages)

# Child records are emitted
child_record_messages = messages[3:10:3]
assert child_record_messages
assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages)
assert all(msg["stream"] == child_stream.name for msg in child_record_messages)
assert all("pid" in msg["record"] for msg in child_record_messages)

# State messages are emitted
state_messages = messages[4:11:3]
assert state_messages
assert all(msg["type"] == SingerMessageType.STATE for msg in state_messages)

# Parent record is emitted
assert messages[11]
assert messages[11]["type"] == SingerMessageType.RECORD

0 comments on commit 5f1a68b

Please sign in to comment.