Skip to content

Commit

Permalink
Create Dataset Lineage (#99)
Browse files Browse the repository at this point in the history
* Create Dataset Lineage
  • Loading branch information
knighto82 authored Jan 15, 2025
1 parent f34ed72 commit 0249a9c
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;

import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson;
Expand Down Expand Up @@ -131,6 +133,33 @@ public void testUpdateDataset() {
Assertions.assertDoesNotThrow(dataset::update);
}

@Test
public void testUpdateDatasetLineage() {
// Given
wireMockRule.stubFor(WireMock.post(WireMock.urlEqualTo("/catalogs/common/datasets/SD0002/lineage"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("dataset/dataset-SD0002-lineage-create-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));

Dataset dataset = getSdk().builders().dataset()
.identifier("SD0002")
.catalogIdentifier("common")
.build();


// When & Then
Assertions.assertDoesNotThrow(() -> dataset.createLineage(DatasetLineage.builder()
.source(new LinkedHashSet<>(Arrays.asList(
DatasetReference.builder().catalog("foo").dataset("d1").build(),
DatasetReference.builder().catalog("foo").dataset("d2").build(),
DatasetReference.builder().catalog("bar").dataset("d1").build(),
DatasetReference.builder().catalog("bar").dataset("d3").build()
)))
.build()));
}

@Test
public void testUpdateDatasetRetrievedFromListDatasets() {
// Given
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/github/jpmorganchase/fusion/Fusion.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,17 @@ public String create(String apiPath, CatalogResource resource) {
return this.api.callAPIToPost(apiPath, resource);
}

/**
* Creates a new catalog resource by sending a POST request to the specified API path.
*
* @param apiPath the API endpoint path where the resource should be created.
* @param resource the {@link Object} to be created.
* @return the response from the API as a {@link String}.
*/
public String create(String apiPath, Object resource) {
return this.api.callAPIToPost(apiPath, resource);
}

/**
* Updates an existing catalog resource by sending a PUT request to the specified API path.
*
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/github/jpmorganchase/fusion/api/APIManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ public interface APIManager extends APIDownloadOperations, APIUploadOperations {
*/
String callAPIToPost(String apiPath, CatalogResource catalogResource);

/**
* Sends a POST request to the specified API endpoint with the provided catalog resource.
*
* @param apiPath the API endpoint path to which the POST request will be sent
* @param resource the resource object to be serialized and sent as the request body
* @return the response body as a {@code String} if the request is successful
* @throws APICallException if the response status indicates an error or the request fails
*/
String callAPIToPost(String apiPath, Object resource);

/**
* Sends a PUT request to the specified API endpoint with the provided catalog resource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,30 @@ public String callAPIToPost(String apiPath, CatalogResource catalogResource) thr
return response.getBody();
}

/**
* Sends a POST request to the specified API endpoint with the provided catalog resource.
*
* <p>This method constructs the necessary authorization headers using a bearer token from
* the {@code tokenProvider}, serializes the given {@code catalogResource} into JSON,
* and sends a POST request to the specified {@code apiPath} using the {@code httpClient}.
* It then checks the HTTP response status for errors and returns the response body if successful.
*
* @param apiPath the API endpoint path to which the POST request will be sent
* @param resource the resource object to be serialized and sent as the request body
* @return the response body as a {@code String} if the request is successful
* @throws APICallException if the response status indicates an error or the request fails
*/
@Override
public String callAPIToPost(String apiPath, Object resource) throws APICallException {
Map<String, String> requestHeaders = new HashMap<>();
requestHeaders.put("Authorization", "Bearer " + tokenProvider.getSessionBearerToken());
requestHeaders.put("Content-Type", "application/json");

HttpResponse<String> response = httpClient.post(apiPath, requestHeaders, serializer.serialize(resource));
checkResponseStatus(response);
return response.getBody();
}

/**
* Sends a PUT request to the specified API endpoint with the provided catalog resource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ protected String getApiPath() {
getFusion().getRootURL(), this.getCatalogIdentifier(), this.getIdentifier());
}

protected String getApiPathForLineage() {
return getApiPath() + "/lineage";
}

public void createLineage(DatasetLineage lineage) {
getFusion().create(getApiPathForLineage(), lineage);
}

@Override
public Set<String> getRegisteredAttributes() {
Set<String> exclusions = super.getRegisteredAttributes();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.github.jpmorganchase.fusion.model;

import java.util.Set;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.Value;

@Value
@EqualsAndHashCode
@ToString
@Builder
public class DatasetLineage {

Set<DatasetReference> source;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.github.jpmorganchase.fusion.model;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.Value;

@Value
@EqualsAndHashCode
@ToString
@Builder
public class DatasetReference {

String catalog;
String dataset;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"source" : [ {
"catalog" : "foo",
"dataset" : "d1"
}, {
"catalog" : "foo",
"dataset" : "d2"
}, {
"catalog" : "bar",
"dataset" : "d1"
}, {
"catalog" : "bar",
"dataset" : "d3"
} ]
}

0 comments on commit 0249a9c

Please sign in to comment.