diff --git a/dlt/sources/rest_api/__init__.py b/dlt/sources/rest_api/__init__.py index b92ed6301c..4845433850 100644 --- a/dlt/sources/rest_api/__init__.py +++ b/dlt/sources/rest_api/__init__.py @@ -212,7 +212,7 @@ def create_resources( client_config: ClientConfig, dependency_graph: graphlib.TopologicalSorter, endpoint_resource_map: Dict[str, Union[EndpointResource, DltResource]], - resolved_param_map: Dict[str, Optional[ResolvedParam]], + resolved_param_map: Dict[str, Optional[List[ResolvedParam]]], ) -> Dict[str, DltResource]: resources = {} @@ -229,10 +229,10 @@ def create_resources( paginator = create_paginator(endpoint_config.get("paginator")) processing_steps = endpoint_resource.pop("processing_steps", []) - resolved_param: ResolvedParam = resolved_param_map[resource_name] + resolved_params: List[ResolvedParam] = resolved_param_map[resource_name] include_from_parent: List[str] = endpoint_resource.get("include_from_parent", []) - if not resolved_param and include_from_parent: + if not resolved_params and include_from_parent: raise ValueError( f"Resource {resource_name} has include_from_parent but is not " "dependent on another resource" @@ -267,7 +267,7 @@ def process( resource.add_map(step["map"]) return resource - if resolved_param is None: + if resolved_params is None: def paginate_resource( method: HTTPMethodBasic, @@ -318,9 +318,10 @@ def paginate_resource( resources[resource_name] = process(resources[resource_name], processing_steps) else: - predecessor = resources[resolved_param.resolve_config["resource"]] + first_param = resolved_params[0] + predecessor = resources[first_param.resolve_config["resource"]] - base_params = exclude_keys(request_params, {resolved_param.param_name}) + base_params = exclude_keys(request_params, {x.param_name for x in resolved_params}) def paginate_dependent_resource( items: List[Dict[str, Any]], @@ -331,7 +332,7 @@ def paginate_dependent_resource( data_selector: Optional[jsonpath.TJsonPath], hooks: Optional[Dict[str, Any]], client: RESTClient = client, - resolved_param: ResolvedParam = resolved_param, + resolved_params: List[ResolvedParam] = resolved_params, include_from_parent: List[str] = include_from_parent, incremental_object: Optional[Incremental[Any]] = incremental_object, incremental_param: Optional[IncrementalParam] = incremental_param, @@ -349,7 +350,7 @@ def paginate_dependent_resource( for item in items: formatted_path, parent_record = process_parent_data_item( - path, item, resolved_param, include_from_parent + path, item, resolved_params, include_from_parent ) for child_page in client.paginate( diff --git a/dlt/sources/rest_api/config_setup.py b/dlt/sources/rest_api/config_setup.py index 916715b214..8debaa59da 100644 --- a/dlt/sources/rest_api/config_setup.py +++ b/dlt/sources/rest_api/config_setup.py @@ -273,10 +273,10 @@ def build_resource_dependency_graph( resource_defaults: EndpointResourceBase, resource_list: List[Union[str, EndpointResource, DltResource]], ) -> Tuple[ - Any, Dict[str, Union[EndpointResource, DltResource]], Dict[str, Optional[ResolvedParam]] + Any, Dict[str, Union[EndpointResource, DltResource]], Dict[str, Optional[List[ResolvedParam]]] ]: dependency_graph = graphlib.TopologicalSorter() - resolved_param_map: Dict[str, ResolvedParam] = {} + resolved_param_map: Dict[str, Optional[List[ResolvedParam]]] = {} endpoint_resource_map = expand_and_index_resources(resource_list, resource_defaults) # create dependency graph @@ -288,20 +288,24 @@ def build_resource_dependency_graph( assert isinstance(endpoint_resource["endpoint"], dict) # connect transformers to resources via resolved params resolved_params = _find_resolved_params(endpoint_resource["endpoint"]) - if len(resolved_params) > 1: - raise ValueError( - f"Multiple resolved params for resource {resource_name}: {resolved_params}" - ) - elif len(resolved_params) == 1: - resolved_param = resolved_params[0] - predecessor = resolved_param.resolve_config["resource"] + + # set of resources in resolved params + named_resources = {rp.resolve_config["resource"] for rp in resolved_params} + + if len(named_resources) > 1: + raise ValueError(f"Multiple parent resources for {resource_name}: {resolved_params}") + elif len(named_resources) == 1: + # validate the first parameter (note the resource is the same for all params) + first_param = resolved_params[0] + predecessor = first_param.resolve_config["resource"] if predecessor not in endpoint_resource_map: raise ValueError( f"A transformer resource {resource_name} refers to non existing parent resource" - f" {predecessor} on {resolved_param}" + f" {predecessor} on {first_param}" ) + dependency_graph.add(resource_name, predecessor) - resolved_param_map[resource_name] = resolved_param + resolved_param_map[resource_name] = resolved_params else: dependency_graph.add(resource_name) resolved_param_map[resource_name] = None @@ -574,21 +578,28 @@ def remove_field(response: Response, *args, **kwargs) -> Response: def process_parent_data_item( path: str, item: Dict[str, Any], - resolved_param: ResolvedParam, + resolved_params: List[ResolvedParam], include_from_parent: List[str], ) -> Tuple[str, Dict[str, Any]]: - parent_resource_name = resolved_param.resolve_config["resource"] + parent_resource_name = resolved_params[0].resolve_config["resource"] - field_values = jsonpath.find_values(resolved_param.field_path, item) + param_values = {} - if not field_values: - field_path = resolved_param.resolve_config["field"] - raise ValueError( - f"Transformer expects a field '{field_path}' to be present in the incoming data from" - f" resource {parent_resource_name} in order to bind it to path param" - f" {resolved_param.param_name}. Available parent fields are {', '.join(item.keys())}" - ) - bound_path = path.format(**{resolved_param.param_name: field_values[0]}) + for resolved_param in resolved_params: + field_values = jsonpath.find_values(resolved_param.field_path, item) + + if not field_values: + field_path = resolved_param.resolve_config["field"] + raise ValueError( + f"Transformer expects a field '{field_path}' to be present in the incoming data" + f" from resource {parent_resource_name} in order to bind it to path param" + f" {resolved_param.param_name}. Available parent fields are" + f" {', '.join(item.keys())}" + ) + + param_values[resolved_param.param_name] = field_values[0] + + bound_path = path.format(**param_values) parent_record: Dict[str, Any] = {} if include_from_parent: diff --git a/tests/sources/rest_api/configurations/test_resolve_config.py b/tests/sources/rest_api/configurations/test_resolve_config.py index a0ca7ce890..d3d9308df1 100644 --- a/tests/sources/rest_api/configurations/test_resolve_config.py +++ b/tests/sources/rest_api/configurations/test_resolve_config.py @@ -88,32 +88,34 @@ def test_bind_path_param() -> None: def test_process_parent_data_item() -> None: - resolve_param = ResolvedParam( - "id", {"field": "obj_id", "resource": "issues", "type": "resolve"} - ) + resolve_params = [ + ResolvedParam("id", {"field": "obj_id", "resource": "issues", "type": "resolve"}) + ] bound_path, parent_record = process_parent_data_item( - "dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345}, resolve_param, None + "dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345}, resolve_params, None ) assert bound_path == "dlt-hub/dlt/issues/12345/comments" assert parent_record == {} bound_path, parent_record = process_parent_data_item( - "dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345}, resolve_param, ["obj_id"] + "dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345}, resolve_params, ["obj_id"] ) assert parent_record == {"_issues_obj_id": 12345} bound_path, parent_record = process_parent_data_item( "dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345, "obj_node": "node_1"}, - resolve_param, + resolve_params, ["obj_id", "obj_node"], ) assert parent_record == {"_issues_obj_id": 12345, "_issues_obj_node": "node_1"} # test nested data - resolve_param_nested = ResolvedParam( - "id", {"field": "some_results.obj_id", "resource": "issues", "type": "resolve"} - ) + resolve_param_nested = [ + ResolvedParam( + "id", {"field": "some_results.obj_id", "resource": "issues", "type": "resolve"} + ) + ] item = {"some_results": {"obj_id": 12345}} bound_path, parent_record = process_parent_data_item( "dlt-hub/dlt/issues/{id}/comments", item, resolve_param_nested, None @@ -123,7 +125,7 @@ def test_process_parent_data_item() -> None: # param path not found with pytest.raises(ValueError) as val_ex: bound_path, parent_record = process_parent_data_item( - "dlt-hub/dlt/issues/{id}/comments", {"_id": 12345}, resolve_param, None + "dlt-hub/dlt/issues/{id}/comments", {"_id": 12345}, resolve_params, None ) assert "Transformer expects a field 'obj_id'" in str(val_ex.value) @@ -132,11 +134,36 @@ def test_process_parent_data_item() -> None: bound_path, parent_record = process_parent_data_item( "dlt-hub/dlt/issues/{id}/comments", {"obj_id": 12345, "obj_node": "node_1"}, - resolve_param, + resolve_params, ["obj_id", "node"], ) assert "in order to include it in child records under _issues_node" in str(val_ex.value) + # Resolve multiple parameters from a single record + multi_resolve_params = [ + ResolvedParam("issue_id", {"field": "issue", "resource": "comments", "type": "resolve"}), + ResolvedParam("id", {"field": "id", "resource": "comments", "type": "resolve"}), + ] + + bound_path, parent_record = process_parent_data_item( + "dlt-hub/dlt/issues/{issue_id}/comments/{id}", + {"issue": 12345, "id": 56789}, + multi_resolve_params, + None, + ) + assert bound_path == "dlt-hub/dlt/issues/12345/comments/56789" + assert parent_record == {} + + # param path not found with multiple parameters + with pytest.raises(ValueError) as val_ex: + bound_path, parent_record = process_parent_data_item( + "dlt-hub/dlt/issues/{issue_id}/comments/{id}", + {"_issue": 12345, "id": 56789}, + multi_resolve_params, + None, + ) + assert "Transformer expects a field 'issue'" in str(val_ex.value) + def test_two_resources_can_depend_on_one_parent_resource() -> None: user_id = { @@ -173,7 +200,7 @@ def test_two_resources_can_depend_on_one_parent_resource() -> None: assert resources["user_details"]._pipe.parent.name == "users" -def test_dependent_resource_cannot_bind_multiple_parameters() -> None: +def test_dependent_resource_can_bind_multiple_parameters() -> None: config: RESTAPIConfig = { "client": { "base_url": "https://api.example.com", @@ -200,15 +227,9 @@ def test_dependent_resource_cannot_bind_multiple_parameters() -> None: }, ], } - with pytest.raises(ValueError) as e: - rest_api_resources(config) - error_part_1 = re.escape( - "Multiple resolved params for resource user_details: [ResolvedParam(param_name='user_id'" - ) - error_part_2 = re.escape("ResolvedParam(param_name='group_id'") - assert e.match(error_part_1) - assert e.match(error_part_2) + resources = rest_api_source(config).resources + assert resources["user_details"]._pipe.parent.name == "users" def test_one_resource_cannot_bind_two_parents() -> None: @@ -244,7 +265,7 @@ def test_one_resource_cannot_bind_two_parents() -> None: rest_api_resources(config) error_part_1 = re.escape( - "Multiple resolved params for resource user_details: [ResolvedParam(param_name='user_id'" + "Multiple parent resources for user_details: [ResolvedParam(param_name='user_id'" ) error_part_2 = re.escape("ResolvedParam(param_name='group_id'") assert e.match(error_part_1)