Skip to content

Commit

Permalink
Feat/1922 rest api source add multiple path parameters (#1923)
Browse files Browse the repository at this point in the history
Co-authored-by: Antony Rinaldi <[email protected]>
  • Loading branch information
TheOneTrueAnt and AntonyR-BG authored Oct 5, 2024
1 parent 786a08e commit ca9a869
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 51 deletions.
17 changes: 9 additions & 8 deletions dlt/sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def create_resources(
client_config: ClientConfig,
dependency_graph: graphlib.TopologicalSorter,
endpoint_resource_map: Dict[str, Union[EndpointResource, DltResource]],
resolved_param_map: Dict[str, Optional[ResolvedParam]],
resolved_param_map: Dict[str, Optional[List[ResolvedParam]]],
) -> Dict[str, DltResource]:
resources = {}

Expand All @@ -229,10 +229,10 @@ def create_resources(
paginator = create_paginator(endpoint_config.get("paginator"))
processing_steps = endpoint_resource.pop("processing_steps", [])

resolved_param: ResolvedParam = resolved_param_map[resource_name]
resolved_params: List[ResolvedParam] = resolved_param_map[resource_name]

include_from_parent: List[str] = endpoint_resource.get("include_from_parent", [])
if not resolved_param and include_from_parent:
if not resolved_params and include_from_parent:
raise ValueError(
f"Resource {resource_name} has include_from_parent but is not "
"dependent on another resource"
Expand Down Expand Up @@ -267,7 +267,7 @@ def process(
resource.add_map(step["map"])
return resource

if resolved_param is None:
if resolved_params is None:

def paginate_resource(
method: HTTPMethodBasic,
Expand Down Expand Up @@ -318,9 +318,10 @@ def paginate_resource(
resources[resource_name] = process(resources[resource_name], processing_steps)

else:
predecessor = resources[resolved_param.resolve_config["resource"]]
first_param = resolved_params[0]
predecessor = resources[first_param.resolve_config["resource"]]

base_params = exclude_keys(request_params, {resolved_param.param_name})
base_params = exclude_keys(request_params, {x.param_name for x in resolved_params})

def paginate_dependent_resource(
items: List[Dict[str, Any]],
Expand All @@ -331,7 +332,7 @@ def paginate_dependent_resource(
data_selector: Optional[jsonpath.TJsonPath],
hooks: Optional[Dict[str, Any]],
client: RESTClient = client,
resolved_param: ResolvedParam = resolved_param,
resolved_params: List[ResolvedParam] = resolved_params,
include_from_parent: List[str] = include_from_parent,
incremental_object: Optional[Incremental[Any]] = incremental_object,
incremental_param: Optional[IncrementalParam] = incremental_param,
Expand All @@ -349,7 +350,7 @@ def paginate_dependent_resource(

for item in items:
formatted_path, parent_record = process_parent_data_item(
path, item, resolved_param, include_from_parent
path, item, resolved_params, include_from_parent
)

for child_page in client.paginate(
Expand Down
55 changes: 33 additions & 22 deletions dlt/sources/rest_api/config_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ def build_resource_dependency_graph(
resource_defaults: EndpointResourceBase,
resource_list: List[Union[str, EndpointResource, DltResource]],
) -> Tuple[
Any, Dict[str, Union[EndpointResource, DltResource]], Dict[str, Optional[ResolvedParam]]
Any, Dict[str, Union[EndpointResource, DltResource]], Dict[str, Optional[List[ResolvedParam]]]
]:
dependency_graph = graphlib.TopologicalSorter()
resolved_param_map: Dict[str, ResolvedParam] = {}
resolved_param_map: Dict[str, Optional[List[ResolvedParam]]] = {}
endpoint_resource_map = expand_and_index_resources(resource_list, resource_defaults)

# create dependency graph
Expand All @@ -288,20 +288,24 @@ def build_resource_dependency_graph(
assert isinstance(endpoint_resource["endpoint"], dict)
# connect transformers to resources via resolved params
resolved_params = _find_resolved_params(endpoint_resource["endpoint"])
if len(resolved_params) > 1:
raise ValueError(
f"Multiple resolved params for resource {resource_name}: {resolved_params}"
)
elif len(resolved_params) == 1:
resolved_param = resolved_params[0]
predecessor = resolved_param.resolve_config["resource"]

# set of resources in resolved params
named_resources = {rp.resolve_config["resource"] for rp in resolved_params}

if len(named_resources) > 1:
raise ValueError(f"Multiple parent resources for {resource_name}: {resolved_params}")
elif len(named_resources) == 1:
# validate the first parameter (note the resource is the same for all params)
first_param = resolved_params[0]
predecessor = first_param.resolve_config["resource"]
if predecessor not in endpoint_resource_map:
raise ValueError(
f"A transformer resource {resource_name} refers to non existing parent resource"
f" {predecessor} on {resolved_param}"
f" {predecessor} on {first_param}"
)

dependency_graph.add(resource_name, predecessor)
resolved_param_map[resource_name] = resolved_param
resolved_param_map[resource_name] = resolved_params
else:
dependency_graph.add(resource_name)
resolved_param_map[resource_name] = None
Expand Down Expand Up @@ -574,21 +578,28 @@ def remove_field(response: Response, *args, **kwargs) -> Response:
def process_parent_data_item(
path: str,
item: Dict[str, Any],
resolved_param: ResolvedParam,
resolved_params: List[ResolvedParam],
include_from_parent: List[str],
) -> Tuple[str, Dict[str, Any]]:
parent_resource_name = resolved_param.resolve_config["resource"]
parent_resource_name = resolved_params[0].resolve_config["resource"]

field_values = jsonpath.find_values(resolved_param.field_path, item)
param_values = {}

if not field_values:
field_path = resolved_param.resolve_config["field"]
raise ValueError(
f"Transformer expects a field '{field_path}' to be present in the incoming data from"
f" resource {parent_resource_name} in order to bind it to path param"
f" {resolved_param.param_name}. Available parent fields are {', '.join(item.keys())}"
)
bound_path = path.format(**{resolved_param.param_name: field_values[0]})
for resolved_param in resolved_params:
field_values = jsonpath.find_values(resolved_param.field_path, item)

if not field_values:
field_path = resolved_param.resolve_config["field"]
raise ValueError(
f"Transformer expects a field '{field_path}' to be present in the incoming data"
f" from resource {parent_resource_name} in order to bind it to path param"
f" {resolved_param.param_name}. Available parent fields are"
f" {', '.join(item.keys())}"
)

param_values[resolved_param.param_name] = field_values[0]

bound_path = path.format(**param_values)

parent_record: Dict[str, Any] = {}
if include_from_parent:
Expand Down
63 changes: 42 additions & 21 deletions tests/sources/rest_api/configurations/test_resolve_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,32 +88,34 @@ def test_bind_path_param() -> None:


def test_process_parent_data_item() -> None:
resolve_param = ResolvedParam(
"id", {"field": "obj_id", "resource": "issues", "type": "resolve"}
)
resolve_params = [
ResolvedParam("id", {"field": "obj_id", "resource": "issues", "type": "resolve"})
]
bound_path, parent_record = process_parent_data_item(
"dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345}, resolve_param, None
"dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345}, resolve_params, None
)
assert bound_path == "dlt-hub/dlt/issues/12345/comments"
assert parent_record == {}

bound_path, parent_record = process_parent_data_item(
"dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345}, resolve_param, ["obj_id"]
"dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345}, resolve_params, ["obj_id"]
)
assert parent_record == {"_issues_obj_id": 12345}

bound_path, parent_record = process_parent_data_item(
"dlt-hub/dlt/issues/{id}/comments",
{"obj_id": 12345, "obj_node": "node_1"},
resolve_param,
resolve_params,
["obj_id", "obj_node"],
)
assert parent_record == {"_issues_obj_id": 12345, "_issues_obj_node": "node_1"}

# test nested data
resolve_param_nested = ResolvedParam(
"id", {"field": "some_results.obj_id", "resource": "issues", "type": "resolve"}
)
resolve_param_nested = [
ResolvedParam(
"id", {"field": "some_results.obj_id", "resource": "issues", "type": "resolve"}
)
]
item = {"some_results": {"obj_id": 12345}}
bound_path, parent_record = process_parent_data_item(
"dlt-hub/dlt/issues/{id}/comments", item, resolve_param_nested, None
Expand All @@ -123,7 +125,7 @@ def test_process_parent_data_item() -> None:
# param path not found
with pytest.raises(ValueError) as val_ex:
bound_path, parent_record = process_parent_data_item(
"dlt-hub/dlt/issues/{id}/comments", {"_id": 12345}, resolve_param, None
"dlt-hub/dlt/issues/{id}/comments", {"_id": 12345}, resolve_params, None
)
assert "Transformer expects a field 'obj_id'" in str(val_ex.value)

Expand All @@ -132,11 +134,36 @@ def test_process_parent_data_item() -> None:
bound_path, parent_record = process_parent_data_item(
"dlt-hub/dlt/issues/{id}/comments",
{"obj_id": 12345, "obj_node": "node_1"},
resolve_param,
resolve_params,
["obj_id", "node"],
)
assert "in order to include it in child records under _issues_node" in str(val_ex.value)

# Resolve multiple parameters from a single record
multi_resolve_params = [
ResolvedParam("issue_id", {"field": "issue", "resource": "comments", "type": "resolve"}),
ResolvedParam("id", {"field": "id", "resource": "comments", "type": "resolve"}),
]

bound_path, parent_record = process_parent_data_item(
"dlt-hub/dlt/issues/{issue_id}/comments/{id}",
{"issue": 12345, "id": 56789},
multi_resolve_params,
None,
)
assert bound_path == "dlt-hub/dlt/issues/12345/comments/56789"
assert parent_record == {}

# param path not found with multiple parameters
with pytest.raises(ValueError) as val_ex:
bound_path, parent_record = process_parent_data_item(
"dlt-hub/dlt/issues/{issue_id}/comments/{id}",
{"_issue": 12345, "id": 56789},
multi_resolve_params,
None,
)
assert "Transformer expects a field 'issue'" in str(val_ex.value)


def test_two_resources_can_depend_on_one_parent_resource() -> None:
user_id = {
Expand Down Expand Up @@ -173,7 +200,7 @@ def test_two_resources_can_depend_on_one_parent_resource() -> None:
assert resources["user_details"]._pipe.parent.name == "users"


def test_dependent_resource_cannot_bind_multiple_parameters() -> None:
def test_dependent_resource_can_bind_multiple_parameters() -> None:
config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
Expand All @@ -200,15 +227,9 @@ def test_dependent_resource_cannot_bind_multiple_parameters() -> None:
},
],
}
with pytest.raises(ValueError) as e:
rest_api_resources(config)

error_part_1 = re.escape(
"Multiple resolved params for resource user_details: [ResolvedParam(param_name='user_id'"
)
error_part_2 = re.escape("ResolvedParam(param_name='group_id'")
assert e.match(error_part_1)
assert e.match(error_part_2)
resources = rest_api_source(config).resources
assert resources["user_details"]._pipe.parent.name == "users"


def test_one_resource_cannot_bind_two_parents() -> None:
Expand Down Expand Up @@ -244,7 +265,7 @@ def test_one_resource_cannot_bind_two_parents() -> None:
rest_api_resources(config)

error_part_1 = re.escape(
"Multiple resolved params for resource user_details: [ResolvedParam(param_name='user_id'"
"Multiple parent resources for user_details: [ResolvedParam(param_name='user_id'"
)
error_part_2 = re.escape("ResolvedParam(param_name='group_id'")
assert e.match(error_part_1)
Expand Down

0 comments on commit ca9a869

Please sign in to comment.